[Python-checkins] cpython: Issue #19509: Finish implementation of check_hostname
christian.heimes
python-checkins at python.org
Fri Dec 6 00:29:11 CET 2013
http://hg.python.org/cpython/rev/1605eda93392
changeset: 87786:1605eda93392
user: Christian Heimes <christian at cheimes.de>
date: Fri Dec 06 00:23:13 2013 +0100
summary:
Issue #19509: Finish implementation of check_hostname
The new asyncio package now supports the new feature and comes with additional tests for SSL.
files:
Lib/asyncio/selector_events.py | 25 +-
Lib/test/keycert3.pem | 0
Lib/test/pycacert.pem | 0
Lib/test/test_asyncio/sample.crt | 14 -
Lib/test/test_asyncio/sample.key | 15 -
Lib/test/ssl_cert.pem | 0
Lib/test/ssl_key.pem | 0
Lib/test/test_asyncio/test_events.py | 140 +++++++++++++-
8 files changed, 137 insertions(+), 57 deletions(-)
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -583,7 +583,8 @@
# cadefault=True.
if hasattr(ssl, '_create_stdlib_context'):
sslcontext = ssl._create_stdlib_context(
- cert_reqs=ssl.CERT_REQUIRED)
+ cert_reqs=ssl.CERT_REQUIRED,
+ check_hostname=bool(server_hostname))
else:
# Fallback for Python 3.3.
sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
@@ -639,17 +640,19 @@
self._loop.remove_reader(self._sock_fd)
self._loop.remove_writer(self._sock_fd)
- # Verify hostname if requested.
peercert = self._sock.getpeercert()
- if (self._server_hostname and
- self._sslcontext.verify_mode != ssl.CERT_NONE):
- try:
- ssl.match_hostname(peercert, self._server_hostname)
- except Exception as exc:
- self._sock.close()
- if self._waiter is not None:
- self._waiter.set_exception(exc)
- return
+ if not hasattr(self._sslcontext, 'check_hostname'):
+ # Verify hostname if requested, Python 3.4+ uses check_hostname
+ # and checks the hostname in do_handshake()
+ if (self._server_hostname and
+ self._sslcontext.verify_mode != ssl.CERT_NONE):
+ try:
+ ssl.match_hostname(peercert, self._server_hostname)
+ except Exception as exc:
+ self._sock.close()
+ if self._waiter is not None:
+ self._waiter.set_exception(exc)
+ return
# Add extra info that becomes available after handshake.
self._extra.update(peercert=peercert,
diff --git a/Lib/test/keycert3.pem b/Lib/test/test_asyncio/keycert3.pem
copy from Lib/test/keycert3.pem
copy to Lib/test/test_asyncio/keycert3.pem
diff --git a/Lib/test/pycacert.pem b/Lib/test/test_asyncio/pycacert.pem
copy from Lib/test/pycacert.pem
copy to Lib/test/test_asyncio/pycacert.pem
diff --git a/Lib/test/test_asyncio/sample.crt b/Lib/test/test_asyncio/sample.crt
deleted file mode 100644
--- a/Lib/test/test_asyncio/sample.crt
+++ /dev/null
@@ -1,14 +0,0 @@
------BEGIN CERTIFICATE-----
-MIICMzCCAZwCCQDFl4ys0fU7iTANBgkqhkiG9w0BAQUFADBeMQswCQYDVQQGEwJV
-UzETMBEGA1UECAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuLUZyYW5jaXNjbzEi
-MCAGA1UECgwZUHl0aG9uIFNvZnR3YXJlIEZvbmRhdGlvbjAeFw0xMzAzMTgyMDA3
-MjhaFw0yMzAzMTYyMDA3MjhaMF4xCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxp
-Zm9ybmlhMRYwFAYDVQQHDA1TYW4tRnJhbmNpc2NvMSIwIAYDVQQKDBlQeXRob24g
-U29mdHdhcmUgRm9uZGF0aW9uMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCn
-t3s+J7L0xP/YdAQOacpPi9phlrzKZhcXL3XMu2LCUg2fNJpx/47Vc5TZSaO11uO7
-gdwVz3Z7Q2epAgwo59JLffLt5fia8+a/SlPweI/j4+wcIIIiqusnLfpqR8cIAavg
-Z06cLYCDvb9wMlheIvSJY12skc1nnphWS2YJ0Xm6uQIDAQABMA0GCSqGSIb3DQEB
-BQUAA4GBAE9PknG6pv72+5z/gsDGYy8sK5UNkbWSNr4i4e5lxVsF03+/M71H+3AB
-MxVX4+A+Vlk2fmU+BrdHIIUE0r1dDcO3josQ9hc9OJpp5VLSQFP8VeuJCmzYPp9I
-I8WbW93cnXnChTrYQVdgVoFdv7GE9YgU7NYkrGIM0nZl1/f/bHPB
------END CERTIFICATE-----
diff --git a/Lib/test/test_asyncio/sample.key b/Lib/test/test_asyncio/sample.key
deleted file mode 100644
--- a/Lib/test/test_asyncio/sample.key
+++ /dev/null
@@ -1,15 +0,0 @@
------BEGIN RSA PRIVATE KEY-----
-MIICXQIBAAKBgQCnt3s+J7L0xP/YdAQOacpPi9phlrzKZhcXL3XMu2LCUg2fNJpx
-/47Vc5TZSaO11uO7gdwVz3Z7Q2epAgwo59JLffLt5fia8+a/SlPweI/j4+wcIIIi
-qusnLfpqR8cIAavgZ06cLYCDvb9wMlheIvSJY12skc1nnphWS2YJ0Xm6uQIDAQAB
-AoGABfm8k19Yue3W68BecKEGS0VBV57GRTPT+MiBGvVGNIQ15gk6w3sGfMZsdD1y
-bsUkQgcDb2d/4i5poBTpl/+Cd41V+c20IC/sSl5X1IEreHMKSLhy/uyjyiyfXlP1
-iXhToFCgLWwENWc8LzfUV8vuAV5WG6oL9bnudWzZxeqx8V0CQQDR7xwVj6LN70Eb
-DUhSKLkusmFw5Gk9NJ/7wZ4eHg4B8c9KNVvSlLCLhcsVTQXuqYeFpOqytI45SneP
-lr0vrvsDAkEAzITYiXu6ox5huDCG7imX2W9CAYuX638urLxBqBXMS7GqBzojD6RL
-21Q8oPwJWJquERa3HDScq1deiQbM9uKIkwJBAIa1PLslGN216Xv3UPHPScyKD/aF
-ynXIv+OnANPoiyp6RH4ksQ/18zcEGiVH8EeNpvV9tlAHhb+DZibQHgNr74sCQQC0
-zhToplu/bVKSlUQUNO0rqrI9z30FErDewKeCw5KSsIRSU1E/uM3fHr9iyq4wiL6u
-GNjUtKZ0y46lsT9uW6LFAkB5eqeEQnshAdr3X5GykWHJ8DDGBXPPn6Rce1NX4RSq
-V9khG2z1bFyfo+hMqpYnF2k32hVq3E54RS8YYnwBsVof
------END RSA PRIVATE KEY-----
diff --git a/Lib/test/ssl_cert.pem b/Lib/test/test_asyncio/ssl_cert.pem
copy from Lib/test/ssl_cert.pem
copy to Lib/test/test_asyncio/ssl_cert.pem
diff --git a/Lib/test/ssl_key.pem b/Lib/test/test_asyncio/ssl_key.pem
copy from Lib/test/ssl_key.pem
copy to Lib/test/test_asyncio/ssl_key.pem
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -17,7 +17,7 @@
import errno
import unittest
import unittest.mock
-from test.support import find_unused_port, IPV6_ENABLED
+from test import support # find_unused_port, IPV6_ENABLED, TEST_HOME_DIR
from asyncio import futures
@@ -30,10 +30,27 @@
from asyncio import locks
+def data_file(filename):
+ if hasattr(support, 'TEST_HOME_DIR'):
+ fullname = os.path.join(support.TEST_HOME_DIR, filename)
+ if os.path.isfile(fullname):
+ return fullname
+ fullname = os.path.join(os.path.dirname(__file__), filename)
+ if os.path.isfile(fullname):
+ return fullname
+ raise FileNotFoundError(filename)
+
+ONLYCERT = data_file('ssl_cert.pem')
+ONLYKEY = data_file('ssl_key.pem')
+SIGNED_CERTFILE = data_file('keycert3.pem')
+SIGNING_CA = data_file('pycacert.pem')
+
+
class MyProto(protocols.Protocol):
done = None
def __init__(self, loop=None):
+ self.transport = None
self.state = 'INITIAL'
self.nbytes = 0
if loop is not None:
@@ -523,7 +540,7 @@
def test_create_connection_local_addr(self):
with test_utils.run_test_server() as httpd:
- port = find_unused_port()
+ port = support.find_unused_port()
f = self.loop.create_connection(
lambda: MyProto(loop=self.loop),
*httpd.address, local_addr=(httpd.address[0], port))
@@ -587,6 +604,20 @@
# close server
server.close()
+ def _make_ssl_server(self, factory, certfile, keyfile=None):
+ sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslcontext.options |= ssl.OP_NO_SSLv2
+ sslcontext.load_cert_chain(certfile, keyfile)
+
+ f = self.loop.create_server(
+ factory, '127.0.0.1', 0, ssl=sslcontext)
+
+ server = self.loop.run_until_complete(f)
+ sock = server.sockets[0]
+ host, port = sock.getsockname()
+ self.assertEqual(host, '127.0.0.1')
+ return server, host, port
+
@unittest.skipIf(ssl is None, 'No ssl module')
def test_create_server_ssl(self):
proto = None
@@ -602,19 +633,7 @@
proto = MyProto(loop=self.loop)
return proto
- here = os.path.dirname(__file__)
- sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- sslcontext.load_cert_chain(
- certfile=os.path.join(here, 'sample.crt'),
- keyfile=os.path.join(here, 'sample.key'))
-
- f = self.loop.create_server(
- factory, '127.0.0.1', 0, ssl=sslcontext)
-
- server = self.loop.run_until_complete(f)
- sock = server.sockets[0]
- host, port = sock.getsockname()
- self.assertEqual(host, '127.0.0.1')
+ server, host, port = self._make_ssl_server(factory, ONLYCERT, ONLYKEY)
f_c = self.loop.create_connection(ClientMyProto, host, port,
ssl=test_utils.dummy_ssl_context())
@@ -646,6 +665,93 @@
# stop serving
server.close()
+ @unittest.skipIf(ssl is None, 'No ssl module')
+ def test_create_server_ssl_verify_failed(self):
+ proto = None
+
+ def factory():
+ nonlocal proto
+ proto = MyProto(loop=self.loop)
+ return proto
+
+ server, host, port = self._make_ssl_server(factory, SIGNED_CERTFILE)
+
+ sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslcontext_client.options |= ssl.OP_NO_SSLv2
+ sslcontext_client.verify_mode = ssl.CERT_REQUIRED
+ if hasattr(sslcontext_client, 'check_hostname'):
+ sslcontext_client.check_hostname = True
+
+ # no CA loaded
+ f_c = self.loop.create_connection(MyProto, host, port,
+ ssl=sslcontext_client)
+ with self.assertRaisesRegex(ssl.SSLError,
+ 'certificate verify failed '):
+ self.loop.run_until_complete(f_c)
+
+ # close connection
+ self.assertIsNone(proto.transport)
+ server.close()
+
+ @unittest.skipIf(ssl is None, 'No ssl module')
+ def test_create_server_ssl_match_failed(self):
+ proto = None
+
+ def factory():
+ nonlocal proto
+ proto = MyProto(loop=self.loop)
+ return proto
+
+ server, host, port = self._make_ssl_server(factory, SIGNED_CERTFILE)
+
+ sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslcontext_client.options |= ssl.OP_NO_SSLv2
+ sslcontext_client.verify_mode = ssl.CERT_REQUIRED
+ sslcontext_client.load_verify_locations(
+ cafile=SIGNING_CA)
+ if hasattr(sslcontext_client, 'check_hostname'):
+ sslcontext_client.check_hostname = True
+
+ # incorrect server_hostname
+ f_c = self.loop.create_connection(MyProto, host, port,
+ ssl=sslcontext_client)
+ with self.assertRaisesRegex(ssl.CertificateError,
+ "hostname '127.0.0.1' doesn't match 'localhost'"):
+ self.loop.run_until_complete(f_c)
+
+ # close connection
+ proto.transport.close()
+ server.close()
+
+ @unittest.skipIf(ssl is None, 'No ssl module')
+ def test_create_server_ssl_verified(self):
+ proto = None
+
+ def factory():
+ nonlocal proto
+ proto = MyProto(loop=self.loop)
+ return proto
+
+ server, host, port = self._make_ssl_server(factory, SIGNED_CERTFILE)
+
+ sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslcontext_client.options |= ssl.OP_NO_SSLv2
+ sslcontext_client.verify_mode = ssl.CERT_REQUIRED
+ sslcontext_client.load_verify_locations(cafile=SIGNING_CA)
+ if hasattr(sslcontext_client, 'check_hostname'):
+ sslcontext_client.check_hostname = True
+
+ # Connection succeeds with correct CA and server hostname.
+ f_c = self.loop.create_connection(MyProto, host, port,
+ ssl=sslcontext_client,
+ server_hostname='localhost')
+ client, pr = self.loop.run_until_complete(f_c)
+
+ # close connection
+ proto.transport.close()
+ client.close()
+ server.close()
+
def test_create_server_sock(self):
proto = futures.Future(loop=self.loop)
@@ -688,7 +794,7 @@
server.close()
- @unittest.skipUnless(IPV6_ENABLED, 'IPv6 not supported or enabled')
+ @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
def test_create_server_dual_stack(self):
f_proto = futures.Future(loop=self.loop)
@@ -700,7 +806,7 @@
try_count = 0
while True:
try:
- port = find_unused_port()
+ port = support.find_unused_port()
f = self.loop.create_server(TestMyProto, host=None, port=port)
server = self.loop.run_until_complete(f)
except OSError as ex:
--
Repository URL: http://hg.python.org/cpython
More information about the Python-checkins
mailing list