[Spambayes-checkins]
spambayes/spambayes/test test_sb_imapfilter.py, 1.3, 1.4
Tony Meyer
anadelonbrin at users.sourceforge.net
Thu Oct 14 06:01:20 CEST 2004
Update of /cvsroot/spambayes/spambayes/spambayes/test
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv7854/spambayes/test
Modified Files:
test_sb_imapfilter.py
Log Message:
Add some more (limited) capabilities to the fake server: STORE, APPEND, SEARCH,
FETCH RFC822.HEADER, and FETCH FLAGS INTERNALDATE.
Finish off most of the IMAPFolder tests (test_iter, test_keys, test_getitem), and
do the setup for IMAPFilterTest.
Index: test_sb_imapfilter.py
===================================================================
RCS file: /cvsroot/spambayes/spambayes/spambayes/test/test_sb_imapfilter.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** test_sb_imapfilter.py 13 Oct 2004 05:57:41 -0000 1.3
--- test_sb_imapfilter.py 14 Oct 2004 04:01:10 -0000 1.4
***************
*** 15,31 ****
from spambayes import Dibbler
from spambayes.Options import options
from sb_imapfilter import BadIMAPResponseError
! from sb_imapfilter import IMAPSession, IMAPMessage, IMAPFolder
IMAP_PORT = 8143
IMAP_USERNAME = "testu"
IMAP_PASSWORD = "testp"
! IMAP_FOLDER_LIST = ["INBOX", "unsure", "ham_to_train", "spam"]
# Key is UID.
! IMAP_MESSAGES = {101 : """Subject: Test\r\n\r\nBody test.""",
! 102 : """Subject: Test2\r\n\r\nAnother body test.""",
! # 103 is taken from Anthony's email torture test
! # (the test_zero-length-boundary file).
! 103 : """Received: from noisy-2-82-67-182-141.fbx.proxad.net(82.67.182.141)
via SMTP by mx1.example.com, id smtpdAAAzMayUR; Tue Apr 27 18:56:48 2004
Return-Path: " Freeman" <XLUPSYGSHLBAPN at runbox.com>
--- 15,50 ----
from spambayes import Dibbler
from spambayes.Options import options
+ from spambayes.classifier import Classifer
from sb_imapfilter import BadIMAPResponseError
! from spambayes.message import message_from_string
! from sb_imapfilter import IMAPSession, IMAPMessage, IMAPFolder, IMAPFilter
IMAP_PORT = 8143
IMAP_USERNAME = "testu"
IMAP_PASSWORD = "testp"
! IMAP_FOLDER_LIST = ["INBOX", "unsure", "ham_to_train", "spam",
! "spam_to_train"]
! # Must be different.
! SB_ID_1 = "test at spambayes.invalid"
! SB_ID_2 = "14102004"
# Key is UID.
! IMAP_MESSAGES = {
! # 101 should be valid and have a MessageID header, but no
! # X-Spambayes-MessageID header.
! 101 : """Subject: Test\r
! Message-ID: <%s>\r
! \r
! Body test.""" % (SB_ID_1,),
! # 102 should be valid and have both a MessageID header and a
! # X-Spambayes-MessageID header.
! 102 : """Subject: Test2\r
! Message-ID: <%s>\r
! %s: %s\r
! \r
! Another body test.""" % (SB_ID_1, options["Headers", "mailid_header_name"],
! SB_ID_2),
! # 103 is taken from Anthony's email torture test (the
! # test_zero-length-boundary file).
! 103 : """Received: from noisy-2-82-67-182-141.fbx.proxad.net(82.67.182.141)
via SMTP by mx1.example.com, id smtpdAAAzMayUR; Tue Apr 27 18:56:48 2004
Return-Path: " Freeman" <XLUPSYGSHLBAPN at runbox.com>
***************
*** 57,63 ****
""",
! }
# Map of ID -> UID
! IMAP_UIDS = {1 : 101, 2: 102, 3:103}
class TestListener(Dibbler.Listener):
--- 76,90 ----
""",
! # 104 should be valid and have neither a MessageID header nor a
! # X-Spambayes-MessageID header.
! 104 : """Subject: Test2\r
! \r
! Yet another body test.""",
! }
# Map of ID -> UID
! IMAP_UIDS = {1 : 101, 2: 102, 3:103, 4:104}
!
! # Messages that are UNDELETED
! UNDELETED_IDS = (1,2)
class TestListener(Dibbler.Listener):
***************
*** 88,104 ****
'SELECT' : self.onSelect,
'FETCH' : self.onFetch,
'UID' : self.onUID,
}
self.push("* OK [CAPABILITY IMAP4REV1 AUTH=LOGIN] " \
"localhost IMAP4rev1\r\n")
self.request = ''
def collect_incoming_data(self, data):
"""Asynchat override."""
! self.request = self.request + data
def found_terminator(self):
"""Asynchat override."""
global FAIL_NEXT
id, command = self.request.split(None, 1)
--- 115,149 ----
'SELECT' : self.onSelect,
'FETCH' : self.onFetch,
+ 'SEARCH' : self.onSearch,
'UID' : self.onUID,
+ 'APPEND' : self.onAppend,
+ 'STORE' : self.onStore,
}
self.push("* OK [CAPABILITY IMAP4REV1 AUTH=LOGIN] " \
"localhost IMAP4rev1\r\n")
self.request = ''
+ self.next_id = 0
+ self.in_literal = (0, None)
def collect_incoming_data(self, data):
"""Asynchat override."""
! if self.in_literal[0] > 0:
! # Also add the line breaks.
! self.request = "%s\r\n%s" % (self.request, data)
! else:
! self.request = self.request + data
def found_terminator(self):
"""Asynchat override."""
global FAIL_NEXT
+
+ if self.in_literal[0] > 0:
+ if len(self.request) >= self.in_literal[0]:
+ self.push(self.in_literal[1](self.request,
+ *self.in_literal[2]))
+ self.in_literal = (0, None)
+ self.request = ''
+ return
+
id, command = self.request.split(None, 1)
***************
*** 147,150 ****
--- 192,199 ----
(base[2:], base.join(IMAP_FOLDER_LIST), id)
+ def onStore(self, id, command, args, uid=False):
+ # We ignore flags.
+ return "%s OK STORE completed\r\n" % (id,)
+
def onSelect(self, id, command, args, uid=False):
exists = "* %d EXISTS" % (len(IMAP_MESSAGES),)
***************
*** 159,162 ****
--- 208,256 ----
flags, perm_flags, complete]),)
+ def onAppend(self, id, command, args, uid=False):
+ # Only stores for this session.
+ folder, args = args.split(None, 1)
+ # We ignore the folder.
+ if ')' in args:
+ flags, args = args.split(')', 1)
+ flags = flags[1:]
+ # We ignore the flags.
+ unused, date, args = args.split('"', 2)
+ # We ignore the date.
+ if '{' in args:
+ # A literal.
+ size = int(args[2:-1])
+ self.in_literal = (size, self.appendLiteral, (id,))
+ return "+ Ready for argument\r\n"
+ # Strip off the space at the front.
+ return self.appendLiteral(args[1:], id)
+
+ def appendLiteral(self, message, command_id):
+ while True:
+ id = self.next_id
+ self.next_id += 1
+ if id not in IMAP_MESSAGES:
+ break
+ IMAP_MESSAGES[id] = message
+ return "* APPEND %s\r\n%s OK APPEND succeeded\r\n" % \
+ (id, command_id)
+
+ def onSearch(self, id, command, args, uid=False):
+ args = args.upper()
+ results = ()
+ if "UNDELETED" in args:
+ for msg_id in UNDELETED_IDS:
+ if uid:
+ results += (IMAP_UIDS[msg_id],)
+ else:
+ results += (msg_id,)
+ if uid:
+ command_string = "UID " + command
+ else:
+ command_string = command
+ return "%s\r\n%s OK %s completed\r\n" % \
+ ("* SEARCH " + ' '.join([str(r) for r in results]), id,
+ command_string)
+
def onFetch(self, id, command, args, uid=False):
msg_nums, msg_parts = args.split(None, 1)
***************
*** 182,185 ****
--- 276,295 ----
(len(IMAP_MESSAGES[msg_uid])),
IMAP_MESSAGES[msg_uid]))
+ if "RFC822.HEADER" in msg_parts:
+ for msg in msg_nums:
+ if uid:
+ msg_uid = int(msg)
+ else:
+ msg_uid = IMAP_UIDS[int(msg)]
+ msg_text = IMAP_MESSAGES[msg_uid]
+ headers, unused = msg_text.split('\r\n\r\n', 1)
+ response[msg].append(("FETCH (RFC822.HEADER {%s}" %
+ (len(headers),), headers))
+ if "FLAGS INTERNALDATE" in msg_parts:
+ # We make up flags & dates.
+ for msg in msg_nums:
+ response[msg].append('FETCH (FLAGS (\Seen \Deleted) '
+ 'INTERNALDATE "27-Jul-2004 13:1'
+ '1:56 +1200')
for msg in msg_nums:
try:
***************
*** 200,204 ****
actual_command, args = args.split(None, 1)
handler = self.handlers.get(actual_command, self.onUnknown)
! return handler(id, command, args, uid=True)
def onUnknown(self, id, command, args, uid=False):
--- 310,314 ----
actual_command, args = args.split(None, 1)
handler = self.handlers.get(actual_command, self.onUnknown)
! return handler(id, actual_command, args, uid=True)
def onUnknown(self, id, command, args, uid=False):
***************
*** 421,424 ****
--- 531,535 ----
def setUp(self):
BaseIMAPFilterTest.setUp(self)
+ self.imap.login(IMAP_USERNAME, IMAP_PASSWORD)
self.folder = IMAPFolder("testfolder", self.imap)
***************
*** 430,441 ****
def test_iter(self):
! # XXX To-do
! pass
def test_keys(self):
! # XXX To-do
! pass
! def test_getitem(self):
! # XXX To-do
! pass
def test_generate_id(self):
--- 541,591 ----
def test_iter(self):
! keys = self.folder.keys()
! for msg in self.folder:
! msg = msg.get_full_message()
! msg_correct = message_from_string(IMAP_MESSAGES[int(keys[0])])
! id_header_name = options["Headers", "mailid_header_name"]
! if msg_correct[id_header_name] is None:
! msg_correct[id_header_name] = msg.id
! self.assertEqual(msg.as_string(), msg_correct.as_string())
! keys = keys[1:]
!
def test_keys(self):
! keys = self.folder.keys()
! # We get back UIDs, not IDs, so convert to check.
! correct_keys = [str(IMAP_UIDS[id]) for id in UNDELETED_IDS]
! self.assertEqual(keys, correct_keys)
!
! def test_getitem_new_style(self):
! # 101 already has a suitable (new style) id, so it should
! # not be recreated.
! id_header_name = options["Headers", "mailid_header_name"]
! msg1 = self.folder[101]
! self.assertEqual(msg1.id, SB_ID_1)
! msg1 = msg1.get_full_message()
! msg1_correct = message_from_string(IMAP_MESSAGES[101])
! self.assertNotEqual(msg1[id_header_name], None)
! msg1_correct[id_header_name] = SB_ID_1
! self.assertEqual(msg1.as_string(), msg1_correct.as_string())
!
! def test_getitem_old_style(self):
! # 102 already has a suitable (old style) id, so it should
! # not be recreated. We should be sure to use the old id,
! # rather than the new one, too, for backwards compatibility.
! id_header_name = options["Headers", "mailid_header_name"]
! msg2 = self.folder[102]
! self.assertEqual(msg2.id, SB_ID_2)
! msg2 = msg2.get_full_message()
! self.assertNotEqual(msg2[id_header_name], None)
! self.assertEqual(msg2.as_string(), IMAP_MESSAGES[102])
!
! def test_getitem_new_id(self):
! # 104 doesn't have an id, so should be recreated with one.
! id_header_name = options["Headers", "mailid_header_name"]
! msg3 = self.folder[104]
! self.assertNotEqual(msg3[id_header_name], None)
! msg_correct = message_from_string(IMAP_MESSAGES[104])
! msg_correct[id_header_name] = msg3.id
! self.assertEqual(msg3.as_string(), msg_correct.as_string())
def test_generate_id(self):
***************
*** 463,466 ****
--- 613,624 ----
class IMAPFilterTest(BaseIMAPFilterTest):
+ def setUp(self):
+ BaseIMAPFilterTest.setUp(self)
+ self.imap.login(IMAP_USERNAME, IMAP_PASSWORD)
+ classifier = Classifier()
+ self.filter = IMAPFilter(classifier)
+ options["imap", "ham_train_folders"] = ("ham_to_train",)
+ options["imap", "spam_train_folders"] = ("spam_to_train",)
+
def test_Train(self):
# XXX To-do
More information about the Spambayes-checkins
mailing list