[Python-checkins] cpython: Issue #4473: Add a POP3.stls() to switch a clear-text POP3 session into an

antoine.pitrou python-checkins at python.org
Fri Nov 23 20:15:10 CET 2012


http://hg.python.org/cpython/rev/2329f9198d7f
changeset:   80575:2329f9198d7f
user:        Antoine Pitrou <solipsis at pitrou.net>
date:        Fri Nov 23 20:13:48 2012 +0100
summary:
  Issue #4473: Add a POP3.stls() to switch a clear-text POP3 session into an encrypted POP3 session, on supported servers.
Patch by Lorenzo Catucci.

files:
  Doc/library/poplib.rst  |   19 +++-
  Lib/poplib.py           |   48 ++++++++-
  Lib/test/test_poplib.py |  145 ++++++++++++++++++++++-----
  Misc/NEWS               |    3 +
  4 files changed, 176 insertions(+), 39 deletions(-)


diff --git a/Doc/library/poplib.rst b/Doc/library/poplib.rst
--- a/Doc/library/poplib.rst
+++ b/Doc/library/poplib.rst
@@ -13,8 +13,11 @@
 --------------
 
 This module defines a class, :class:`POP3`, which encapsulates a connection to a
-POP3 server and implements the protocol as defined in :rfc:`1725`.  The
-:class:`POP3` class supports both the minimal and optional command sets.
+POP3 server and implements the protocol as defined in :rfc:`1939`. The
+:class:`POP3` class supports both the minimal and optional command sets from
+:rfc:`1939`. The :class:`POP3` class also supports the `STLS` command introduced
+in :rfc:`2595` to enable encrypted communication on an already established connection.
+
 Additionally, this module provides a class :class:`POP3_SSL`, which provides
 support for connecting to POP3 servers that use SSL as an underlying protocol
 layer.
@@ -184,6 +187,18 @@
    the unique id for that message in the form ``'response mesgnum uid``, otherwise
    result is list ``(response, ['mesgnum uid', ...], octets)``.
 
+.. method:: POP3.stls(context=None)
+
+   Start a TLS session on the active connection as specified in :rfc:`2595`.
+   This is only allowed before user authentication
+
+   *context* parameter is a :class:`ssl.SSLContext` object which allows
+   bundling SSL configuration options, certificates and private keys into
+   a single (potentially long-lived) structure.
+
+   .. versionadded:: 3.4
+
+
 Instances of :class:`POP3_SSL` have no additional methods. The interface of this
 subclass is identical to its parent.
 
diff --git a/Lib/poplib.py b/Lib/poplib.py
--- a/Lib/poplib.py
+++ b/Lib/poplib.py
@@ -15,6 +15,12 @@
 
 import re, socket
 
+try:
+    import ssl
+    HAVE_SSL = True
+except ImportError:
+    HAVE_SSL = False
+
 __all__ = ["POP3","error_proto"]
 
 # Exception raised when an error or invalid response is received:
@@ -56,6 +62,7 @@
             TOP msg n               top(msg, n)
             UIDL [msg]              uidl(msg = None)
             CAPA                    capa()
+            STLS                    stls()
 
     Raises one exception: 'error_proto'.
 
@@ -82,6 +89,7 @@
                  timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
         self.host = host
         self.port = port
+        self._tls_established = False
         self.sock = self._create_socket(timeout)
         self.file = self.sock.makefile('rb')
         self._debugging = 0
@@ -352,21 +360,42 @@
             raise error_proto('-ERR CAPA not supported by server')
         return caps
 
