[Spambayes-checkins] spambayes/spambayes/test test_sb_imapfilter.py, 1.3, 1.4

Tony Meyer anadelonbrin at users.sourceforge.net
Thu Oct 14 06:01:20 CEST 2004


Update of /cvsroot/spambayes/spambayes/spambayes/test
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv7854/spambayes/test

Modified Files:
	test_sb_imapfilter.py 
Log Message:
Add some more limited capability to the fake server: store, append, search, fetch
 rfc822.header and fetch flags internaldate.

Finish off most of the IMAPFolder tests (test_iter, test_keys, test_getitem), and
 do the setup for IMAPFilterTest.

Index: test_sb_imapfilter.py
===================================================================
RCS file: /cvsroot/spambayes/spambayes/spambayes/test/test_sb_imapfilter.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** test_sb_imapfilter.py	13 Oct 2004 05:57:41 -0000	1.3
--- test_sb_imapfilter.py	14 Oct 2004 04:01:10 -0000	1.4
***************
*** 15,31 ****
  from spambayes import Dibbler
  from spambayes.Options import options
  from sb_imapfilter import BadIMAPResponseError
! from sb_imapfilter import IMAPSession, IMAPMessage, IMAPFolder
  
  IMAP_PORT = 8143
  IMAP_USERNAME = "testu"
  IMAP_PASSWORD = "testp"
! IMAP_FOLDER_LIST = ["INBOX", "unsure", "ham_to_train", "spam"]
  # Key is UID.
! IMAP_MESSAGES = {101 : """Subject: Test\r\n\r\nBody test.""",
!                  102 : """Subject: Test2\r\n\r\nAnother body test.""",
!                  # 103 is taken from Anthony's email torture test
!                  # (the test_zero-length-boundary file).
!                  103 : """Received: from noisy-2-82-67-182-141.fbx.proxad.net(82.67.182.141)
   via SMTP by mx1.example.com, id smtpdAAAzMayUR; Tue Apr 27 18:56:48 2004
  Return-Path: " Freeman" <XLUPSYGSHLBAPN at runbox.com>
--- 15,50 ----
  from spambayes import Dibbler
  from spambayes.Options import options
+ from spambayes.classifier import Classifer
  from sb_imapfilter import BadIMAPResponseError
! from spambayes.message import message_from_string
! from sb_imapfilter import IMAPSession, IMAPMessage, IMAPFolder, IMAPFilter
  
  IMAP_PORT = 8143
  IMAP_USERNAME = "testu"
  IMAP_PASSWORD = "testp"
! IMAP_FOLDER_LIST = ["INBOX", "unsure", "ham_to_train", "spam",
!                     "spam_to_train"]
! # Must be different.
! SB_ID_1 = "test at spambayes.invalid"
! SB_ID_2 = "14102004"
  # Key is UID.
! IMAP_MESSAGES = {
!     # 101 should be valid and have a MessageID header, but no
!     # X-Spambayes-MessageID header.
!     101 : """Subject: Test\r
! Message-ID: <%s>\r
! \r
! Body test.""" % (SB_ID_1,),
!     # 102 should be valid and have both a MessageID header and a
!     # X-Spambayes-MessageID header.
!     102 : """Subject: Test2\r
! Message-ID: <%s>\r
! %s: %s\r
! \r
! Another body test.""" % (SB_ID_1, options["Headers", "mailid_header_name"],
!                          SB_ID_2),
!     # 103 is taken from Anthony's email torture test (the
!     # test_zero-length-boundary file).
!     103 : """Received: from noisy-2-82-67-182-141.fbx.proxad.net(82.67.182.141)
   via SMTP by mx1.example.com, id smtpdAAAzMayUR; Tue Apr 27 18:56:48 2004
  Return-Path: " Freeman" <XLUPSYGSHLBAPN at runbox.com>
***************
*** 57,63 ****
  
  """,
!                  }
  # Map of ID -> UID
! IMAP_UIDS = {1 : 101, 2: 102, 3:103}
  
  class TestListener(Dibbler.Listener):
--- 76,90 ----
  
  """,
!     # 104 should be valid and have neither a MessageID header nor a
!     # X-Spambayes-MessageID header.
!     104 : """Subject: Test2\r
! \r
! Yet another body test.""",
!     }
  # Map of ID -> UID
! IMAP_UIDS = {1 : 101, 2: 102, 3:103, 4:104}
! 
! # Messages that are UNDELETED
! UNDELETED_IDS = (1,2)
  
  class TestListener(Dibbler.Listener):
