[Spambayes-checkins] spambayes/contrib sb_culler.py,1.2,1.3

Tony Meyer anadelonbrin at users.sourceforge.net
Wed Oct 27 04:36:27 CEST 2004


Update of /cvsroot/spambayes/spambayes/contrib
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv16272/contrib

Modified Files:
	sb_culler.py 
Log Message:
Update with the changes that Andrew Dalke posted to c.l.p today.

Weed out duplicates.
Add SpamAssassin header checking.
Add whitelisting of delivered-to header.
Add ability to continue after KeyboardInterrupt.
Only restart network after 21 errors, not 1.
During delay, let user quit, immediately refilter, or delay for a given time.
Print a little indicator while filtering.
Make logging subject able to recover from parsing errors.
Reload whitelist on demand.

Index: sb_culler.py
===================================================================
RCS file: /cvsroot/spambayes/spambayes/contrib/sb_culler.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -C2 -d -r1.2 -r1.3
*** sb_culler.py	27 Oct 2004 02:25:07 -0000	1.2
--- sb_culler.py	27 Oct 2004 02:36:25 -0000	1.3
***************
*** 19,24 ****
--- 19,29 ----
  be done by editing the code.
  
+ The virus identification and POP3 manipulation code is based on Kevin
+ Altis' virus killer code, which I've been gratefully using for the
+ last several months.
+ 
  Written by Andrew Dalke, November 2003.
  Released into the public domain on 2003/11/22.
+ Updated 2004/10/26
    == NO copyright protection asserted for this code.  Share and enjoy! ==
  
***************
*** 26,35 ****
  """
  
! import sets, traceback
  import poplib
  import posixpath
! from email import Header
  from spambayes import mboxutils, hammie
  
  DO_ACTIONS = 1
  VERBOSE_LEVEL = 1
--- 31,43 ----
  """
  
! import sets, traceback, md5, os
  import poplib
  import posixpath
! from email import Header, Utils
  from spambayes import mboxutils, hammie
  
+ import socket
+ socket.setdefaulttimeout(10)
+ 
  DO_ACTIONS = 1
  VERBOSE_LEVEL = 1
***************
*** 113,128 ****
          
  
  class WhiteListFrom:
      """Test: Read a list of email addresses to use a 'from' whitelist"""
      def __init__(self, filename):
          lines = [line.strip().lower() for line in
!                            open(filename).readlines()]
          self.addresses = sets.Set(lines)
          
      def __call__(self, mi, log):
          frm = mi.msg["from"]
          status = (frm is not None) and (frm.lower() in self.addresses)
          if status:
!             log.pass_test("'from' white list")
              return "it is in 'from' white list"
          return False
--- 121,186 ----
          
  
+ class Duplicate:
+     def __init__(self):
+         self.unique = {}
+     def __call__(self, mi, log):
+         digest = md5.md5(mi.text).digest()
+         if digest in self.unique:
+             log.pass_test(SPAM)
+             return "duplicate"
+         self.unique[digest] = 1
+         return False
+ 
+ class IllegalDeliveredTo:
+     def __init__(self, names):
+         self.names = names
+     def __call__(self, mi, log):
+         fields = mi.msg.get_all("Delivered-To")
+         if fields is None:
+             return False
+         
+         for field in fields:
+             field = field.lower()
+             for name in self.names:
+                 if name in field:
+                     return False
+         log.pass_test(SPAM)
+         return "sent to random email"
+ 
+ class SpamAssassin:
+     def __init__(self, level = 8):
+         self.level = level
+     def __call__(self, mi, log):
+         if ("*" * self.level) in mi.msg.get("X-Spam-Status", ""):
+             log.pass_test(SPAM)
+             return "assassinated!"
+         return False
+ 
  class WhiteListFrom:
      """Test: Read a list of email addresses to use a 'from' whitelist"""
      def __init__(self, filename):
+         self.filename = filename
+         self._mtime = 0
+         self._load_if_needed()
+ 
+     def _load(self):
          lines = [line.strip().lower() for line in
!                            open(self.filename).readlines()]
          self.addresses = sets.Set(lines)
+ 
+     def _load_if_needed(self):
+         mtime = os.path.getmtime(self.filename)
+         if mtime != self._mtime:
+             print "Reloading", self.filename
+             self._mtime = mtime
+             self._load()
          
      def __call__(self, mi, log):
+         self._load_if_needed()
          frm = mi.msg["from"]
+         realname, frm = Utils.parseaddr(frm)
          status = (frm is not None) and (frm.lower() in self.addresses)
          if status:
!             log.pass_test(SPAM)
              return "it is in 'from' white list"
          return False
***************
*** 212,216 ****
  def _log_subject(mi, log):
      encoded_subject = mi.msg.get('subject')
!     subject, encoding = Header.decode_header(encoded_subject)[0]
      if encoding is None or encoding == 'iso-8859-1':
          s = subject
--- 270,278 ----
  def _log_subject(mi, log):
      encoded_subject = mi.msg.get('subject')
