[Python-checkins] cpython: expose the client's cipher suites from the handshake (closes #23186)

benjamin.peterson python-checkins at python.org
Wed Jan 7 18:14:57 CET 2015


https://hg.python.org/cpython/rev/bc34fcccaca3
changeset:   94059:bc34fcccaca3
user:        Benjamin Peterson <benjamin at python.org>
date:        Wed Jan 07 11:14:26 2015 -0600
summary:
  expose the client's cipher suites from the handshake (closes #23186)

files:
  Doc/library/ssl.rst  |  12 +++++
  Lib/ssl.py           |  10 ++++
  Lib/test/test_ssl.py |  17 +++++++
  Misc/NEWS            |   4 +
  Modules/_ssl.c       |  72 ++++++++++++++++++++++---------
  5 files changed, 94 insertions(+), 21 deletions(-)


diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst
--- a/Doc/library/ssl.rst
+++ b/Doc/library/ssl.rst
@@ -925,6 +925,17 @@
    version of the SSL protocol that defines its use, and the number of secret
    bits being used.  If no connection has been established, returns ``None``.
 
+.. method:: SSLSocket.shared_ciphers()
+
+   Return the list of ciphers shared by the client during the handshake.  Each
+   entry of the returned list is a three-value tuple containing the name of the
+   cipher, the version of the SSL protocol that defines its use, and the number
+   of secret bits the cipher uses.  :meth:`~SSLSocket.shared_ciphers` returns
+   ``None`` if no connection has been established or the socket is a client
+   socket.
+
+   .. versionadded:: 3.5
+
 .. method:: SSLSocket.compression()
 
    Return the compression algorithm being used as a string, or ``None``
@@ -1784,6 +1795,7 @@
    - :meth:`~SSLSocket.getpeercert`
    - :meth:`~SSLSocket.selected_npn_protocol`
    - :meth:`~SSLSocket.cipher`
+   - :meth:`~SSLSocket.shared_ciphers`
    - :meth:`~SSLSocket.compression`
    - :meth:`~SSLSocket.pending`
    - :meth:`~SSLSocket.do_handshake`
