[Python-checkins] cpython: Issue #19509: Finish implementation of check_hostname

christian.heimes python-checkins at python.org
Fri Dec 6 00:29:11 CET 2013


http://hg.python.org/cpython/rev/1605eda93392
changeset:   87786:1605eda93392
user:        Christian Heimes <christian at cheimes.de>
date:        Fri Dec 06 00:23:13 2013 +0100
summary:
  Issue #19509: Finish implementation of check_hostname
The new asyncio package now supports the new feature and comes with additional tests for SSL.

files:
  Lib/asyncio/selector_events.py       |   25 +-
  Lib/test/keycert3.pem                |    0 
  Lib/test/pycacert.pem                |    0 
  Lib/test/test_asyncio/sample.crt     |   14 -
  Lib/test/test_asyncio/sample.key     |   15 -
  Lib/test/ssl_cert.pem                |    0 
  Lib/test/ssl_key.pem                 |    0 
  Lib/test/test_asyncio/test_events.py |  140 +++++++++++++-
  8 files changed, 137 insertions(+), 57 deletions(-)


diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -583,7 +583,8 @@
                 # cadefault=True.
                 if hasattr(ssl, '_create_stdlib_context'):
                     sslcontext = ssl._create_stdlib_context(
-                        cert_reqs=ssl.CERT_REQUIRED)
+                        cert_reqs=ssl.CERT_REQUIRED,
+                        check_hostname=bool(server_hostname))
                 else:
                     # Fallback for Python 3.3.
                     sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
@@ -639,17 +640,19 @@
         self._loop.remove_reader(self._sock_fd)
         self._loop.remove_writer(self._sock_fd)
 
-        # Verify hostname if requested.
         peercert = self._sock.getpeercert()
