[Python-checkins] cpython: Issue #18138: Implement cadata argument of SSLContext.load_verify_location()
christian.heimes
python-checkins at python.org
Thu Nov 21 03:35:12 CET 2013
http://hg.python.org/cpython/rev/234e3c8dc52f
changeset: 87302:234e3c8dc52f
user: Christian Heimes <christian at cheimes.de>
date: Thu Nov 21 03:35:02 2013 +0100
summary:
Issue #18138: Implement cadata argument of SSLContext.load_verify_location()
to load CA certificates and CRL from memory. It supports PEM and DER
encoded strings.
files:
Doc/library/ssl.rst | 11 +-
Lib/test/test_ssl.py | 88 ++++++++++++-
Misc/NEWS | 4 +
Modules/_ssl.c | 208 +++++++++++++++++++++++++-----
4 files changed, 274 insertions(+), 37 deletions(-)
diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst
--- a/Doc/library/ssl.rst
+++ b/Doc/library/ssl.rst
@@ -821,6 +821,7 @@
.. versionadded:: 3.4
+
.. method:: SSLContext.load_cert_chain(certfile, keyfile=None, password=None)
Load a private key and the corresponding certificate. The *certfile*
@@ -851,7 +852,7 @@
.. versionchanged:: 3.3
New optional argument *password*.
-.. method:: SSLContext.load_verify_locations(cafile=None, capath=None)
+.. method:: SSLContext.load_verify_locations(cafile=None, capath=None, cadata=None)
Load a set of "certification authority" (CA) certificates used to validate
other peers' certificates when :data:`verify_mode` is other than
@@ -867,6 +868,14 @@
following an `OpenSSL specific layout
<http://www.openssl.org/docs/ssl/SSL_CTX_load_verify_locations.html>`_.
+ The *cadata* object, if present, is either an ASCII string of one or more
+ PEM-encoded certificates or a bytes-like object of DER-encoded
+ certificates. Like with *capath* extra lines around PEM-encoded
+ certificates are ignored but at least one certificate must be present.
+
+ .. versionchanged:: 3.4
+ New optional argument *cadata*
+
.. method:: SSLContext.get_ca_certs(binary_form=False)
Get a list of loaded "certification authority" (CA) certificates. If the
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -25,7 +25,8 @@
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
-data_file = lambda name: os.path.join(os.path.dirname(__file__), name)
+def data_file(*name):
+ return os.path.join(os.path.dirname(__file__), *name)
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
@@ -43,6 +44,9 @@
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
+CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
+CAFILE_CACERT = data_file("capath", "5ed36f99.0")
+
# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
@@ -726,7 +730,7 @@
ctx.load_verify_locations(BYTES_CERTFILE)
ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
self.assertRaises(TypeError, ctx.load_verify_locations)
- self.assertRaises(TypeError, ctx.load_verify_locations, None, None)
+ self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
with self.assertRaises(OSError) as cm:
ctx.load_verify_locations(WRONGCERT)
self.assertEqual(cm.exception.errno, errno.ENOENT)
@@ -738,6 +742,64 @@
# Issue #10989: crash if the second argument type is invalid
self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
+ def test_load_verify_cadata(self):
+ # test cadata
+ with open(CAFILE_CACERT) as f:
+ cacert_pem = f.read()
+ cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
+ with open(CAFILE_NEURONIO) as f:
+ neuronio_pem = f.read()
+ neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
+
+ # test PEM
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
+ ctx.load_verify_locations(cadata=cacert_pem)
+ self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
+ ctx.load_verify_locations(cadata=neuronio_pem)
+ self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+ # cert already in hash table
+ ctx.load_verify_locations(cadata=neuronio_pem)
+ self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+ # combined
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ combined = "\n".join((cacert_pem, neuronio_pem))
+ ctx.load_verify_locations(cadata=combined)
+ self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+ # with junk around the certs
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ combined = ["head", cacert_pem, "other", neuronio_pem, "again",
+ neuronio_pem, "tail"]
+ ctx.load_verify_locations(cadata="\n".join(combined))
+ self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+ # test DER
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ ctx.load_verify_locations(cadata=cacert_der)
+ ctx.load_verify_locations(cadata=neuronio_der)
+ self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+ # cert already in hash table
+ ctx.load_verify_locations(cadata=cacert_der)
+ self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+ # combined
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ combined = b"".join((cacert_der, neuronio_der))
+ ctx.load_verify_locations(cadata=combined)
+ self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+ # error cases
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
+
+ with self.assertRaisesRegex(ssl.SSLError, "no start line"):
+ ctx.load_verify_locations(cadata="broken")
+ with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
+ ctx.load_verify_locations(cadata=b"broken")
+
+
def test_load_dh_params(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.load_dh_params(DHFILE)
@@ -1057,6 +1119,28 @@
finally:
s.close()
+ def test_connect_cadata(self):
+ with open(CAFILE_CACERT) as f:
+ pem = f.read()
+ der = ssl.PEM_cert_to_DER_cert(pem)
+ with support.transient_internet("svn.python.org"):
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(cadata=pem)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(("svn.python.org", 443))
+ cert = s.getpeercert()
+ self.assertTrue(cert)
+
+ # same with DER
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(cadata=der)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(("svn.python.org", 443))
+ cert = s.getpeercert()
+ self.assertTrue(cert)
+
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
# Issue #5238: creating a file-like object with makefile() shouldn't
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -59,6 +59,10 @@
Library
-------
+- Issue #18138: Implement cadata argument of SSLContext.load_verify_location()
+ to load CA certificates and CRL from memory. It supports PEM and DER
+ encoded strings.
+
- Issue #18775: Add name and block_size attribute to HMAC object. They now
provide the same API elements as non-keyed cryptographic hash functions.
diff --git a/Modules/_ssl.c b/Modules/_ssl.c
--- a/Modules/_ssl.c
+++ b/Modules/_ssl.c
@@ -2304,60 +2304,200 @@
return NULL;
}
+/* internal helper function, returns -1 on error
+ */
+static int
+_add_ca_certs(PySSLContext *self, void *data, Py_ssize_t len,
+ int filetype)
+{
+ BIO *biobuf = NULL;
+ X509_STORE *store;
+ int retval = 0, err, loaded = 0;
+
+ assert(filetype == SSL_FILETYPE_ASN1 || filetype == SSL_FILETYPE_PEM);
+
+ if (len <= 0) {
+ PyErr_SetString(PyExc_ValueError,
+ "Empty certificate data");
+ return -1;
+ } else if (len > INT_MAX) {
+ PyErr_SetString(PyExc_OverflowError,
+ "Certificate data is too long.");
+ return -1;
+ }
+
+ biobuf = BIO_new_mem_buf(data, len);
+ if (biobuf == NULL) {
+ _setSSLError("Can't allocate buffer", 0, __FILE__, __LINE__);
+ return -1;
+ }
+
+ store = SSL_CTX_get_cert_store(self->ctx);
+ assert(store != NULL);
+
+ while (1) {
+ X509 *cert = NULL;
+ int r;
+
+ if (filetype == SSL_FILETYPE_ASN1) {
+ cert = d2i_X509_bio(biobuf, NULL);
+ } else {
+ cert = PEM_read_bio_X509(biobuf, NULL,
+ self->ctx->default_passwd_callback,
+ self->ctx->default_passwd_callback_userdata);
+ }
+ if (cert == NULL) {
+ break;
+ }
+ r = X509_STORE_add_cert(store, cert);
+ X509_free(cert);
+ if (!r) {
+ err = ERR_peek_last_error();
+ if ((ERR_GET_LIB(err) == ERR_LIB_X509) &&
+ (ERR_GET_REASON(err) == X509_R_CERT_ALREADY_IN_HASH_TABLE)) {
+ /* cert already in hash table, not an error */
+ ERR_clear_error();
+ } else {
+ break;
+ }
+ }
+ loaded++;
+ }
+
+ err = ERR_peek_last_error();
+ if ((filetype == SSL_FILETYPE_ASN1) &&
+ (loaded > 0) &&
+ (ERR_GET_LIB(err) == ERR_LIB_ASN1) &&
+ (ERR_GET_REASON(err) == ASN1_R_HEADER_TOO_LONG)) {
+ /* EOF ASN1 file, not an error */
+ ERR_clear_error();
+ retval = 0;
+ } else if ((filetype == SSL_FILETYPE_PEM) &&
+ (loaded > 0) &&
+ (ERR_GET_LIB(err) == ERR_LIB_PEM) &&
+ (ERR_GET_REASON(err) == PEM_R_NO_START_LINE)) {
+ /* EOF PEM file, not an error */
+ ERR_clear_error();
+ retval = 0;
+ } else {
+ _setSSLError(NULL, 0, __FILE__, __LINE__);
+ retval = -1;
+ }
+
+ BIO_free(biobuf);
+ return retval;
+}
+
+
static PyObject *
load_verify_locations(PySSLContext *self, PyObject *args, PyObject *kwds)
{
- char *kwlist[] = {"cafile", "capath", NULL};
- PyObject *cafile = NULL, *capath = NULL;
+ char *kwlist[] = {"cafile", "capath", "cadata", NULL};
+ PyObject *cafile = NULL, *capath = NULL, *cadata = NULL;
PyObject *cafile_bytes = NULL, *capath_bytes = NULL;
const char *cafile_buf = NULL, *capath_buf = NULL;
- int r;
+ int r = 0, ok = 1;
errno = 0;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
- "|OO:load_verify_locations", kwlist,
- &cafile, &capath))
+ "|OOO:load_verify_locations", kwlist,
+ &cafile, &capath, &cadata))
return NULL;
+
if (cafile == Py_None)
cafile = NULL;
if (capath == Py_None)
capath = NULL;
- if (cafile == NULL && capath == NULL) {
+ if (cadata == Py_None)
+ cadata = NULL;
+
+ if (cafile == NULL && capath == NULL && cadata == NULL) {
PyErr_SetString(PyExc_TypeError,
- "cafile and capath cannot be both omitted");
- return NULL;
+ "cafile, capath and cadata cannot be all omitted");
+ goto error;
}
if (cafile && !PyUnicode_FSConverter(cafile, &cafile_bytes)) {
PyErr_SetString(PyExc_TypeError,
"cafile should be a valid filesystem path");
+ goto error;
+ }
+ if (capath && !PyUnicode_FSConverter(capath, &capath_bytes)) {
+ PyErr_SetString(PyExc_TypeError,
+ "capath should be a valid filesystem path");
+ goto error;
+ }
+
+ /* validata cadata type and load cadata */
+ if (cadata) {
+ Py_buffer buf;
+ PyObject *cadata_ascii = NULL;
+
+ if (PyObject_GetBuffer(cadata, &buf, PyBUF_SIMPLE) == 0) {
+ if (!PyBuffer_IsContiguous(&buf, 'C') || buf.ndim > 1) {
+ PyBuffer_Release(&buf);
+ PyErr_SetString(PyExc_TypeError,
+ "cadata should be a contiguous buffer with "
+ "a single dimension");
+ goto error;
+ }
+ r = _add_ca_certs(self, buf.buf, buf.len, SSL_FILETYPE_ASN1);
+ PyBuffer_Release(&buf);
+ if (r == -1) {
+ goto error;
+ }
+ } else {
+ PyErr_Clear();
+ cadata_ascii = PyUnicode_AsASCIIString(cadata);
+ if (cadata_ascii == NULL) {
+ PyErr_SetString(PyExc_TypeError,
+ "cadata should be a ASCII string or a "
+ "bytes-like object");
+ goto error;
+ }
+ r = _add_ca_certs(self,
+ PyBytes_AS_STRING(cadata_ascii),
+ PyBytes_GET_SIZE(cadata_ascii),
+ SSL_FILETYPE_PEM);
+ Py_DECREF(cadata_ascii);
+ if (r == -1) {
+ goto error;
+ }
+ }
+ }
+
+ /* load cafile or capath */
+ if (cafile || capath) {
+ if (cafile)
+ cafile_buf = PyBytes_AS_STRING(cafile_bytes);
+ if (capath)
+ capath_buf = PyBytes_AS_STRING(capath_bytes);
+ PySSL_BEGIN_ALLOW_THREADS
+ r = SSL_CTX_load_verify_locations(self->ctx, cafile_buf, capath_buf);
+ PySSL_END_ALLOW_THREADS
+ if (r != 1) {
+ ok = 0;
+ if (errno != 0) {
+ ERR_clear_error();
+ PyErr_SetFromErrno(PyExc_IOError);
+ }
+ else {
+ _setSSLError(NULL, 0, __FILE__, __LINE__);
+ }
+ goto error;
+ }
+ }
+ goto end;
+
+ error:
+ ok = 0;
+ end:
+ Py_XDECREF(cafile_bytes);
+ Py_XDECREF(capath_bytes);
+ if (ok) {
+ Py_RETURN_NONE;
+ } else {
return NULL;
}
- if (capath && !PyUnicode_FSConverter(capath, &capath_bytes)) {
- Py_XDECREF(cafile_bytes);
- PyErr_SetString(PyExc_TypeError,
- "capath should be a valid filesystem path");
- return NULL;
- }
- if (cafile)
- cafile_buf = PyBytes_AS_STRING(cafile_bytes);
- if (capath)
- capath_buf = PyBytes_AS_STRING(capath_bytes);
- PySSL_BEGIN_ALLOW_THREADS
- r = SSL_CTX_load_verify_locations(self->ctx, cafile_buf, capath_buf);
- PySSL_END_ALLOW_THREADS
- Py_XDECREF(cafile_bytes);
- Py_XDECREF(capath_bytes);
- if (r != 1) {
- if (errno != 0) {
- ERR_clear_error();
- PyErr_SetFromErrno(PyExc_IOError);
- }
- else {
- _setSSLError(NULL, 0, __FILE__, __LINE__);
- }
- return NULL;
- }
- Py_RETURN_NONE;
}
static PyObject *
--
Repository URL: http://hg.python.org/cpython
More information about the Python-checkins
mailing list