spambayes/spambayes/test test_sb_imapfilter.py, 1.1, 1.2

Tony Meyer anadelonbrin at users.sourceforge.net
Mon Aug 9 09:45:05 CEST 2004

Update of /cvsroot/spambayes/spambayes/spambayes/test
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv10014/spambayes/test

Modified Files:
	test_sb_imapfilter.py
Log Message:
No new tests, but a better system.

Use a dummy IMAP server like test_sb-server.py does, so that we can control what
it returns and don't need to have an IMAP server available to run the tests.

Index: test_sb_imapfilter.py
RCS file: /cvsroot/spambayes/spambayes/spambayes/test/test_sb_imapfilter.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** test_sb_imapfilter.py	4 Aug 2004 08:36:32 -0000	1.1
--- test_sb_imapfilter.py	9 Aug 2004 07:45:02 -0000	1.2
*** 7,31 ****
  import sys
  import time
  import imaplib
  import unittest
  import sb_test_support
  from spambayes.Options import options
  from sb_imapfilter import BadIMAPResponseError
  from sb_imapfilter import IMAPSession, IMAPMessage, IMAPFolder
! IMAP_SERVER = "mail.madsods.gen.nz"
! IMAP_PORT = 143
! IMAP_USERNAME = "test_account+madsods.gen.nz"
! IMAP_FOLDER_LIST = ["INBOX", "unsure", "ham_to_train", "spam",
!                     "spam_to_train", ".mailboxlist"]
! class IMAPSessionTest(unittest.TestCase):
      def setUp(self):
!         self.imap = IMAPSession(IMAP_SERVER, IMAP_PORT)
      def tearDown(self):
--- 7,176 ----
  import sys
  import time
+ import types
+ import socket
+ import thread
  import imaplib
  import unittest
+ import asyncore
  import sb_test_support
+ from spambayes import Dibbler
  from spambayes.Options import options
  from sb_imapfilter import BadIMAPResponseError
  from sb_imapfilter import IMAPSession, IMAPMessage, IMAPFolder