-try:
-    import ssl
-except ImportError:
-    pass
-else:
+
+    def stls(self, context=None):
+        """Start a TLS session on the active connection as specified in RFC 2595.
+
+                context - a ssl.SSLContext
+        """
+        if not HAVE_SSL:
+            raise error_proto('-ERR TLS support missing')
+        if self._tls_established:
+            raise error_proto('-ERR TLS session already established')
+        caps = self.capa()
+        if not 'STLS' in caps:
+            raise error_proto('-ERR STLS not supported by server')
+        if context is None:
+            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+            context.options |= ssl.OP_NO_SSLv2
+        resp = self._shortcmd('STLS')
+        self.sock = context.wrap_socket(self.sock)
+        self.file = self.sock.makefile('rb')
+        self._tls_established = True
+        return resp
+
+
+if HAVE_SSL:
 
     class POP3_SSL(POP3):
         """POP3 client class over SSL connection
 
-        Instantiate with: POP3_SSL(hostname, port=995, keyfile=None, certfile=None)
+        Instantiate with: POP3_SSL(hostname, port=995, keyfile=None, certfile=None,
+                                   context=None)
 
                hostname - the hostname of the pop3 over ssl server
                port - port number
                keyfile - PEM formatted file that countains your private key
                certfile - PEM formatted certificate chain file
+               context - a ssl.SSLContext
 
         See the methods of the parent class POP3 for more documentation.
         """
@@ -392,6 +421,13 @@
                 sock = ssl.wrap_socket(sock, self.keyfile, self.certfile)
             return sock
 
+        def stls(self, keyfile=None, certfile=None, context=None):
+            """The method unconditionally raises an exception since the
+            STLS command doesn't make any sense on an already established
+            SSL/TLS session.
+            """
+            raise error_proto('-ERR TLS session already established')
+
     __all__.append("POP3_SSL")
 
 if __name__ == "__main__":
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py
--- a/Lib/test/test_poplib.py
+++ b/Lib/test/test_poplib.py
@@ -18,6 +18,13 @@
 HOST = test_support.HOST
 PORT = 0
 