-        if (self._server_hostname and
-            self._sslcontext.verify_mode != ssl.CERT_NONE):
-            try:
-                ssl.match_hostname(peercert, self._server_hostname)
-            except Exception as exc:
-                self._sock.close()
-                if self._waiter is not None:
-                    self._waiter.set_exception(exc)
-                return
+        if not hasattr(self._sslcontext, 'check_hostname'):
+            # Verify hostname if requested, Python 3.4+ uses check_hostname
+            # and checks the hostname in do_handshake()
+            if (self._server_hostname and
+                self._sslcontext.verify_mode != ssl.CERT_NONE):
+                try:
+                    ssl.match_hostname(peercert, self._server_hostname)
+                except Exception as exc:
+                    self._sock.close()
+                    if self._waiter is not None:
+                        self._waiter.set_exception(exc)
+                    return
 
         # Add extra info that becomes available after handshake.
         self._extra.update(peercert=peercert,
diff --git a/Lib/test/keycert3.pem b/Lib/test/test_asyncio/keycert3.pem
copy from Lib/test/keycert3.pem
copy to Lib/test/test_asyncio/keycert3.pem
diff --git a/Lib/test/pycacert.pem b/Lib/test/test_asyncio/pycacert.pem
copy from Lib/test/pycacert.pem
copy to Lib/test/test_asyncio/pycacert.pem
diff --git a/Lib/test/test_asyncio/sample.crt b/Lib/test/test_asyncio/sample.crt
deleted file mode 100644
--- a/Lib/test/test_asyncio/sample.crt
+++ /dev/null
@@ -1,14 +0,0 @@
------BEGIN CERTIFICATE-----
-MIICMzCCAZwCCQDFl4ys0fU7iTANBgkqhkiG9w0BAQUFADBeMQswCQYDVQQGEwJV
-UzETMBEGA1UECAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuLUZyYW5jaXNjbzEi
-MCAGA1UECgwZUHl0aG9uIFNvZnR3YXJlIEZvbmRhdGlvbjAeFw0xMzAzMTgyMDA3
-MjhaFw0yMzAzMTYyMDA3MjhaMF4xCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxp
-Zm9ybmlhMRYwFAYDVQQHDA1TYW4tRnJhbmNpc2NvMSIwIAYDVQQKDBlQeXRob24g
-U29mdHdhcmUgRm9uZGF0aW9uMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCn
-t3s+J7L0xP/YdAQOacpPi9phlrzKZhcXL3XMu2LCUg2fNJpx/47Vc5TZSaO11uO7
-gdwVz3Z7Q2epAgwo59JLffLt5fia8+a/SlPweI/j4+wcIIIiqusnLfpqR8cIAavg
-Z06cLYCDvb9wMlheIvSJY12skc1nnphWS2YJ0Xm6uQIDAQABMA0GCSqGSIb3DQEB
-BQUAA4GBAE9PknG6pv72+5z/gsDGYy8sK5UNkbWSNr4i4e5lxVsF03+/M71H+3AB
-MxVX4+A+Vlk2fmU+BrdHIIUE0r1dDcO3josQ9hc9OJpp5VLSQFP8VeuJCmzYPp9I
-I8WbW93cnXnChTrYQVdgVoFdv7GE9YgU7NYkrGIM0nZl1/f/bHPB
------END CERTIFICATE-----
diff --git a/Lib/test/test_asyncio/sample.key b/Lib/test/test_asyncio/sample.key
deleted file mode 100644
--- a/Lib/test/test_asyncio/sample.key
+++ /dev/null
@@ -1,15 +0,0 @@
------BEGIN RSA PRIVATE KEY-----
-MIICXQIBAAKBgQCnt3s+J7L0xP/YdAQOacpPi9phlrzKZhcXL3XMu2LCUg2fNJpx
-/47Vc5TZSaO11uO7gdwVz3Z7Q2epAgwo59JLffLt5fia8+a/SlPweI/j4+wcIIIi
-qusnLfpqR8cIAavgZ06cLYCDvb9wMlheIvSJY12skc1nnphWS2YJ0Xm6uQIDAQAB
-AoGABfm8k19Yue3W68BecKEGS0VBV57GRTPT+MiBGvVGNIQ15gk6w3sGfMZsdD1y
-bsUkQgcDb2d/4i5poBTpl/+Cd41V+c20IC/sSl5X1IEreHMKSLhy/uyjyiyfXlP1
-iXhToFCgLWwENWc8LzfUV8vuAV5WG6oL9bnudWzZxeqx8V0CQQDR7xwVj6LN70Eb
-DUhSKLkusmFw5Gk9NJ/7wZ4eHg4B8c9KNVvSlLCLhcsVTQXuqYeFpOqytI45SneP
-lr0vrvsDAkEAzITYiXu6ox5huDCG7imX2W9CAYuX638urLxBqBXMS7GqBzojD6RL
-21Q8oPwJWJquERa3HDScq1deiQbM9uKIkwJBAIa1PLslGN216Xv3UPHPScyKD/aF
-ynXIv+OnANPoiyp6RH4ksQ/18zcEGiVH8EeNpvV9tlAHhb+DZibQHgNr74sCQQC0
-zhToplu/bVKSlUQUNO0rqrI9z30FErDewKeCw5KSsIRSU1E/uM3fHr9iyq4wiL6u
-GNjUtKZ0y46lsT9uW6LFAkB5eqeEQnshAdr3X5GykWHJ8DDGBXPPn6Rce1NX4RSq
-V9khG2z1bFyfo+hMqpYnF2k32hVq3E54RS8YYnwBsVof
------END RSA PRIVATE KEY-----
diff --git a/Lib/test/ssl_cert.pem b/Lib/test/test_asyncio/ssl_cert.pem
copy from Lib/test/ssl_cert.pem
copy to Lib/test/test_asyncio/ssl_cert.pem
diff --git a/Lib/test/ssl_key.pem b/Lib/test/test_asyncio/ssl_key.pem
copy from Lib/test/ssl_key.pem
copy to Lib/test/test_asyncio/ssl_key.pem
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -17,7 +17,7 @@
 import errno
 import unittest
 import unittest.mock
-from test.support import find_unused_port, IPV6_ENABLED
+from test import support  # find_unused_port, IPV6_ENABLED, TEST_HOME_DIR
 
 
 from asyncio import futures
@@ -30,10 +30,27 @@
 from asyncio import locks
 
 
+def data_file(filename):
+    if hasattr(support, 'TEST_HOME_DIR'):
+        fullname = os.path.join(support.TEST_HOME_DIR, filename)
+        if os.path.isfile(fullname):
+            return fullname
+    fullname = os.path.join(os.path.dirname(__file__), filename)
+    if os.path.isfile(fullname):
+        return fullname
+    raise FileNotFoundError(filename)
+
+ONLYCERT = data_file('ssl_cert.pem')
+ONLYKEY = data_file('ssl_key.pem')
+SIGNED_CERTFILE = data_file('keycert3.pem')
+SIGNING_CA = data_file('pycacert.pem')
+
+
 class MyProto(protocols.Protocol):
     done = None
 
     def __init__(self, loop=None):
+        self.transport = None
         self.state = 'INITIAL'
         self.nbytes = 0
         if loop is not None:
@@ -523,7 +540,7 @@
 
     def test_create_connection_local_addr(self):
         with test_utils.run_test_server() as httpd:
-            port = find_unused_port()
+            port = support.find_unused_port()
             f = self.loop.create_connection(
                 lambda: MyProto(loop=self.loop),
                 *httpd.address, local_addr=(httpd.address[0], port))
@@ -587,6 +604,20 @@
         # close server
         server.close()
 
+    def _make_ssl_server(self, factory, certfile, keyfile=None):
+        sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        sslcontext.options |= ssl.OP_NO_SSLv2
+        sslcontext.load_cert_chain(certfile, keyfile)
+
+        f = self.loop.create_server(
+            factory, '127.0.0.1', 0, ssl=sslcontext)
+
+        server = self.loop.run_until_complete(f)
+        sock = server.sockets[0]
+        host, port = sock.getsockname()
+        self.assertEqual(host, '127.0.0.1')
+        return server, host, port
+
     @unittest.skipIf(ssl is None, 'No ssl module')
     def test_create_server_ssl(self):
         proto = None
@@ -602,19 +633,7 @@
             proto = MyProto(loop=self.loop)
             return proto
 
-        here = os.path.dirname(__file__)
-        sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
-        sslcontext.load_cert_chain(
-            certfile=os.path.join(here, 'sample.crt'),
-            keyfile=os.path.join(here, 'sample.key'))
-
-        f = self.loop.create_server(
-            factory, '127.0.0.1', 0, ssl=sslcontext)
-
-        server = self.loop.run_until_complete(f)
-        sock = server.sockets[0]
-        host, port = sock.getsockname()
-        self.assertEqual(host, '127.0.0.1')
+        server, host, port = self._make_ssl_server(factory, ONLYCERT, ONLYKEY)
 
         f_c = self.loop.create_connection(ClientMyProto, host, port,
                                           ssl=test_utils.dummy_ssl_context())
@@ -646,6 +665,93 @@
         # stop serving
         server.close()
 
+    @unittest.skipIf(ssl is None, 'No ssl module')
+    def test_create_server_ssl_verify_failed(self):
+        proto = None
+
+        def factory():
+            nonlocal proto
+            proto = MyProto(loop=self.loop)
+            return proto
+
+        server, host, port = self._make_ssl_server(factory, SIGNED_CERTFILE)
+
+        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        sslcontext_client.options |= ssl.OP_NO_SSLv2
+        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
+        if hasattr(sslcontext_client, 'check_hostname'):
+            sslcontext_client.check_hostname = True
+
+        # no CA loaded
+        f_c = self.loop.create_connection(MyProto, host, port,
+                                          ssl=sslcontext_client)
+        with self.assertRaisesRegex(ssl.SSLError,
+                                    'certificate verify failed '):
+            self.loop.run_until_complete(f_c)
+
+        # close connection
+        self.assertIsNone(proto.transport)
+        server.close()
+
+    @unittest.skipIf(ssl is None, 'No ssl module')
+    def test_create_server_ssl_match_failed(self):
+        proto = None
+
+        def factory():
+            nonlocal proto
+            proto = MyProto(loop=self.loop)
+            return proto
+
+        server, host, port = self._make_ssl_server(factory, SIGNED_CERTFILE)
+
+        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        sslcontext_client.options |= ssl.OP_NO_SSLv2
+        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
+        sslcontext_client.load_verify_locations(
+            cafile=SIGNING_CA)
+        if hasattr(sslcontext_client, 'check_hostname'):
+            sslcontext_client.check_hostname = True
+
+        # incorrect server_hostname
+        f_c = self.loop.create_connection(MyProto, host, port,
+                                          ssl=sslcontext_client)
+        with self.assertRaisesRegex(ssl.CertificateError,
+                "hostname '127.0.0.1' doesn't match 'localhost'"):
+            self.loop.run_until_complete(f_c)
+
+        # close connection
+        proto.transport.close()
+        server.close()
+
+    @unittest.skipIf(ssl is None, 'No ssl module')
+    def test_create_server_ssl_verified(self):
+        proto = None
+
+        def factory():
+            nonlocal proto
+            proto = MyProto(loop=self.loop)
+            return proto
+
+        server, host, port = self._make_ssl_server(factory, SIGNED_CERTFILE)
+
+        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        sslcontext_client.options |= ssl.OP_NO_SSLv2
+        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
+        sslcontext_client.load_verify_locations(cafile=SIGNING_CA)
+        if hasattr(sslcontext_client, 'check_hostname'):
+            sslcontext_client.check_hostname = True
+
+        # Connection succeeds with correct CA and server hostname.
+        f_c = self.loop.create_connection(MyProto, host, port,
+                                          ssl=sslcontext_client,
+                                          server_hostname='localhost')
+        client, pr = self.loop.run_until_complete(f_c)
+
+        # close connection
+        proto.transport.close()
+        client.close()
+        server.close()
+
     def test_create_server_sock(self):
         proto = futures.Future(loop=self.loop)
 
@@ -688,7 +794,7 @@
 
         server.close()
 
-    @unittest.skipUnless(IPV6_ENABLED, 'IPv6 not supported or enabled')
+    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
     def test_create_server_dual_stack(self):
         f_proto = futures.Future(loop=self.loop)
 
@@ -700,7 +806,7 @@
         try_count = 0
         while True:
             try:
-                port = find_unused_port()
+                port = support.find_unused_port()
                 f = self.loop.create_server(TestMyProto, host=None, port=port)
                 server = self.loop.run_until_complete(f)
             except OSError as ex:

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list