! IMAP_PORT = 8143
! IMAP_USERNAME = "testu"
! IMAP_PASSWORD = "testp"
! IMAP_FOLDER_LIST = ["INBOX", "unsure", "ham_to_train", "spam"]
! # Key is UID.
! IMAP_MESSAGES = {101 : """Subject: Test\r\n\r\nBody test.""",
!                  102 : """Subject: Test2\r\n\r\nAnother body test."""}
! # Map of ID -> UID
! IMAP_UIDS = {1 : 101, 2: 102}
! class TestListener(Dibbler.Listener):
!     """Listener for TestIMAP4Server.  Works on port 8143, to co-exist
!     with real IMAP4 servers."""
!     def __init__(self, socketMap=asyncore.socket_map):
!         Dibbler.Listener.__init__(self, IMAP_PORT, TestIMAP4Server,
!                                   (socketMap,), socketMap=socketMap)
! class TestIMAP4Server(Dibbler.BrighterAsyncChat):
!     """Minimal IMAP4 server, for testing purposes.  Accepts a limited
!     subset of commands, and also a KILL command, to terminate."""
!     def __init__(self, clientSocket, socketMap):
!         # Grumble: asynchat.__init__ doesn't take a 'map' argument,
!         # hence the two-stage construction.
!         Dibbler.BrighterAsyncChat.__init__(self)
!         Dibbler.BrighterAsyncChat.set_socket(self, clientSocket, socketMap)
!         self.set_terminator('\r\n')
!         # okCommands are just ignored (we pass back a happy this-was-fine
!         # answer, and do nothing).
!         self.okCommands = ['NOOP', 'LOGOUT', 'CAPABILITY', 'KILL']
!         # These commands actually result in something.
!         self.handlers = {'LIST' : self.onList,
!                          'LOGIN' : self.onLogin,
!                          'SELECT' : self.onSelect,
!                          'FETCH' : self.onFetch,
!                          'UID' : self.onUID,
!                          }
!         self.push("* OK [CAPABILITY IMAP4REV1 AUTH=LOGIN] " \
!                   "localhost IMAP4rev1\r\n")
!         self.request = ''
!     def collect_incoming_data(self, data):
!         """Asynchat override."""
!         self.request = self.request + data
!     def found_terminator(self):
!         """Asynchat override."""
!         id, command = self.request.split(None, 1)
!         if ' ' in command:
!             command, args = command.split(None, 1)
!         else:
!             args = ''
!         command = command.upper()
!         if command in self.okCommands:
!             self.push("%s OK (we hope)\r\n" % (id,))
!             if command == 'LOGOUT':
!                 self.close_when_done()
!             if command == 'KILL':
!                 self.socket.shutdown(2)
!                 self.close()
!                 raise SystemExit()
!         else:
!             handler = self.handlers.get(command, self.onUnknown)
!             self.push(handler(id, command, args, False))  # Or push_slowly for testing
!         self.request = ''
!     def push_slowly(self, response):
!         """Useful for testing."""
!         for c in response:
!             self.push(c)
!             time.sleep(0.02)
!     def onLogin(self, id, command, args, uid=False):
!         """Log in to server."""
!         username, password = args.split(None, 1)
!         username = username.strip('"')
!         password = password.strip('"')
!         if username == IMAP_USERNAME and password == IMAP_PASSWORD:
!             return "%s OK [CAPABILITY IMAP4REV1] User %s " \
!                    "authenticated.\r\n" % (id, username)
!         return "%s NO LOGIN failed\r\n" % (id,)
!     def onList(self, id, command, args, uid=False):
!         """Return list of folders."""
!         base = '\r\n* LIST (\\NoInferiors \\UnMarked) "/" '
!         return "%s%s\r\n%s OK LIST completed\r\n" % \
!                (base[2:], base.join(IMAP_FOLDER_LIST), id)
!     def onSelect(self, id, command, args, uid=False):
!         exists = "* %d EXISTS" % (len(IMAP_MESSAGES),)
!         recent = "* 0 RECENT"
!         uidv = "* OK [UIDVALIDITY 1091599302] UID validity status"
!         next_uid = "* OK [UIDNEXT 23] Predicted next UID"
!         flags = "* FLAGS (\Answered \Flagged \Deleted \Draft \Seen)"
!         perm_flags = "* OK [PERMANENTFLAGS (\* \Answered \Flagged " \
!                      "\Deleted \Draft \Seen)] Permanent flags"
!         complete = "%s OK [READ-WRITE] SELECT completed" % (id,)
!         return "%s\r\n" % ("\r\n".join([exists, recent, uidv, next_uid,
!                                         flags, perm_flags, complete]),)
!     def onFetch(self, id, command, args, uid=False):
!         msg_nums, msg_parts = args.split(None, 1)
!         msg_nums = msg_nums.split()
!         response = {}
!         for msg in msg_nums:
!             response[msg] = []
!         if "UID" in msg_parts:
!             if uid:
!                 for msg in msg_nums:
!                     response[msg].append("FETCH (UID %s)" % (msg,))
!             else:
!                 for msg in msg_nums:
!                     response[msg].append("FETCH (UID %s)" %
!                                          (IMAP_UIDS[int(msg)]))
!         if "BODY.PEEK[]" in msg_parts:
!             for msg in msg_nums:
!                 if uid:
!                     msg_uid = int(msg)
!                 else:
!                     msg_uid = IMAP_UIDS[int(msg)]
!                 response[msg].append(("FETCH (BODY[] {%s}" %
!                                      (len(IMAP_MESSAGES[msg_uid])),
!                                      IMAP_MESSAGES[msg_uid]))
!         for msg in msg_nums:
!             try:
!                 simple = " ".join(response[msg])
!             except TypeError:
!                 simple = []
!                 for part in response[msg]:
!                     if isinstance(part, types.StringTypes):
!                         simple.append(part)
!                     else:
!                         simple.append('%s\r\n%s)' % (part[0], part[1]))
!                 simple = " ".join(simple)
!             response[msg] = "* %s %s" % (msg, simple)
!         response_text = "\r\n".join(response.values())
!         return "%s\r\n%s OK FETCH completed\r\n" % (response_text, id)
!     def onUID(self, id, command, args, uid=False):
!         actual_command, args = args.split(None, 1)
!         handler = self.handlers.get(actual_command, self.onUnknown)
!         return handler(id, command, args, uid=True)
!     def onUnknown(self, id, command, args, uid=False):
!         """Unknown IMAP4 command."""
!         return "%s BAD Command unrecognised: %s\r\n" % (id, repr(command))
! class BaseIMAPFilterTest(unittest.TestCase):
      def setUp(self):
!         self.imap = IMAPSession("localhost", IMAP_PORT)
      def tearDown(self):
*** 35,38 ****
--- 180,185 ----
+ class IMAPSessionTest(BaseIMAPFilterTest):
      def testGoodLogin(self):
          self.imap.login(IMAP_USERNAME, IMAP_PASSWORD)
*** 81,85 ****
          folders = self.imap.folder_list()
!         self.assertEqual(folders, IMAP_FOLDER_LIST)
      def test_extract_fetch_data(self):
--- 228,234 ----
          folders = self.imap.folder_list()
!         correct = IMAP_FOLDER_LIST[:]
!         correct.sort()
!         self.assertEqual(folders, correct)
      def test_extract_fetch_data(self):
*** 127,141 ****
! class IMAPMessageTest(unittest.TestCase):
      def setUp(self):
!         imap = IMAPSession(IMAP_SERVER, IMAP_PORT)
          self.msg = IMAPMessage()
!         self.msg.imap_server = imap
!     def tearDown(self):
!         try:
!             self.msg.imap_server.logout()
!         except imaplib.error:
!             pass
      # These tests might fail if more than one second passes
--- 276,284 ----
! class IMAPMessageTest(BaseIMAPFilterTest):
      def setUp(self):
!         BaseIMAPFilterTest.setUp(self)
          self.msg = IMAPMessage()
!         self.msg.imap_server = self.imap
      # These tests might fail if more than one second passes
*** 210,212 ****
--- 353,359 ----
  if __name__=='__main__':
+     def runTestServer():
+         TestListener()
+         asyncore.loop()
+     thread.start_new_thread(runTestServer, ())
      sb_test_support.unittest_main(argv=sys.argv + ['suite'])

