[Python-checkins] cpython: Issue #18138: Implement cadata argument of SSLContext.load_verify_locations()

christian.heimes python-checkins at python.org
Thu Nov 21 03:35:12 CET 2013


http://hg.python.org/cpython/rev/234e3c8dc52f
changeset:   87302:234e3c8dc52f
user:        Christian Heimes <christian at cheimes.de>
date:        Thu Nov 21 03:35:02 2013 +0100
summary:
  Issue #18138: Implement cadata argument of SSLContext.load_verify_locations()
to load CA certificates and CRL from memory. It supports PEM and DER
encoded strings.

files:
  Doc/library/ssl.rst  |   11 +-
  Lib/test/test_ssl.py |   88 ++++++++++++-
  Misc/NEWS            |    4 +
  Modules/_ssl.c       |  208 +++++++++++++++++++++++++-----
  4 files changed, 274 insertions(+), 37 deletions(-)


diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst
--- a/Doc/library/ssl.rst
+++ b/Doc/library/ssl.rst
@@ -821,6 +821,7 @@
 
    .. versionadded:: 3.4
 
+
 .. method:: SSLContext.load_cert_chain(certfile, keyfile=None, password=None)
 
    Load a private key and the corresponding certificate.  The *certfile*
@@ -851,7 +852,7 @@
    .. versionchanged:: 3.3
       New optional argument *password*.
 
-.. method:: SSLContext.load_verify_locations(cafile=None, capath=None)
+.. method:: SSLContext.load_verify_locations(cafile=None, capath=None, cadata=None)
 
    Load a set of "certification authority" (CA) certificates used to validate
    other peers' certificates when :data:`verify_mode` is other than
@@ -867,6 +868,14 @@
    following an `OpenSSL specific layout
    <http://www.openssl.org/docs/ssl/SSL_CTX_load_verify_locations.html>`_.
 
+   The *cadata* object, if present, is either an ASCII string of one or more
+   PEM-encoded certificates or a bytes-like object of DER-encoded
+   certificates. As with *capath*, extra lines around PEM-encoded
+   certificates are ignored, but at least one certificate must be present.
+
+   .. versionchanged:: 3.4
+      New optional argument *cadata*.
+
 .. method:: SSLContext.get_ca_certs(binary_form=False)
 
    Get a list of loaded "certification authority" (CA) certificates. If the
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -25,7 +25,8 @@
 PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
 HOST = support.HOST
 
-data_file = lambda name: os.path.join(os.path.dirname(__file__), name)
+def data_file(*name):
+    return os.path.join(os.path.dirname(__file__), *name)
 
 # The custom key and certificate files used in test_ssl are generated
 # using Lib/test/make_ssl_certs.py.
@@ -43,6 +44,9 @@
 KEY_PASSWORD = "somepass"
 CAPATH = data_file("capath")
 BYTES_CAPATH = os.fsencode(CAPATH)
+CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
+CAFILE_CACERT = data_file("capath", "5ed36f99.0")
+
 
 # Two keys and certs signed by the same CA (for SNI tests)
 SIGNED_CERTFILE = data_file("keycert3.pem")
@@ -726,7 +730,7 @@
         ctx.load_verify_locations(BYTES_CERTFILE)
         ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
         self.assertRaises(TypeError, ctx.load_verify_locations)
-        self.assertRaises(TypeError, ctx.load_verify_locations, None, None)
+        self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
         with self.assertRaises(OSError) as cm:
             ctx.load_verify_locations(WRONGCERT)
         self.assertEqual(cm.exception.errno, errno.ENOENT)
@@ -738,6 +742,64 @@
         # Issue #10989: crash if the second argument type is invalid
         self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
 
+    def test_load_verify_cadata(self):
+        # test cadata
+        with open(CAFILE_CACERT) as f:
+            cacert_pem = f.read()
+        cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
+        with open(CAFILE_NEURONIO) as f:
+            neuronio_pem = f.read()
+        neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
+
+        # test PEM
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
+        ctx.load_verify_locations(cadata=cacert_pem)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
+        ctx.load_verify_locations(cadata=neuronio_pem)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+        # cert already in hash table
+        ctx.load_verify_locations(cadata=neuronio_pem)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+        # combined
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        combined = "\n".join((cacert_pem, neuronio_pem))
+        ctx.load_verify_locations(cadata=combined)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+        # with junk around the certs
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        combined = ["head", cacert_pem, "other", neuronio_pem, "again",
+                    neuronio_pem, "tail"]
+        ctx.load_verify_locations(cadata="\n".join(combined))
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+        # test DER
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        ctx.load_verify_locations(cadata=cacert_der)
+        ctx.load_verify_locations(cadata=neuronio_der)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+        # cert already in hash table
+        ctx.load_verify_locations(cadata=cacert_der)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+        # combined
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        combined = b"".join((cacert_der, neuronio_der))
+        ctx.load_verify_locations(cadata=combined)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+        # error cases
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
+
+        with self.assertRaisesRegex(ssl.SSLError, "no start line"):
+            ctx.load_verify_locations(cadata="broken")
+        with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
+            ctx.load_verify_locations(cadata=b"broken")
+
+
     def test_load_dh_params(self):
         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
         ctx.load_dh_params(DHFILE)
