[Python-checkins] r57513 - in python/trunk/Lib: ssl.py test/test_ssl.py

guido.van.rossum python-checkins at python.org
Sun Aug 26 21:35:10 CEST 2007


Author: guido.van.rossum
Date: Sun Aug 26 21:35:09 2007
New Revision: 57513

Modified:
   python/trunk/Lib/ssl.py
   python/trunk/Lib/test/test_ssl.py
Log:
Bill Janssen wrote:
Here's a patch which makes test_ssl a better player in the buildbots
environment.  I deep-ended on "try-except-else" clauses.


Modified: python/trunk/Lib/ssl.py
==============================================================================
--- python/trunk/Lib/ssl.py	(original)
+++ python/trunk/Lib/ssl.py	Sun Aug 26 21:35:09 2007
@@ -100,12 +100,13 @@
             # see if it's connected
             try:
                 socket.getpeername(self)
-                # yes
-                self._sslobj = _ssl.sslwrap(self._sock, 0, keyfile, certfile,
-                                            cert_reqs, ssl_version, ca_certs)
             except:
-                # no
+                # no, no connection yet
                 self._sslobj = None
+            else:
+                # yes, create the SSL object
+                self._sslobj = _ssl.sslwrap(self._sock, 0, keyfile, certfile,
+                                            cert_reqs, ssl_version, ca_certs)
         self.keyfile = keyfile
         self.certfile = certfile
         self.cert_reqs = cert_reqs

Modified: python/trunk/Lib/test/test_ssl.py
==============================================================================
--- python/trunk/Lib/test/test_ssl.py	(original)
+++ python/trunk/Lib/test/test_ssl.py	Sun Aug 26 21:35:09 2007
@@ -91,38 +91,66 @@
     def testTLSecho (self):
 
         s1 = socket.socket()
-        s1.connect(('127.0.0.1', 10024))
-        c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
-        indata = "FOO\n"
-        c1.write(indata)
-        outdata = c1.read()
-        if outdata != indata.lower():
-            sys.stderr.write("bad data <<%s>> received\n" % data)
-        c1.close()
+        try:
+            s1.connect(('127.0.0.1', 10024))
+        except:
+            sys.stdout.write("connection failure:\n" + string.join(
+                traceback.format_exception(*sys.exc_info())))
+            raise test_support.TestFailed("Can't connect to test server")
+        else:
+            try:
+                c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
+            except:
+                sys.stdout.write("SSL handshake failure:\n" + string.join(
+                    traceback.format_exception(*sys.exc_info())))
+                raise test_support.TestFailed("Can't SSL-handshake with test server")
+            else:
+                if not c1:
+                    raise test_support.TestFailed("Can't SSL-handshake with test server")
+                indata = "FOO\n"
+                c1.write(indata)
+                outdata = c1.read()
+                if outdata != indata.lower():
+                    raise test_support.TestFailed("bad data <<%s>> received; expected <<%s>>\n" % (data, indata.lower()))
+                c1.close()
 
     def testReadCert(self):
 
         s2 = socket.socket()