diff --git a/Lib/ssl.py b/Lib/ssl.py
--- a/Lib/ssl.py
+++ b/Lib/ssl.py
@@ -572,6 +572,10 @@
         ssl_version, secret_bits)``."""
         return self._sslobj.cipher()
 
+    def shared_ciphers(self):
+        """Return the ciphers shared by the client during the handshake."""
+        return self._sslobj.shared_ciphers()
+
     def compression(self):
         """Return the current compression algorithm in use, or ``None`` if
         compression was not negotiated or not supported by one of the peers."""
@@ -784,6 +788,12 @@
         else:
             return self._sslobj.cipher()
 
+    def shared_ciphers(self):
+        self._checkClosed()
+        if not self._sslobj:
+            return None
+        return self._sslobj.shared_ciphers()
+
     def compression(self):
         self._checkClosed()
         if not self._sslobj:
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -1698,11 +1698,13 @@
             sslobj = ctx.wrap_bio(incoming, outgoing, False, 'svn.python.org')
             self.assertIs(sslobj._sslobj.owner, sslobj)
             self.assertIsNone(sslobj.cipher())
+            self.assertIsNone(sslobj.shared_ciphers())
             self.assertRaises(ValueError, sslobj.getpeercert)
             if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
                 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
             self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
             self.assertTrue(sslobj.cipher())
+            self.assertIsNone(sslobj.shared_ciphers())
             self.assertTrue(sslobj.getpeercert())
             if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
                 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
@@ -1776,6 +1778,7 @@
                     self.close()
                     return False
                 else:
+                    self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                     if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                         cert = self.sslconn.getpeercert()
                         if support.verbose and self.server.chatty:
@@ -1891,6 +1894,7 @@
             self.flag = None
             self.active = False
             self.selected_protocols = []
+            self.shared_ciphers = []
             self.conn_errors = []
             threading.Thread.__init__(self)
             self.daemon = True
@@ -2121,6 +2125,7 @@
                 })
                 s.close()
             stats['server_npn_protocols'] = server.selected_protocols
+            stats['server_shared_ciphers'] = server.shared_ciphers
         return stats
 
     def try_protocol_combo(server_protocol, client_protocol, expect_success,
@@ -3157,6 +3162,18 @@
             self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
             self.assertIn("TypeError", stderr.getvalue())
 
+        def test_shared_ciphers(self):
+            server_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+            client_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+            client_context.set_ciphers("3DES")
+            server_context.set_ciphers("3DES:AES")
+            stats = server_params_test(client_context, server_context)
+            ciphers = stats['server_shared_ciphers'][0]
+            self.assertGreater(len(ciphers), 0)
+            for name, tls_version, bits in ciphers:
+                self.assertIn("DES-CBC3-", name)
+                self.assertEqual(bits, 112)
+
         def test_read_write_after_close_raises_valuerror(self):
             context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
             context.verify_mode = ssl.CERT_REQUIRED
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -199,6 +199,10 @@
 Library
 -------
 
+- Issue #23186: Add ssl.SSLObject.shared_ciphers() and
+  ssl.SSLSocket.shared_ciphers() to fetch the client's list of ciphers sent at
+  handshake.
+
 - Issue #23143: Remove compatibility with OpenSSLs older than 0.9.8.
 
 - Issue #23132: Improve performance and introspection support of comparison
diff --git a/Modules/_ssl.c b/Modules/_ssl.c
--- a/Modules/_ssl.c
+++ b/Modules/_ssl.c
@@ -1360,54 +1360,83 @@
 peer certificate, or None if no certificate was provided.  This will\n\
 return the certificate even if it wasn't validated.");
 
-static PyObject *PySSL_cipher (PySSLSocket *self) {
-
-    PyObject *retval, *v;
-    const SSL_CIPHER *current;
-    char *cipher_name;
-    char *cipher_protocol;
-
-    if (self->ssl == NULL)
-        Py_RETURN_NONE;
-    current = SSL_get_current_cipher(self->ssl);
-    if (current == NULL)
-        Py_RETURN_NONE;
-
-    retval = PyTuple_New(3);
+static PyObject *
+cipher_to_tuple(const SSL_CIPHER *cipher)
+{
+    const char *cipher_name, *cipher_protocol;
+    PyObject *v, *retval = PyTuple_New(3);
     if (retval == NULL)
         return NULL;
 
-    cipher_name = (char *) SSL_CIPHER_get_name(current);
+    cipher_name = SSL_CIPHER_get_name(cipher);
     if (cipher_name == NULL) {
         Py_INCREF(Py_None);
         PyTuple_SET_ITEM(retval, 0, Py_None);
     } else {
         v = PyUnicode_FromString(cipher_name);
         if (v == NULL)
-            goto fail0;
+            goto fail;
         PyTuple_SET_ITEM(retval, 0, v);
     }
-    cipher_protocol = (char *) SSL_CIPHER_get_version(current);
+
+    cipher_protocol = SSL_CIPHER_get_version(cipher);
     if (cipher_protocol == NULL) {
         Py_INCREF(Py_None);
         PyTuple_SET_ITEM(retval, 1, Py_None);
     } else {
         v = PyUnicode_FromString(cipher_protocol);
         if (v == NULL)
-            goto fail0;
+            goto fail;
         PyTuple_SET_ITEM(retval, 1, v);
     }
-    v = PyLong_FromLong(SSL_CIPHER_get_bits(current, NULL));
+
+    v = PyLong_FromLong(SSL_CIPHER_get_bits(cipher, NULL));
     if (v == NULL)
-        goto fail0;
+        goto fail;
     PyTuple_SET_ITEM(retval, 2, v);
+
     return retval;
 
-  fail0:
+  fail:
     Py_DECREF(retval);
     return NULL;
 }
 
+static PyObject *PySSL_shared_ciphers(PySSLSocket *self)
+{
+    STACK_OF(SSL_CIPHER) *ciphers;
+    int i;
+    PyObject *res;
+
+    if (!self->ssl->session || !self->ssl->session->ciphers)
+        Py_RETURN_NONE;
+    ciphers = self->ssl->session->ciphers;
+    res = PyList_New(sk_SSL_CIPHER_num(ciphers));
+    if (!res)
+        return NULL;
+    for (i = 0; i < sk_SSL_CIPHER_num(ciphers); i++) {
+        PyObject *tup = cipher_to_tuple(sk_SSL_CIPHER_value(ciphers, i));
+        if (!tup) {
+            Py_DECREF(res);
+            return NULL;
+        }
+        PyList_SET_ITEM(res, i, tup);
+    }
+    return res;
+}
+
+static PyObject *PySSL_cipher (PySSLSocket *self)
+{
+    const SSL_CIPHER *current;
+
+    if (self->ssl == NULL)
+        Py_RETURN_NONE;
+    current = SSL_get_current_cipher(self->ssl);
+    if (current == NULL)
+        Py_RETURN_NONE;
+    return cipher_to_tuple(current);
+}
+
 static PyObject *PySSL_version(PySSLSocket *self)
 {
     const char *version;
@@ -2019,6 +2048,7 @@
     {"peer_certificate", (PyCFunction)PySSL_peercert, METH_VARARGS,
      PySSL_peercert_doc},
     {"cipher", (PyCFunction)PySSL_cipher, METH_NOARGS},
+    {"shared_ciphers", (PyCFunction)PySSL_shared_ciphers, METH_NOARGS},
     {"version", (PyCFunction)PySSL_version, METH_NOARGS},
 #ifdef OPENSSL_NPN_NEGOTIATED
     {"selected_npn_protocol", (PyCFunction)PySSL_selected_npn_protocol, METH_NOARGS},

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list