@@ -1057,6 +1119,28 @@
             finally:
                 s.close()
 
+    def test_connect_cadata(self):
+        with open(CAFILE_CACERT) as f:
+            pem = f.read()
+        der = ssl.PEM_cert_to_DER_cert(pem)
+        with support.transient_internet("svn.python.org"):
+            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+            ctx.verify_mode = ssl.CERT_REQUIRED
+            ctx.load_verify_locations(cadata=pem)
+            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+                s.connect(("svn.python.org", 443))
+                cert = s.getpeercert()
+                self.assertTrue(cert)
+
+            # same with DER
+            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+            ctx.verify_mode = ssl.CERT_REQUIRED
+            ctx.load_verify_locations(cadata=der)
+            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+                s.connect(("svn.python.org", 443))
+                cert = s.getpeercert()
+                self.assertTrue(cert)
+
     @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
     def test_makefile_close(self):
         # Issue #5238: creating a file-like object with makefile() shouldn't
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -59,6 +59,10 @@
 Library
 -------
 
+- Issue #18138: Implement cadata argument of SSLContext.load_verify_locations()
+  to load CA certificates and CRL from memory. It supports PEM and DER
+  encoded strings.
+
 - Issue #18775: Add name and block_size attribute to HMAC object. They now
   provide the same API elements as non-keyed cryptographic hash functions.
 
diff --git a/Modules/_ssl.c b/Modules/_ssl.c
--- a/Modules/_ssl.c
+++ b/Modules/_ssl.c
@@ -2304,60 +2304,200 @@
     return NULL;
 }
 
+/* internal helper function, returns -1 on error
+ */
+static int
+_add_ca_certs(PySSLContext *self, void *data, Py_ssize_t len,
+              int filetype)
+{
+    BIO *biobuf = NULL;
+    X509_STORE *store;
+    int retval = 0, err, loaded = 0;
+
+    assert(filetype == SSL_FILETYPE_ASN1 || filetype == SSL_FILETYPE_PEM);
+
+    if (len <= 0) {
+        PyErr_SetString(PyExc_ValueError,
+                        "Empty certificate data");
+        return -1;
+    } else if (len > INT_MAX) {
+        PyErr_SetString(PyExc_OverflowError,
+                        "Certificate data is too long.");
+        return -1;
+    }
+
+    biobuf = BIO_new_mem_buf(data, len);
+    if (biobuf == NULL) {
+        _setSSLError("Can't allocate buffer", 0, __FILE__, __LINE__);
+        return -1;
+    }
+
+    store = SSL_CTX_get_cert_store(self->ctx);
+    assert(store != NULL);
+
+    while (1) {
+        X509 *cert = NULL;
+        int r;
+
+        if (filetype == SSL_FILETYPE_ASN1) {
+            cert = d2i_X509_bio(biobuf, NULL);
+        } else {
+            cert = PEM_read_bio_X509(biobuf, NULL,
+                                     self->ctx->default_passwd_callback,
+                                     self->ctx->default_passwd_callback_userdata);
+        }
+        if (cert == NULL) {
+            break;
+        }
+        r = X509_STORE_add_cert(store, cert);
+        X509_free(cert);
+        if (!r) {
+            err = ERR_peek_last_error();
+            if ((ERR_GET_LIB(err) == ERR_LIB_X509) &&
+                (ERR_GET_REASON(err) == X509_R_CERT_ALREADY_IN_HASH_TABLE)) {
+                /* cert already in hash table, not an error */
+                ERR_clear_error();
+            } else {
+                break;
+            }
+        }
+        loaded++;
+    }
+
+    err = ERR_peek_last_error();
+    if ((filetype == SSL_FILETYPE_ASN1) &&
+            (loaded > 0) &&
+            (ERR_GET_LIB(err) == ERR_LIB_ASN1) &&
+            (ERR_GET_REASON(err) == ASN1_R_HEADER_TOO_LONG)) {
+        /* EOF ASN1 file, not an error */
+        ERR_clear_error();
+        retval = 0;
+    } else if ((filetype == SSL_FILETYPE_PEM) &&
+                   (loaded > 0) &&
+                   (ERR_GET_LIB(err) == ERR_LIB_PEM) &&
+                   (ERR_GET_REASON(err) == PEM_R_NO_START_LINE)) {
+        /* EOF PEM file, not an error */
+        ERR_clear_error();
+        retval = 0;
+    } else {
+        _setSSLError(NULL, 0, __FILE__, __LINE__);
+        retval = -1;
+    }
+
+    BIO_free(biobuf);
+    return retval;
+}
+
+
 static PyObject *
 load_verify_locations(PySSLContext *self, PyObject *args, PyObject *kwds)
 {
-    char *kwlist[] = {"cafile", "capath", NULL};
-    PyObject *cafile = NULL, *capath = NULL;
+    char *kwlist[] = {"cafile", "capath", "cadata", NULL};
+    PyObject *cafile = NULL, *capath = NULL, *cadata = NULL;
     PyObject *cafile_bytes = NULL, *capath_bytes = NULL;
     const char *cafile_buf = NULL, *capath_buf = NULL;
-    int r;
+    int r = 0, ok = 1;
 
     errno = 0;
     if (!PyArg_ParseTupleAndKeywords(args, kwds,
-        "|OO:load_verify_locations", kwlist,
-        &cafile, &capath))
+        "|OOO:load_verify_locations", kwlist,
+        &cafile, &capath, &cadata))
         return NULL;
