[Python-checkins] r56533 - python/trunk/Lib/test/test_smtplib.py

facundo.batista python-checkins at python.org
Tue Jul 24 23:20:42 CEST 2007


Author: facundo.batista
Date: Tue Jul 24 23:20:42 2007
New Revision: 56533

Modified:
   python/trunk/Lib/test/test_smtplib.py
Log:

New tests for basic behavior of smtplib.SMTP and
smtpd.DebuggingServer. Change to use global host & port number
variables. Modified the 'server' to take a string to send back in
order to vary test server responses. Added a test for the reaction of
smtplib.SMTP to a non-220 connection greeting. [GSoC - Alan McIntyre]


Modified: python/trunk/Lib/test/test_smtplib.py
==============================================================================
--- python/trunk/Lib/test/test_smtplib.py	(original)
+++ python/trunk/Lib/test/test_smtplib.py	Tue Jul 24 23:20:42 2007
@@ -1,24 +1,40 @@
+import asyncore
 import socket
 import threading
+import smtpd
 import smtplib
+import StringIO
+import sys
 import time
+import select
 
 from unittest import TestCase
 from test import test_support
 
+HOST = "localhost"
+PORT = 54328
 
-def server(evt):
+def server(evt, buf):
     serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     serv.settimeout(3)
     serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-    serv.bind(("", 9091))
+    serv.bind(("", PORT))
     serv.listen(5)
     try:
         conn, addr = serv.accept()
     except socket.timeout:
         pass
     else:
-        conn.send("220 Hola mundo\n")
+        n = 200
+        while buf and n > 0:
+            r, w, e = select.select([], [conn], [])
+            if w:
+                sent = conn.send(buf)
+                buf = buf[sent:]
+
+            n -= 1
+            time.sleep(0.01)
+
         conn.close()
     finally:
         serv.close()
@@ -28,26 +44,42 @@
 
     def setUp(self):
         self.evt = threading.Event()
-        threading.Thread(target=server, args=(self.evt,)).start()
+        servargs = (self.evt, "220 Hola mundo\n")
+        threading.Thread(target=server, args=servargs).start()
         time.sleep(.1)
 
     def tearDown(self):
         self.evt.wait()
 
-    def testBasic(self):
+    def testBasic1(self):
         # connects
-        smtp = smtplib.SMTP("localhost", 9091)
+        smtp = smtplib.SMTP(HOST, PORT)
         smtp.sock.close()
 
+    def testBasic2(self):
+        # connects, include port in host name
+        smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
+        smtp.sock.close()
+
+    def testLocalHostName(self):
+        # check that supplied local_hostname is used
+        smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
+        self.assertEqual(smtp.local_hostname, "testhost")
+        smtp.sock.close()
+
+    def testNonnumericPort(self):
+        # check that non-numeric port raises socket.error
+        self.assertRaises(socket.error, smtplib.SMTP, "localhost", "bogus")
+
     def testTimeoutDefault(self):
         # default
-        smtp = smtplib.SMTP("localhost", 9091)
+        smtp = smtplib.SMTP(HOST, PORT)
         self.assertTrue(smtp.sock.gettimeout() is None)
         smtp.sock.close()
 
     def testTimeoutValue(self):
         # a value
-        smtp = smtplib.SMTP("localhost", 9091, timeout=30)
+        smtp = smtplib.SMTP(HOST, PORT, timeout=30)
         self.assertEqual(smtp.sock.gettimeout(), 30)
         smtp.sock.close()
 
@@ -56,16 +88,95 @@
         previous = socket.getdefaulttimeout()
         socket.setdefaulttimeout(30)
         try:
-            smtp = smtplib.SMTP("localhost", 9091, timeout=None)
+            smtp = smtplib.SMTP(HOST, PORT, timeout=None)
         finally:
             socket.setdefaulttimeout(previous)
         self.assertEqual(smtp.sock.gettimeout(), 30)
         smtp.sock.close()
 
 
+# Test server using smtpd.DebuggingServer
+def debugging_server(evt):
+    serv = smtpd.DebuggingServer(("", PORT), ('nowhere', -1))
+
+    try:
+        asyncore.loop(timeout=.01, count=300)
+    except socket.timeout:
+        pass
+    finally:
+        # allow some time for the client to read the result
+        time.sleep(0.5)
+        asyncore.close_all()
+        evt.set()
+
+MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
+MSG_END = '------------ END MESSAGE ------------\n'
+
+# Test behavior of smtpd.DebuggingServer
+class DebuggingServerTests(TestCase):
+
+    def setUp(self):
+        self.old_stdout = sys.stdout
+        self.output = StringIO.StringIO()
+        sys.stdout = self.output
+
+        self.evt = threading.Event()
+        threading.Thread(target=debugging_server, args=(self.evt,)).start()
+        time.sleep(.5)
+
+    def tearDown(self):
+        self.evt.wait()
+        sys.stdout = self.old_stdout
+
+    def testBasic(self):
+        # connect
+        smtp = smtplib.SMTP(HOST, PORT)
+        smtp.sock.close()
+
+    def testEHLO(self):
+        smtp = smtplib.SMTP(HOST, PORT)
+        self.assertEqual(smtp.ehlo(), (502, 'Error: command "EHLO" not implemented'))
+        smtp.sock.close()
+
+    def testHELP(self):
+        smtp = smtplib.SMTP(HOST, PORT)
+        self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
+        smtp.sock.close()
+
+    def testSend(self):
+        # connect and send mail
+        m = 'A test message'
+        smtp = smtplib.SMTP(HOST, PORT)
+        smtp.sendmail('John', 'Sally', m)
+        smtp.sock.close()
+
+        self.evt.wait()
+        self.output.flush()
+        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
+        self.assertEqual(self.output.getvalue(), mexpect)
+
+
+class BadHELOServerTests(TestCase):
+
+    def setUp(self):
+        self.old_stdout = sys.stdout
+        self.output = StringIO.StringIO()
+        sys.stdout = self.output
+
+        self.evt = threading.Event()
+        servargs = (self.evt, "199 no hello for you!\n")
+        threading.Thread(target=server, args=servargs).start()
+        time.sleep(.5)
+
+    def tearDown(self):
+        self.evt.wait()
+        sys.stdout = self.old_stdout
+
+    def testFailingHELO(self):
+        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, HOST, PORT)
 
 def test_main(verbose=None):
-    test_support.run_unittest(GeneralTests)
+    test_support.run_unittest(GeneralTests, DebuggingServerTests, BadHELOServerTests)
 
 if __name__ == '__main__':
     test_main()


More information about the Python-checkins mailing list