[pypy-commit] pypy py3.5: Implement the win32 function ssl.enum_certificates()

arigo pypy.commits at gmail.com
Mon Apr 23 12:15:22 EDT 2018


Author: Armin Rigo <arigo at tunes.org>
Branch: py3.5
Changeset: r94428:b129a34163ac
Date: 2018-04-23 17:47 +0200
http://bitbucket.org/pypy/pypy/changeset/b129a34163ac/

Log:	Implement the win32 function ssl.enum_certificates()

diff --git a/lib_pypy/_cffi_ssl/README.md b/lib_pypy/_cffi_ssl/README.md
--- a/lib_pypy/_cffi_ssl/README.md
+++ b/lib_pypy/_cffi_ssl/README.md
@@ -14,6 +14,8 @@
 
 * ``_cffi_src/openssl/x509_vfy.py`` for issue #2605 (ca4d0c90f5a1)
 
+* ``_cffi_src/openssl/pypy_win32_extra.py`` for Win32-only functionality like ssl.enum_certificates()
+
 
 # Tests?
 
diff --git a/lib_pypy/_cffi_ssl/_cffi_src/openssl/pypy_win32_extra.py b/lib_pypy/_cffi_ssl/_cffi_src/openssl/pypy_win32_extra.py
new file mode 100644
--- /dev/null
+++ b/lib_pypy/_cffi_ssl/_cffi_src/openssl/pypy_win32_extra.py
@@ -0,0 +1,68 @@
+#
+# An extra bit of logic for the Win32-only functionality that is missing from the
+# version from cryptography.
+#
+
+import sys
+
+INCLUDES = """
+#include <Wincrypt.h>
+"""
+
+TYPES = """
+typedef ... *HCERTSTORE;
+typedef ... *HCRYPTPROV_LEGACY;
+typedef struct {
+    DWORD      dwCertEncodingType;
+    BYTE       *pbCertEncoded;
+    DWORD      cbCertEncoded;
+    ...;
+} CERT_CONTEXT, *PCCERT_CONTEXT;
+typedef struct {
+    DWORD cUsageIdentifier;
+    LPSTR *rgpszUsageIdentifier;
+    ...;
+} CERT_ENHKEY_USAGE, *PCERT_ENHKEY_USAGE;
+"""
+
+FUNCTIONS = """
+HCERTSTORE WINAPI CertOpenStore(
+         LPCSTR            lpszStoreProvider,
+         DWORD             dwMsgAndCertEncodingType,
+         HCRYPTPROV_LEGACY hCryptProv,
+         DWORD             dwFlags,
+         const char        *pvPara
+);
+PCCERT_CONTEXT WINAPI CertEnumCertificatesInStore(
+         HCERTSTORE     hCertStore,
+         PCCERT_CONTEXT pPrevCertContext
+);
+BOOL WINAPI CertFreeCertificateContext(
+         PCCERT_CONTEXT pCertContext
+);
+BOOL WINAPI CertCloseStore(
+         HCERTSTORE hCertStore,
+         DWORD      dwFlags
+);
+BOOL WINAPI CertGetEnhancedKeyUsage(
+         PCCERT_CONTEXT     pCertContext,
+         DWORD              dwFlags,
+         PCERT_ENHKEY_USAGE pUsage,
+         DWORD              *pcbUsage
+);
+"""
+
+MACROS = """
+#define CERT_STORE_READONLY_FLAG ...
+#define CERT_SYSTEM_STORE_LOCAL_MACHINE ...
+#define CRYPT_E_NOT_FOUND ...
+#define CERT_FIND_PROP_ONLY_ENHKEY_USAGE_FLAG ...
+#define CERT_FIND_EXT_ONLY_ENHKEY_USAGE_FLAG ...
+#define X509_ASN_ENCODING ...
+#define PKCS_7_ASN_ENCODING ...
+
+static const LPCSTR CERT_STORE_PROV_SYSTEM_A;
+"""
+
+CUSTOMIZATIONS = """
+"""
diff --git a/lib_pypy/_cffi_ssl/_stdssl/__init__.py b/lib_pypy/_cffi_ssl/_stdssl/__init__.py
--- a/lib_pypy/_cffi_ssl/_stdssl/__init__.py
+++ b/lib_pypy/_cffi_ssl/_stdssl/__init__.py
@@ -24,6 +24,7 @@
 from enum import IntEnum as _IntEnum
 
 if sys.platform == 'win32':
+    from _cffi_ssl._stdssl.win32_extra import enum_certificates
     HAVE_POLL = False
 else:
     from select import poll, POLLIN, POLLOUT