***************
*** 88,104 ****
                           'SELECT' : self.onSelect,
                           'FETCH' : self.onFetch,
                           'UID' : self.onUID,
                           }
          self.push("* OK [CAPABILITY IMAP4REV1 AUTH=LOGIN] " \
                    "localhost IMAP4rev1\r\n")
          self.request = ''
  
      def collect_incoming_data(self, data):
          """Asynchat override."""
!         self.request = self.request + data
  
      def found_terminator(self):
          """Asynchat override."""
          global FAIL_NEXT
          id, command = self.request.split(None, 1)
  
--- 115,149 ----
                           'SELECT' : self.onSelect,
                           'FETCH' : self.onFetch,
+                          'SEARCH' : self.onSearch,
                           'UID' : self.onUID,
+                          'APPEND' : self.onAppend,
+                          'STORE' : self.onStore,
                           }
          self.push("* OK [CAPABILITY IMAP4REV1 AUTH=LOGIN] " \
                    "localhost IMAP4rev1\r\n")
          self.request = ''
+         self.next_id = 0
+         self.in_literal = (0, None)
  
      def collect_incoming_data(self, data):
          """Asynchat override."""
!         if self.in_literal[0] > 0:
!             # Also add the line breaks.
!             self.request = "%s\r\n%s" % (self.request, data)
!         else:
!             self.request = self.request + data
  
      def found_terminator(self):
          """Asynchat override."""
          global FAIL_NEXT
+ 
+         if self.in_literal[0] > 0:
+             if len(self.request) >= self.in_literal[0]:
+                 self.push(self.in_literal[1](self.request,
+                                              *self.in_literal[2]))
+                 self.in_literal = (0, None)
+                 self.request = ''
+             return
+         
          id, command = self.request.split(None, 1)
  
***************
*** 147,150 ****
--- 192,199 ----
                 (base[2:], base.join(IMAP_FOLDER_LIST), id)
  
+     def onStore(self, id, command, args, uid=False):
+         # We ignore flags.
+         return "%s OK STORE completed\r\n" % (id,)
+ 
      def onSelect(self, id, command, args, uid=False):
          exists = "* %d EXISTS" % (len(IMAP_MESSAGES),)
***************
*** 159,162 ****
--- 208,256 ----
                                          flags, perm_flags, complete]),)
  
+     def onAppend(self, id, command, args, uid=False):
+         # Only stores for this session.
+         folder, args = args.split(None, 1)
+         # We ignore the folder.
+         if ')' in args:
+             flags, args = args.split(')', 1)
+             flags = flags[1:]
+             # We ignore the flags.
+         unused, date, args = args.split('"', 2)
+         # We ignore the date.
+         if '{' in args:
+             # A literal.
+             size = int(args[2:-1])
+             self.in_literal = (size, self.appendLiteral, (id,))
+             return "+ Ready for argument\r\n"
+         # Strip off the space at the front.
+         return self.appendLiteral(args[1:], id)
+ 
+     def appendLiteral(self, message, command_id):
+         while True:
+             id = self.next_id
+             self.next_id += 1
+             if id not in IMAP_MESSAGES:
+                 break
+         IMAP_MESSAGES[id] = message
+         return "* APPEND %s\r\n%s OK APPEND succeeded\r\n" % \
+                (id, command_id)
+ 
+     def onSearch(self, id, command, args, uid=False):
+         args = args.upper()
+         results = ()
+         if "UNDELETED" in args:
+             for msg_id in UNDELETED_IDS:
+                 if uid:
+                     results += (IMAP_UIDS[msg_id],)
+                 else:
+                     results += (msg_id,)
+         if uid:
+             command_string = "UID " + command
+         else:
+             command_string = command
+         return "%s\r\n%s OK %s completed\r\n" % \
+                ("* SEARCH " + ' '.join([str(r) for r in results]), id,
+                 command_string)
+ 
      def onFetch(self, id, command, args, uid=False):
          msg_nums, msg_parts = args.split(None, 1)
***************
*** 182,185 ****
--- 276,295 ----
                                       (len(IMAP_MESSAGES[msg_uid])),
                                       IMAP_MESSAGES[msg_uid]))
+         if "RFC822.HEADER" in msg_parts:
+             for msg in msg_nums:
+                 if uid:
+                     msg_uid = int(msg)
+                 else:
+                     msg_uid = IMAP_UIDS[int(msg)]
+                 msg_text = IMAP_MESSAGES[msg_uid]
+                 headers, unused = msg_text.split('\r\n\r\n', 1)
+                 response[msg].append(("FETCH (RFC822.HEADER {%s}" %
+                                       (len(headers),), headers))
+         if "FLAGS INTERNALDATE" in msg_parts:
+             # We make up flags & dates.
+             for msg in msg_nums:
+                 response[msg].append('FETCH (FLAGS (\Seen \Deleted) '
+                                      'INTERNALDATE "27-Jul-2004 13:1'
+                                      '1:56 +1200')
          for msg in msg_nums:
              try:
