[Spambayes-checkins]
spambayes/spambayes/test test_sb_imapfilter.py, 1.1, 1.2
Tony Meyer
anadelonbrin at users.sourceforge.net
Mon Aug 9 09:45:05 CEST 2004
Update of /cvsroot/spambayes/spambayes/spambayes/test
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv10014/spambayes/test
Modified Files:
test_sb_imapfilter.py
Log Message:
No new tests, but a better system.
Use a dummy IMAP server like test_sb-server.py does, so that we can control what
it returns and don't need to have an IMAP server available to run the tests.
Index: test_sb_imapfilter.py
===================================================================
RCS file: /cvsroot/spambayes/spambayes/spambayes/test/test_sb_imapfilter.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** test_sb_imapfilter.py 4 Aug 2004 08:36:32 -0000 1.1
--- test_sb_imapfilter.py 9 Aug 2004 07:45:02 -0000 1.2
***************
*** 7,31 ****
import sys
import time
import imaplib
import unittest
import sb_test_support
sb_test_support.fix_sys_path()
from spambayes.Options import options
from sb_imapfilter import BadIMAPResponseError
from sb_imapfilter import IMAPSession, IMAPMessage, IMAPFolder
! IMAP_SERVER = "mail.madsods.gen.nz"
! IMAP_PORT = 143
! IMAP_USERNAME = "test_account+madsods.gen.nz"
! IMAP_PASSWORD = ""
! IMAP_FOLDER_LIST = ["INBOX", "unsure", "ham_to_train", "spam",
! "spam_to_train", ".mailboxlist"]
! IMAP_FOLDER_LIST.sort()
! class IMAPSessionTest(unittest.TestCase):
def setUp(self):
! self.imap = IMAPSession(IMAP_SERVER, IMAP_PORT)
def tearDown(self):
--- 7,176 ----
import sys
import time
+ import types
+ import socket
+ import thread
import imaplib
import unittest
+ import asyncore
import sb_test_support
sb_test_support.fix_sys_path()
+ from spambayes import Dibbler
from spambayes.Options import options
from sb_imapfilter import BadIMAPResponseError
from sb_imapfilter import IMAPSession, IMAPMessage, IMAPFolder
! IMAP_PORT = 8143
! IMAP_USERNAME = "testu"
! IMAP_PASSWORD = "testp"
! IMAP_FOLDER_LIST = ["INBOX", "unsure", "ham_to_train", "spam"]
! # Key is UID.
! IMAP_MESSAGES = {101 : """Subject: Test\r\n\r\nBody test.""",
! 102 : """Subject: Test2\r\n\r\nAnother body test."""}
! # Map of ID -> UID
! IMAP_UIDS = {1 : 101, 2: 102}
! class TestListener(Dibbler.Listener):
! """Listener for TestIMAP4Server. Works on port 8143, to co-exist
! with real IMAP4 servers."""
! def __init__(self, socketMap=asyncore.socket_map):
! Dibbler.Listener.__init__(self, IMAP_PORT, TestIMAP4Server,
! (socketMap,), socketMap=socketMap)
!
!
! class TestIMAP4Server(Dibbler.BrighterAsyncChat):
! """Minimal IMAP4 server, for testing purposes. Accepts a limited
! subset of commands, and also a KILL command, to terminate."""
! def __init__(self, clientSocket, socketMap):
! # Grumble: asynchat.__init__ doesn't take a 'map' argument,
! # hence the two-stage construction.
! Dibbler.BrighterAsyncChat.__init__(self)
! Dibbler.BrighterAsyncChat.set_socket(self, clientSocket, socketMap)
! self.set_terminator('\r\n')
! # okCommands are just ignored (we pass back a happy this-was-fine
! # answer, and do nothing).
! self.okCommands = ['NOOP', 'LOGOUT', 'CAPABILITY', 'KILL']
! # These commands actually result in something.
! self.handlers = {'LIST' : self.onList,
! 'LOGIN' : self.onLogin,
! 'SELECT' : self.onSelect,
! 'FETCH' : self.onFetch,
! 'UID' : self.onUID,
! }
! self.push("* OK [CAPABILITY IMAP4REV1 AUTH=LOGIN] " \
! "localhost IMAP4rev1\r\n")
! self.request = ''
!
! def collect_incoming_data(self, data):
! """Asynchat override."""
! self.request = self.request + data
!
! def found_terminator(self):
! """Asynchat override."""
! id, command = self.request.split(None, 1)
! if ' ' in command:
! command, args = command.split(None, 1)
! else:
! args = ''
! command = command.upper()
! if command in self.okCommands:
! self.push("%s OK (we hope)\r\n" % (id,))
! if command == 'LOGOUT':
! self.close_when_done()
! if command == 'KILL':
! self.socket.shutdown(2)
! self.close()
! raise SystemExit()
! else:
! handler = self.handlers.get(command, self.onUnknown)
! self.push(handler(id, command, args, False)) # Or push_slowly for testing
! self.request = ''
!
! def push_slowly(self, response):
! """Useful for testing."""
! for c in response:
! self.push(c)
! time.sleep(0.02)
!
! def onLogin(self, id, command, args, uid=False):
! """Log in to server."""
! username, password = args.split(None, 1)
! username = username.strip('"')
! password = password.strip('"')
! if username == IMAP_USERNAME and password == IMAP_PASSWORD:
! return "%s OK [CAPABILITY IMAP4REV1] User %s " \
! "authenticated.\r\n" % (id, username)
! return "%s NO LOGIN failed\r\n" % (id,)
!
! def onList(self, id, command, args, uid=False):
! """Return list of folders."""
! base = '\r\n* LIST (\\NoInferiors \\UnMarked) "/" '
! return "%s%s\r\n%s OK LIST completed\r\n" % \
! (base[2:], base.join(IMAP_FOLDER_LIST), id)
!
! def onSelect(self, id, command, args, uid=False):
! exists = "* %d EXISTS" % (len(IMAP_MESSAGES),)
! recent = "* 0 RECENT"
! uidv = "* OK [UIDVALIDITY 1091599302] UID validity status"
! next_uid = "* OK [UIDNEXT 23] Predicted next UID"
! flags = "* FLAGS (\Answered \Flagged \Deleted \Draft \Seen)"
! perm_flags = "* OK [PERMANENTFLAGS (\* \Answered \Flagged " \
! "\Deleted \Draft \Seen)] Permanent flags"
! complete = "%s OK [READ-WRITE] SELECT completed" % (id,)
! return "%s\r\n" % ("\r\n".join([exists, recent, uidv, next_uid,
! flags, perm_flags, complete]),)
!
! def onFetch(self, id, command, args, uid=False):
! msg_nums, msg_parts = args.split(None, 1)
! msg_nums = msg_nums.split()
! response = {}
! for msg in msg_nums:
! response[msg] = []
! if "UID" in msg_parts:
! if uid:
! for msg in msg_nums:
! response[msg].append("FETCH (UID %s)" % (msg,))
! else:
! for msg in msg_nums:
! response[msg].append("FETCH (UID %s)" %
! (IMAP_UIDS[int(msg)]))
! if "BODY.PEEK[]" in msg_parts:
! for msg in msg_nums:
! if uid:
! msg_uid = int(msg)
! else:
! msg_uid = IMAP_UIDS[int(msg)]
! response[msg].append(("FETCH (BODY[] {%s}" %
! (len(IMAP_MESSAGES[msg_uid])),
! IMAP_MESSAGES[msg_uid]))
! for msg in msg_nums:
! try:
! simple = " ".join(response[msg])
! except TypeError:
! simple = []
! for part in response[msg]:
! if isinstance(part, types.StringTypes):
! simple.append(part)
! else:
! simple.append('%s\r\n%s)' % (part[0], part[1]))
! simple = " ".join(simple)
! response[msg] = "* %s %s" % (msg, simple)
! response_text = "\r\n".join(response.values())
! return "%s\r\n%s OK FETCH completed\r\n" % (response_text, id)
!
! def onUID(self, id, command, args, uid=False):
! actual_command, args = args.split(None, 1)
! handler = self.handlers.get(actual_command, self.onUnknown)
! return handler(id, command, args, uid=True)
!
! def onUnknown(self, id, command, args, uid=False):
! """Unknown IMAP4 command."""
! return "%s BAD Command unrecognised: %s\r\n" % (id, repr(command))
!
!
! class BaseIMAPFilterTest(unittest.TestCase):
def setUp(self):
! self.imap = IMAPSession("localhost", IMAP_PORT)
def tearDown(self):
***************
*** 35,38 ****
--- 180,185 ----
pass
+
+ class IMAPSessionTest(BaseIMAPFilterTest):
def testGoodLogin(self):
self.imap.login(IMAP_USERNAME, IMAP_PASSWORD)
***************
*** 81,85 ****
folders = self.imap.folder_list()
! self.assertEqual(folders, IMAP_FOLDER_LIST)
def test_extract_fetch_data(self):
--- 228,234 ----
folders = self.imap.folder_list()
! correct = IMAP_FOLDER_LIST[:]
! correct.sort()
! self.assertEqual(folders, correct)
def test_extract_fetch_data(self):
***************
*** 127,141 ****
! class IMAPMessageTest(unittest.TestCase):
def setUp(self):
! imap = IMAPSession(IMAP_SERVER, IMAP_PORT)
self.msg = IMAPMessage()
! self.msg.imap_server = imap
!
! def tearDown(self):
! try:
! self.msg.imap_server.logout()
! except imaplib.error:
! pass
# These tests might fail if more than one second passes
--- 276,284 ----
! class IMAPMessageTest(BaseIMAPFilterTest):
def setUp(self):
! BaseIMAPFilterTest.setUp(self)
self.msg = IMAPMessage()
! self.msg.imap_server = self.imap
# These tests might fail if more than one second passes
***************
*** 210,212 ****
--- 353,359 ----
if __name__=='__main__':
+ def runTestServer():
+ TestListener()
+ asyncore.loop()
+ thread.start_new_thread(runTestServer, ())
sb_test_support.unittest_main(argv=sys.argv + ['suite'])
More information about the Spambayes-checkins
mailing list