diff --git a/lib_pypy/_cffi_ssl/_stdssl/win32_extra.py b/lib_pypy/_cffi_ssl/_stdssl/win32_extra.py
new file mode 100644
--- /dev/null
+++ b/lib_pypy/_cffi_ssl/_stdssl/win32_extra.py
@@ -0,0 +1,66 @@
+from _pypy_openssl import lib, ffi
+
+
+def enum_certificates(store_name):
+    """Retrieve certificates from Windows' cert store.
+
+store_name may be one of 'CA', 'ROOT' or 'MY'.  The system may provide
+more cert storages, too.  The function returns a list of (bytes,
+encoding_type, trust) tuples.  The encoding_type flag can be interpreted
+with X509_ASN_ENCODING or PKCS_7_ASN_ENCODING. The trust setting is either
+a set of OIDs or the boolean True.
+    """
+    hStore = lib.CertOpenStore(lib.CERT_STORE_PROV_SYSTEM_A, 0, ffi.NULL,
+                               lib.CERT_STORE_READONLY_FLAG | lib.CERT_SYSTEM_STORE_LOCAL_MACHINE,
+                               bytes(store_name, "ascii"))
+    if hStore == ffi.NULL:
+        raise WindowsError(*ffi.getwinerror())
+    
+    result = []
+    pCertCtx = ffi.NULL
+    while True:
+        pCertCtx = lib.CertEnumCertificatesInStore(hStore, pCertCtx)
+        if pCertCtx == ffi.NULL:
+            break
+        cert = ffi.buffer(pCertCtx.pbCertEncoded, pCertCtx.cbCertEncoded)[:]
+        enc = certEncodingType(pCertCtx.dwCertEncodingType)
+        keyusage = parseKeyUsage(pCertCtx, lib.CERT_FIND_PROP_ONLY_ENHKEY_USAGE_FLAG)
+        if keyusage is True:
+            keyusage = parseKeyUsage(pCertCtx, lib.CERT_FIND_EXT_ONLY_ENHKEY_USAGE_FLAG)
+        result.append((cert, enc, keyusage))
+
+    if pCertCtx != ffi.NULL:
+        lib.CertFreeCertificateContext(pCertCtx)
+    lib.CertCloseStore(hStore, 0)
+    return result
+
+
+def certEncodingType(encodingType):
+    if encodingType == lib.X509_ASN_ENCODING:
+        return "x509_asn"
+    if encodingType == lib.PKCS_7_ASN_ENCODING:
+        return "pkcs_7_asn"
+    return encodingType
+
+def parseKeyUsage(pCertCtx, flags):
+    pSize = ffi.new("DWORD *")
+    if not lib.CertGetEnhancedKeyUsage(pCertCtx, flags, ffi.NULL, pSize):
+        error_with_message = ffi.getwinerror()
+        if error_with_message[0] == lib.CRYPT_E_NOT_FOUND:
+            return True
+        raise WindowsError(*error_with_message)
+
+    pUsageMem = ffi.new("char[]", pSize[0])
+    pUsage = ffi.cast("PCERT_ENHKEY_USAGE", pUsageMem)
+    if not lib.CertGetEnhancedKeyUsage(pCertCtx, flags, pUsage, pSize):
+        error_with_message = ffi.getwinerror()
+        if error_with_message[0] == lib.CRYPT_E_NOT_FOUND:
+            return True
+        raise WindowsError(*error_with_message)
+
+    retval = set()
+    for i in range(pUsage.cUsageIdentifier):
+        if pUsage.rgpszUsageIdentifier[i]:
+            oid = ffi.string(pUsage.rgpszUsageIdentifier[i]).decode('ascii')
+            retval.add(oid)
+    return retval
diff --git a/lib_pypy/_ssl/__init__.py b/lib_pypy/_ssl/__init__.py
--- a/lib_pypy/_ssl/__init__.py
+++ b/lib_pypy/_ssl/__init__.py
@@ -16,12 +16,14 @@
     RAND_egd          = builtinify(RAND_egd)
 
 import sys
-if sys.platform == "win32" and 'enum_certificates' not in globals():
-    def enum_certificates(*args, **kwds):
-        import warnings
-        warnings.warn("ssl.enum_certificates() is not implemented")
-        return []
-    def enum_crls(*args, **kwds):
-        import warnings
-        warnings.warn("ssl.enum_crls() is not implemented")
-        return []
+if sys.platform == "win32":
+    if 'enum_certificates' not in globals():
+        def enum_certificates(*args, **kwds):
+            import warnings
+            warnings.warn("ssl.enum_certificates() is not implemented")
+            return []
+    if 'enum_crls' not in globals():
+        def enum_crls(*args, **kwds):
+            import warnings
+            warnings.warn("ssl.enum_crls() is not implemented")
+            return []
diff --git a/lib_pypy/_ssl_build.py b/lib_pypy/_ssl_build.py
--- a/lib_pypy/_ssl_build.py
+++ b/lib_pypy/_ssl_build.py
@@ -5,6 +5,11 @@
 from _cffi_ssl._cffi_src.build_openssl import (build_ffi_for_binding,
         _get_openssl_libraries, extra_link_args, compiler_type)
 
+if sys.platform == "win32":
+    pypy_win32_extra = ["pypy_win32_extra"]
+else:
+    pypy_win32_extra = []
+
 ffi = build_ffi_for_binding(
     module_name="_pypy_openssl",
     module_prefix="_cffi_src.openssl.",
@@ -44,10 +49,10 @@
         "x509_vfy",
         "pkcs7",
         "callbacks",
-    ],
+    ] + pypy_win32_extra,
     libraries=_get_openssl_libraries(sys.platform),
     extra_link_args=extra_link_args(compiler_type()),
 )
 
 if __name__ == '__main__':
-    ffi.compile()
+    ffi.compile(verbose=True)


More information about the pypy-commit mailing list