-        s2.connect(('127.0.0.1', 10024))
-        c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
-                           cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
-        cert = c2.getpeercert()
-        if not cert:
-            raise test_support.TestFailed("Can't get peer certificate.")
-        if not cert.has_key('subject'):
-            raise test_support.TestFailed(
-                "No subject field in certificate: %s." %
-                pprint.pformat(cert))
-        if not (cert['subject'].has_key('organizationName')):
-            raise test_support.TestFailed(
-                "No 'organizationName' field in certificate subject: %s." %
-                pprint.pformat(cert))
-        if (cert['subject']['organizationName'] !=
-              "Python Software Foundation"):
-            raise test_support.TestFailed(
-                "Invalid 'organizationName' field in certificate subject; "
-                "should be 'Python Software Foundation'.");
-        c2.close()
+        try:
+            s2.connect(('127.0.0.1', 10024))
+        except:
+            sys.stdout.write("connection failure:\n" + string.join(
+                traceback.format_exception(*sys.exc_info())))
+            raise test_support.TestFailed("Can't connect to test server")
+        else:
+            try:
+                c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
+                                   cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
+            except:
+                sys.stdout.write("SSL handshake failure:\n" + string.join(
+                    traceback.format_exception(*sys.exc_info())))
+                raise test_support.TestFailed("Can't SSL-handshake with test server")
+            else:
+                if not c2:
+                    raise test_support.TestFailed("Can't SSL-handshake with test server")
+                cert = c2.getpeercert()
+                if not cert:
+                    raise test_support.TestFailed("Can't get peer certificate.")
+                if not cert.has_key('subject'):
+                    raise test_support.TestFailed(
+                        "No subject field in certificate: %s." %
+                        pprint.pformat(cert))
+                if not (cert['subject'].has_key('organizationName')):
+                    raise test_support.TestFailed(
+                        "No 'organizationName' field in certificate subject: %s." %
+                        pprint.pformat(cert))
+                if (cert['subject']['organizationName'] !=
+                      "Python Software Foundation"):
+                    raise test_support.TestFailed(
+                        "Invalid 'organizationName' field in certificate subject; "
+                        "should be 'Python Software Foundation'.");
+                c2.close()
 
 
 class threadedEchoServer(threading.Thread):
@@ -138,10 +166,22 @@
 
         def run (self):
             self.running = True
-            sslconn = ssl.sslsocket(self.sock, server_side=True,
-                                    certfile=self.server.certificate,
-                                    ssl_version=self.server.protocol,
-                                    cert_reqs=self.server.certreqs)
+            try:
+                sslconn = ssl.sslsocket(self.sock, server_side=True,
+                                        certfile=self.server.certificate,
+                                        ssl_version=self.server.protocol,
+                                        cert_reqs=self.server.certreqs)
+            except:
+                # here, we want to stop the server, because this shouldn't
+                # happen in the context of our test case
+                sys.stdout.write("Test server failure:\n" + string.join(
+                    traceback.format_exception(*sys.exc_info())))
+                self.running = False
+                # normally, we'd just stop here, but for the test
+                # harness, we want to stop the server
+                self.server.stop()
+                return
+
             while self.running:
                 try:
                     msg = sslconn.read()
@@ -154,15 +194,18 @@
                         self.server.stop()
                         self.running = False
                     else:
-                        # print "server:", msg.strip().lower()
+                        sys.stdout.write("\nserver: %s\n" % msg.strip().lower())
                         sslconn.write(msg.lower())
                 except ssl.sslerror:
-                    sys.stderr.write(string.join(
+                    sys.stdout.write("Test server failure:\n" + string.join(
                         traceback.format_exception(*sys.exc_info())))
                     sslconn.close()
                     self.running = False
+                    # normally, we'd just stop here, but for the test
+                    # harness, we want to stop the server
+                    self.server.stop()
                 except:
-                    sys.stderr.write(string.join(
+                    sys.stdout.write(string.join(
                         traceback.format_exception(*sys.exc_info())))
 
     def __init__(self, port, certificate, ssl_version=None,
@@ -192,20 +235,20 @@
         while self.active:
             try:
                 newconn, connaddr = self.sock.accept()
-                # sys.stderr.write('new connection from ' + str(connaddr))
+                sys.stdout.write('\nserver:  new connection from ' + str(connaddr) + '\n')
                 handler = self.connectionHandler(self, newconn)
                 handler.start()
             except socket.timeout:
                 pass
             except KeyboardInterrupt:
-                self.active = False
+                self.stop()
             except:
-                sys.stderr.write(string.join(
+                sys.stdout.write("Test server failure:\n" + string.join(
                     traceback.format_exception(*sys.exc_info())))
 
     def stop (self):
         self.active = False
-
+        self.sock.close()
 
 CERTFILE_CONFIG_TEMPLATE = """
 # create RSA certs - Server


More information about the Python-checkins mailing list