***************
*** 200,204 ****
          actual_command, args = args.split(None, 1)
          handler = self.handlers.get(actual_command, self.onUnknown)
!         return handler(id, command, args, uid=True)
  
      def onUnknown(self, id, command, args, uid=False):
--- 310,314 ----
          actual_command, args = args.split(None, 1)
          handler = self.handlers.get(actual_command, self.onUnknown)
!         return handler(id, actual_command, args, uid=True)
  
      def onUnknown(self, id, command, args, uid=False):
***************
*** 421,424 ****
--- 531,535 ----
      def setUp(self):
          BaseIMAPFilterTest.setUp(self)
+         self.imap.login(IMAP_USERNAME, IMAP_PASSWORD)
          self.folder = IMAPFolder("testfolder", self.imap)
  
***************
*** 430,441 ****
          
      def test_iter(self):
!         # XXX To-do
!         pass
      def test_keys(self):
!         # XXX To-do
!         pass
!     def test_getitem(self):
!         # XXX To-do
!         pass
  
      def test_generate_id(self):
--- 541,591 ----
          
      def test_iter(self):
!         keys = self.folder.keys()
!         for msg in self.folder:
!             msg = msg.get_full_message()
!             msg_correct = message_from_string(IMAP_MESSAGES[int(keys[0])])
!             id_header_name = options["Headers", "mailid_header_name"]
!             if msg_correct[id_header_name] is None:
!                 msg_correct[id_header_name] = msg.id
!             self.assertEqual(msg.as_string(), msg_correct.as_string())
!             keys = keys[1:]
! 
      def test_keys(self):
!         keys = self.folder.keys()
!         # We get back UIDs, not IDs, so convert to check.
!         correct_keys = [str(IMAP_UIDS[id]) for id in UNDELETED_IDS]
!         self.assertEqual(keys, correct_keys)
! 
!     def test_getitem_new_style(self):
!         # 101 already has a suitable (new style) id, so it should
!         # not be recreated.
!         id_header_name = options["Headers", "mailid_header_name"]
!         msg1 = self.folder[101]
!         self.assertEqual(msg1.id, SB_ID_1)
!         msg1 = msg1.get_full_message()
!         msg1_correct = message_from_string(IMAP_MESSAGES[101])
!         self.assertNotEqual(msg1[id_header_name], None)
!         msg1_correct[id_header_name] = SB_ID_1
!         self.assertEqual(msg1.as_string(), msg1_correct.as_string())
! 
!     def test_getitem_old_style(self):
!         # 102 already has a suitable (old style) id, so it should
!         # not be recreated.  We should be sure to use the old id,
!         # rather than the new one, too, for backwards compatibility.
!         id_header_name = options["Headers", "mailid_header_name"]
!         msg2 = self.folder[102]
!         self.assertEqual(msg2.id, SB_ID_2)
!         msg2 = msg2.get_full_message()
!         self.assertNotEqual(msg2[id_header_name], None)
!         self.assertEqual(msg2.as_string(), IMAP_MESSAGES[102])
! 
!     def test_getitem_new_id(self):
!         # 104 doesn't have an id, so should be recreated with one.
!         id_header_name = options["Headers", "mailid_header_name"]
!         msg3 = self.folder[104]
!         self.assertNotEqual(msg3[id_header_name], None)
!         msg_correct = message_from_string(IMAP_MESSAGES[104])
!         msg_correct[id_header_name] = msg3.id
!         self.assertEqual(msg3.as_string(), msg_correct.as_string())
  
      def test_generate_id(self):
***************
*** 463,466 ****
--- 613,624 ----
  
  class IMAPFilterTest(BaseIMAPFilterTest):
+     def setUp(self):
+         BaseIMAPFilterTest.setUp(self)
+         self.imap.login(IMAP_USERNAME, IMAP_PASSWORD)
+         classifier = Classifier()
+         self.filter = IMAPFilter(classifier)
+         options["imap", "ham_train_folders"] = ("ham_to_train",)
+         options["imap", "spam_train_folders"] = ("spam_to_train",)
+ 
      def test_Train(self):
          # XXX To-do



More information about the Spambayes-checkins mailing list