+SUPPORTS_SSL = False
+if hasattr(poplib, 'POP3_SSL'):
+    import ssl
+
+    SUPPORTS_SSL = True
+    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem")
+
 # the dummy data returned by server when LIST and RETR commands are issued
 LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
 RETR_RESP = b"""From: postmaster at python.org\
@@ -40,6 +47,8 @@
         self.set_terminator(b"\r\n")
         self.in_buffer = []
         self.push('+OK dummy pop3 server ready. <timestamp>')
+        self.tls_active = False
+        self.tls_starting = False
 
     def collect_incoming_data(self, data):
         self.in_buffer.append(data)
@@ -114,16 +123,65 @@
         self.push('+OK closing.')
         self.close_when_done()
 
+    def _get_capas(self):
+        _capas = dict(self.CAPAS)
+        if not self.tls_active and SUPPORTS_SSL:
+            _capas['STLS'] = []
+        return _capas
+
     def cmd_capa(self, arg):
         self.push('+OK Capability list follows')
-        if self.CAPAS:
-            for cap, params in self.CAPAS.items():
+        if self._get_capas():
+            for cap, params in self._get_capas().items():
                 _ln = [cap]
                 if params:
                     _ln.extend(params)
                 self.push(' '.join(_ln))
         self.push('.')
 
+    if SUPPORTS_SSL:
+
+        def cmd_stls(self, arg):
+            if self.tls_active is False:
+                self.push('+OK Begin TLS negotiation')
+                tls_sock = ssl.wrap_socket(self.socket, certfile=CERTFILE,
+                                           server_side=True,
+                                           do_handshake_on_connect=False,
+                                           suppress_ragged_eofs=False)
+                self.del_channel()
+                self.set_socket(tls_sock)
+                self.tls_active = True
+                self.tls_starting = True
+                self.in_buffer = []
+                self._do_tls_handshake()
+            else:
+                self.push('-ERR Command not permitted when TLS active')
+
+        def _do_tls_handshake(self):
+            try:
+                self.socket.do_handshake()
+            except ssl.SSLError as err:
+                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
+                                   ssl.SSL_ERROR_WANT_WRITE):
+                    return
+                elif err.args[0] == ssl.SSL_ERROR_EOF:
+                    return self.handle_close()
+                raise
+            except socket.error as err:
+                if err.args[0] == errno.ECONNABORTED:
+                    return self.handle_close()
+            else:
+                self.tls_active = True
+                self.tls_starting = False
+
+        def handle_read(self):
+            if self.tls_starting:
+                self._do_tls_handshake()
+            else:
+                try:
+                    asynchat.async_chat.handle_read(self)
+                except ssl.SSLEOFError:
+                    self.handle_close()
 
 class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
 
@@ -254,13 +312,25 @@
         self.assertIsNone(self.client.sock)
         self.assertIsNone(self.client.file)
 
+    if SUPPORTS_SSL:
 
-SUPPORTS_SSL = False
-if hasattr(poplib, 'POP3_SSL'):
-    import ssl
+        def test_stls_capa(self):
+            capa = self.client.capa()
+            self.assertTrue('STLS' in capa.keys())
 
-    SUPPORTS_SSL = True
-    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem")
+        def test_stls(self):
+            expected = b'+OK Begin TLS negotiation'
+            resp = self.client.stls()
+            self.assertEqual(resp, expected)
+
+        def test_stls_context(self):
+            expected = b'+OK Begin TLS negotiation'
+            ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+            resp = self.client.stls(context=ctx)
+            self.assertEqual(resp, expected)
+
+
+if SUPPORTS_SSL:
 
     class DummyPOP3_SSLHandler(DummyPOP3Handler):
 
@@ -272,34 +342,13 @@
             self.del_channel()
             self.set_socket(ssl_socket)
             # Must try handshake before calling push()
-            self._ssl_accepting = True
-            self._do_ssl_handshake()
+            self.tls_active = True
+            self.tls_starting = True
+            self._do_tls_handshake()
             self.set_terminator(b"\r\n")
             self.in_buffer = []
             self.push('+OK dummy pop3 server ready. <timestamp>')
 
-        def _do_ssl_handshake(self):
-            try:
-                self.socket.do_handshake()
-            except ssl.SSLError as err:
-                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
-                                   ssl.SSL_ERROR_WANT_WRITE):
-                    return
-                elif err.args[0] == ssl.SSL_ERROR_EOF:
-                    return self.handle_close()
-                raise
-            except socket.error as err:
-                if err.args[0] == errno.ECONNABORTED:
-                    return self.handle_close()
-            else:
-                self._ssl_accepting = False
-
-        def handle_read(self):
-            if self._ssl_accepting:
-                self._do_ssl_handshake()
-            else:
-                DummyPOP3Handler.handle_read(self)
-
 
     class TestPOP3_SSLClass(TestPOP3Class):
         # repeat previous tests by using poplib.POP3_SSL
@@ -330,6 +379,39 @@
             self.assertIs(self.client.sock.context, ctx)
             self.assertTrue(self.client.noop().startswith(b'+OK'))
 
+        def test_stls(self):
+            self.assertRaises(poplib.error_proto, self.client.stls)
+
+        test_stls_context = test_stls
+
+        def test_stls_capa(self):
+            capa = self.client.capa()
+            self.assertFalse('STLS' in capa.keys())
+
+
+    class TestPOP3_TLSClass(TestPOP3Class):
+        # repeat previous tests by using poplib.POP3.stls()
+
+        def setUp(self):
+            self.server = DummyPOP3Server((HOST, PORT))
+            self.server.start()
+            self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)
+            self.client.stls()
+
+        def tearDown(self):
+            if self.client.file is not None and self.client.sock is not None:
+                self.client.quit()
+            self.server.stop()
+
+        def test_stls(self):
+            self.assertRaises(poplib.error_proto, self.client.stls)
+
+        test_stls_context = test_stls
+
+        def test_stls_capa(self):
+            capa = self.client.capa()
+            self.assertFalse(b'STLS' in capa.keys())
+
 
 class TestTimeouts(TestCase):
 
@@ -389,6 +471,7 @@
     tests = [TestPOP3Class, TestTimeouts]
     if SUPPORTS_SSL:
         tests.append(TestPOP3_SSLClass)
+        tests.append(TestPOP3_TLSClass)
     thread_info = test_support.threading_setup()
     try:
         test_support.run_unittest(*tests)
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -138,6 +138,9 @@
 Library
 -------
 
+- Issue #4473: Add a POP3.stls() to switch a clear-text POP3 session into
+  an encrypted POP3 session, on supported servers.  Patch by Lorenzo Catucci.
+
 - Issue #4473: Add a POP3.capa() method to query the capabilities advertised
   by the POP3 server.  Patch by Lorenzo Catucci.
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list