[Python-checkins] r84650 - python/branches/py3k/Lib/test/test_ssl.py

antoine.pitrou python-checkins at python.org
Thu Sep 9 15:31:47 CEST 2010


Author: antoine.pitrou
Date: Thu Sep  9 15:31:46 2010
New Revision: 84650

Log:
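Use support.transient_internet() where appropriate in test_ssl
(svn.python.org is sometimes unavailable). Also drop the setUp/tearDown
pair in NetworkedTests that set a 30-second default socket timeout.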



Modified:
   python/branches/py3k/Lib/test/test_ssl.py

Modified: python/branches/py3k/Lib/test/test_ssl.py
==============================================================================
--- python/branches/py3k/Lib/test/test_ssl.py	(original)
+++ python/branches/py3k/Lib/test/test_ssl.py	Thu Sep  9 15:31:46 2010
@@ -305,63 +305,59 @@
 
 
 class NetworkedTests(unittest.TestCase):
-    def setUp(self):
-        self.old_timeout = socket.getdefaulttimeout()
-        socket.setdefaulttimeout(30)
-
-    def tearDown(self):
-        socket.setdefaulttimeout(self.old_timeout)
 
     def test_connect(self):
-        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
-                            cert_reqs=ssl.CERT_NONE)
-        try:
-            s.connect(("svn.python.org", 443))
-            self.assertEqual({}, s.getpeercert())
-        finally:
-            s.close()
+        with support.transient_internet("svn.python.org"):
+            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+                                cert_reqs=ssl.CERT_NONE)
+            try:
+                s.connect(("svn.python.org", 443))
+                self.assertEqual({}, s.getpeercert())
+            finally:
+                s.close()
 
-        # this should fail because we have no verification certs
-        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
-                            cert_reqs=ssl.CERT_REQUIRED)
-        self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
-                                s.connect, ("svn.python.org", 443))
-        s.close()
-
-        # this should succeed because we specify the root cert
-        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
-                            cert_reqs=ssl.CERT_REQUIRED,
-                            ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
-        try:
-            s.connect(("svn.python.org", 443))
-            self.assertTrue(s.getpeercert())
-        finally:
+            # this should fail because we have no verification certs
+            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+                                cert_reqs=ssl.CERT_REQUIRED)
+            self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
+                                    s.connect, ("svn.python.org", 443))
             s.close()
 
+            # this should succeed because we specify the root cert
+            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+                                cert_reqs=ssl.CERT_REQUIRED,
+                                ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
+            try:
+                s.connect(("svn.python.org", 443))
+                self.assertTrue(s.getpeercert())
+            finally:
+                s.close()
+
     def test_connect_with_context(self):
-        # Same as test_connect, but with a separately created context
-        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
-        s = ctx.wrap_socket(socket.socket(socket.AF_INET))
-        s.connect(("svn.python.org", 443))
-        try:
-            self.assertEqual({}, s.getpeercert())
-        finally:
-            s.close()
-        # This should fail because we have no verification certs
-        ctx.verify_mode = ssl.CERT_REQUIRED
-        s = ctx.wrap_socket(socket.socket(socket.AF_INET))
-        self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
-                                s.connect, ("svn.python.org", 443))
-        s.close()
-        # This should succeed because we specify the root cert
-        ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
-        s = ctx.wrap_socket(socket.socket(socket.AF_INET))
-        s.connect(("svn.python.org", 443))
-        try:
-            cert = s.getpeercert()
-            self.assertTrue(cert)
-        finally:
+        with support.transient_internet("svn.python.org"):
+            # Same as test_connect, but with a separately created context
+            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
+            s.connect(("svn.python.org", 443))
+            try:
+                self.assertEqual({}, s.getpeercert())
+            finally:
+                s.close()
+            # This should fail because we have no verification certs
+            ctx.verify_mode = ssl.CERT_REQUIRED
+            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
+            self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
+                                    s.connect, ("svn.python.org", 443))
             s.close()
+            # This should succeed because we specify the root cert
+            ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
+            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
+            s.connect(("svn.python.org", 443))
+            try:
+                cert = s.getpeercert()
+                self.assertTrue(cert)
+            finally:
+                s.close()
 
     def test_connect_capath(self):
         # Verify server certificates using the `capath` argument
@@ -369,104 +365,109 @@
         # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
         # contain both versions of each certificate (same content, different
         # filename) for this test to be portable across OpenSSL releases.
-        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
-        ctx.verify_mode = ssl.CERT_REQUIRED
-        ctx.load_verify_locations(capath=CAPATH)
-        s = ctx.wrap_socket(socket.socket(socket.AF_INET))
-        s.connect(("svn.python.org", 443))
-        try:
-            cert = s.getpeercert()
-            self.assertTrue(cert)
-        finally:
-            s.close()
-        # Same with a bytes `capath` argument
-        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
-        ctx.verify_mode = ssl.CERT_REQUIRED
-        ctx.load_verify_locations(capath=BYTES_CAPATH)
-        s = ctx.wrap_socket(socket.socket(socket.AF_INET))
-        s.connect(("svn.python.org", 443))
-        try:
-            cert = s.getpeercert()
-            self.assertTrue(cert)
-        finally:
-            s.close()
+        with support.transient_internet("svn.python.org"):
+            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+            ctx.verify_mode = ssl.CERT_REQUIRED
+            ctx.load_verify_locations(capath=CAPATH)
+            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
+            s.connect(("svn.python.org", 443))
+            try:
+                cert = s.getpeercert()
+                self.assertTrue(cert)
+            finally:
+                s.close()
+            # Same with a bytes `capath` argument
+            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+            ctx.verify_mode = ssl.CERT_REQUIRED
+            ctx.load_verify_locations(capath=BYTES_CAPATH)
+            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
+            s.connect(("svn.python.org", 443))
+            try:
+                cert = s.getpeercert()
+                self.assertTrue(cert)
+            finally:
+                s.close()
 
     @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
     def test_makefile_close(self):
         # Issue #5238: creating a file-like object with makefile() shouldn't
         # delay closing the underlying "real socket" (here tested with its
         # file descriptor, hence skipping the test under Windows).
-        ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
-        ss.connect(("svn.python.org", 443))
-        fd = ss.fileno()
-        f = ss.makefile()
-        f.close()
-        # The fd is still open
-        os.read(fd, 0)
-        # Closing the SSL socket should close the fd too
-        ss.close()
-        gc.collect()
-        with self.assertRaises(OSError) as e:
+        with support.transient_internet("svn.python.org"):
+            ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
+            ss.connect(("svn.python.org", 443))
+            fd = ss.fileno()
+            f = ss.makefile()
+            f.close()
+            # The fd is still open
             os.read(fd, 0)
-        self.assertEqual(e.exception.errno, errno.EBADF)
+            # Closing the SSL socket should close the fd too
+            ss.close()
+            gc.collect()
+            with self.assertRaises(OSError) as e:
+                os.read(fd, 0)
+            self.assertEqual(e.exception.errno, errno.EBADF)
 
     def test_non_blocking_handshake(self):
-        s = socket.socket(socket.AF_INET)
-        s.connect(("svn.python.org", 443))
-        s.setblocking(False)
-        s = ssl.wrap_socket(s,
-                            cert_reqs=ssl.CERT_NONE,
-                            do_handshake_on_connect=False)
-        count = 0
-        while True:
-            try:
-                count += 1
-                s.do_handshake()
-                break
-            except ssl.SSLError as err:
-                if err.args[0] == ssl.SSL_ERROR_WANT_READ:
-                    select.select([s], [], [])
-                elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
-                    select.select([], [s], [])
-                else:
-                    raise
-        s.close()
-        if support.verbose:
-            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
+        with support.transient_internet("svn.python.org"):
+            s = socket.socket(socket.AF_INET)
+            s.connect(("svn.python.org", 443))
+            s.setblocking(False)
+            s = ssl.wrap_socket(s,
+                                cert_reqs=ssl.CERT_NONE,
+                                do_handshake_on_connect=False)
+            count = 0
+            while True:
+                try:
+                    count += 1
+                    s.do_handshake()
+                    break
+                except ssl.SSLError as err:
+                    if err.args[0] == ssl.SSL_ERROR_WANT_READ:
+                        select.select([s], [], [])
+                    elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
+                        select.select([], [s], [])
+                    else:
+                        raise
+            s.close()
+            if support.verbose:
+                sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
 
     def test_get_server_certificate(self):
-        pem = ssl.get_server_certificate(("svn.python.org", 443))
-        if not pem:
-            self.fail("No server certificate on svn.python.org:443!")
+        with support.transient_internet("svn.python.org"):
+            pem = ssl.get_server_certificate(("svn.python.org", 443))
+            if not pem:
+                self.fail("No server certificate on svn.python.org:443!")
 
-        try:
-            pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
-        except ssl.SSLError as x:
-            #should fail
-            if support.verbose:
-                sys.stdout.write("%s\n" % x)
-        else:
-            self.fail("Got server certificate %s for svn.python.org!" % pem)
+            try:
+                pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
+            except ssl.SSLError as x:
+                #should fail
+                if support.verbose:
+                    sys.stdout.write("%s\n" % x)
+            else:
+                self.fail("Got server certificate %s for svn.python.org!" % pem)
 
-        pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
-        if not pem:
-            self.fail("No server certificate on svn.python.org:443!")
-        if support.verbose:
-            sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
+            pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
+            if not pem:
+                self.fail("No server certificate on svn.python.org:443!")
+            if support.verbose:
+                sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
 
     def test_ciphers(self):
         remote = ("svn.python.org", 443)
-        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
-                            cert_reqs=ssl.CERT_NONE, ciphers="ALL")
-        s.connect(remote)
-        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
-                            cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")
-        s.connect(remote)
-        # Error checking can happen at instantiation or when connecting
-        with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
+        with support.transient_internet(remote[0]):
+            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+                                cert_reqs=ssl.CERT_NONE, ciphers="ALL")
+            s.connect(remote)
             s = ssl.wrap_socket(socket.socket(socket.AF_INET),
-                                cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
+                                cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")
             s.connect(remote)
+            # Error checking can happen at instantiation or when connecting
+            with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
+                s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+                                    cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
+                s.connect(remote)
 
     def test_algorithms(self):
         # Issue #8484: all algorithms should be available when verifying a


More information about the Python-checkins mailing list