!     try:
!         subject, encoding = Header.decode_header(encoded_subject)[0]
!     except Header.HeaderParseError:
!         log.info("%s Subject cannot be parsed" % (mi.i,))
!         return
      if encoding is None or encoding == 'iso-8859-1':
          s = subject
***************
*** 230,233 ****
--- 292,297 ----
  
          for i in range(1, count+1):
+             if (i-1) % 10 == 0:
+                 print " == %d/%d ==" % (i, count)
              # Kevin's code used -1, but -1 doesn't work for one of
              # my POP accounts, while a million does.
***************
*** 298,303 ****
      try:
          # Note this this example uses the default password.  YMMV.
!         urllib.urlopen("http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=2")
!         urllib.urlopen("http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=1")
      except KeyboardInterrupt:
          raise
--- 362,367 ----
      try:
          # Note this this example uses the default password.  YMMV.
!         urllib.urlopen("http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=2").read()
!         urllib.urlopen("http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=1").read()
      except KeyboardInterrupt:
          raise
***************
*** 329,343 ****
      filters = Filters()
  
      # A list of everyone who has emailed me this year.
      # Keep their messages on the server.
      filters.add(WhiteListFrom("good_emails.txt"), KEEP)
  
!     # My mailing lists.  Edited to make it slightly harder
!     # for spammers to read this description and figure
!     # out how to spam me.
!     filters.add(WhiteListSubstrings("subject",
!                   ['[Twisted]', 'CompChem:', '[Bioperl]',
!                    '[BioPy]', '[SALSA CLUB]', '[Open-bio]',
!                    '[StarshipCrew]']), KEEP)
  
      # Get rid of anything which smells like an exectuable.
--- 393,427 ----
      filters = Filters()
  
+     duplicate = Duplicate()
+     filters.add(duplicate, AppendFile("spam2.mbox"))
+ 
      # A list of everyone who has emailed me this year.
      # Keep their messages on the server.
      filters.add(WhiteListFrom("good_emails.txt"), KEEP)
  
!     # My mailing lists.
!     filters.add(WhiteListSubstrings("subject", [
!                    'ABCD:',
!                    '[Python-announce]',
!                    '[Python]',
!                    '[Bioinfo]',
!                    '[EuroPython]',
!                    ]),
!                 KEEP)
! 
!     filters.add(WhiteListSubstrings("to", [
!         "president at whitehouse.gov",
!         "ceo at big.com",
!         ]),
!                 KEEP)
! 
!     names = ["john", "", "jon", "johnathan"]
!     valid_emails = ([name + "@lectroid.com" for name in names] +
!                     [name + "@bigboote.org" for name in names] +
!                     ["buckeroo.bonzai at aol.earth"])
! 
!     filters.add(IllegalDeliveredTo(valid_emails), DELETE)
!     filters.add(SpamAssassin(), AppendFile("spam2.mbox"))
!     
  
      # Get rid of anything which smells like an exectuable.
***************
*** 349,356 ****
      filters.add(IsSpam(h, 0.90), AppendFile("spam.mbox"))
  
!     # These are my POP3 accounts.  (or not ;)
      server_configs = [("mail.example.com",
!                           "dalke", "password"),
!                       ("mail2.spam.com", "dalke", "1234"), ]
  
      # The main culling loop.
--- 433,441 ----
      filters.add(IsSpam(h, 0.90), AppendFile("spam.mbox"))
  
!     # These are my POP3 accounts.
      server_configs = [("mail.example.com",
!                           "user at example.com", "password"),
!                       ("popserver.big.com", "ceo", "12345"), ]
! 
  
      # The main culling loop.
***************
*** 361,367 ****
--- 446,455 ----
      while 1:
          error_flag = False
+         duplicate.unique.clear()  # Hack!
          for server, user, pwd in server_configs:
              try:
                  log = filter_server( (server, user, pwd), filters)
+             except KeyboardInterrupt:
+                 raw_input("Press enter to continue. ")
              except StandardError:
                  raise
***************
*** 406,416 ****
              error_count += 1
  
!         if error_count > 20:
              restart_network()
              error_count = 0
  
!         wait(3*60)
!                 
!     
  
  if __name__ == "__main__":
--- 494,520 ----
              error_count += 1
  
!         if error_count > 0:
              restart_network()
              error_count = 0
  
!         delay = 10 * 60
!         while delay:
!             try:
!                 wait(delay)
!                 break
!             except KeyboardInterrupt:
!                 print
!                 while 1:
!                     cmd = raw_input("enter, delay, or quit? ")
!                     if cmd in ("q", "quit"):
!                         raise SystemExit(0)
!                     elif cmd == "":
!                         delay = 0
!                         break
!                     elif cmd.isdigit():
!                         delay = int(cmd)
!                         break
!                     else:
!                         print "Unknown command."
  
  if __name__ == "__main__":



More information about the Spambayes-checkins mailing list