+
     if (cafile == Py_None)
         cafile = NULL;
     if (capath == Py_None)
         capath = NULL;
-    if (cafile == NULL && capath == NULL) {
+    if (cadata == Py_None)
+        cadata = NULL;
+
+    if (cafile == NULL && capath == NULL && cadata == NULL) {
         PyErr_SetString(PyExc_TypeError,
-                        "cafile and capath cannot be both omitted");
-        return NULL;
+                        "cafile, capath and cadata cannot be all omitted");
+        goto error;
     }
     if (cafile && !PyUnicode_FSConverter(cafile, &cafile_bytes)) {
         PyErr_SetString(PyExc_TypeError,
                         "cafile should be a valid filesystem path");
+        goto error;
+    }
+    if (capath && !PyUnicode_FSConverter(capath, &capath_bytes)) {
+        PyErr_SetString(PyExc_TypeError,
+                        "capath should be a valid filesystem path");
+        goto error;
+    }
+
+    /* validate cadata type and load cadata */
+    if (cadata) {
+        Py_buffer buf;
+        PyObject *cadata_ascii = NULL;
+
+        if (PyObject_GetBuffer(cadata, &buf, PyBUF_SIMPLE) == 0) {
+            if (!PyBuffer_IsContiguous(&buf, 'C') || buf.ndim > 1) {
+                PyBuffer_Release(&buf);
+                PyErr_SetString(PyExc_TypeError,
+                                "cadata should be a contiguous buffer with "
+                                "a single dimension");
+                goto error;
+            }
+            r = _add_ca_certs(self, buf.buf, buf.len, SSL_FILETYPE_ASN1);
+            PyBuffer_Release(&buf);
+            if (r == -1) {
+                goto error;
+            }
+        } else {
+            PyErr_Clear();
+            cadata_ascii = PyUnicode_AsASCIIString(cadata);
+            if (cadata_ascii == NULL) {
+                PyErr_SetString(PyExc_TypeError,
+                                "cadata should be a ASCII string or a "
+                                "bytes-like object");
+                goto error;
+            }
+            r = _add_ca_certs(self,
+                              PyBytes_AS_STRING(cadata_ascii),
+                              PyBytes_GET_SIZE(cadata_ascii),
+                              SSL_FILETYPE_PEM);
+            Py_DECREF(cadata_ascii);
+            if (r == -1) {
+                goto error;
+            }
+        }
+    }
+
+    /* load cafile or capath */
+    if (cafile || capath) {
+        if (cafile)
+            cafile_buf = PyBytes_AS_STRING(cafile_bytes);
+        if (capath)
+            capath_buf = PyBytes_AS_STRING(capath_bytes);
+        PySSL_BEGIN_ALLOW_THREADS
+        r = SSL_CTX_load_verify_locations(self->ctx, cafile_buf, capath_buf);
+        PySSL_END_ALLOW_THREADS
+        if (r != 1) {
+            ok = 0;
+            if (errno != 0) {
+                ERR_clear_error();
+                PyErr_SetFromErrno(PyExc_IOError);
+            }
+            else {
+                _setSSLError(NULL, 0, __FILE__, __LINE__);
+            }
+            goto error;
+        }
+    }
+    goto end;
+
+  error:
+    ok = 0;
+  end:
+    Py_XDECREF(cafile_bytes);
+    Py_XDECREF(capath_bytes);
+    if (ok) {
+        Py_RETURN_NONE;
+    } else {
         return NULL;
     }
-    if (capath && !PyUnicode_FSConverter(capath, &capath_bytes)) {
-        Py_XDECREF(cafile_bytes);
-        PyErr_SetString(PyExc_TypeError,
-                        "capath should be a valid filesystem path");
-        return NULL;
-    }
-    if (cafile)
-        cafile_buf = PyBytes_AS_STRING(cafile_bytes);
-    if (capath)
-        capath_buf = PyBytes_AS_STRING(capath_bytes);
-    PySSL_BEGIN_ALLOW_THREADS
-    r = SSL_CTX_load_verify_locations(self->ctx, cafile_buf, capath_buf);
-    PySSL_END_ALLOW_THREADS
-    Py_XDECREF(cafile_bytes);
-    Py_XDECREF(capath_bytes);
-    if (r != 1) {
-        if (errno != 0) {
-            ERR_clear_error();
-            PyErr_SetFromErrno(PyExc_IOError);
-        }
-        else {
-            _setSSLError(NULL, 0, __FILE__, __LINE__);
-        }
-        return NULL;
-    }
-    Py_RETURN_NONE;
 }
 
 static PyObject *

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list