[Spambayes-checkins] spambayes/contrib sb_culler.py,1.2,1.3
Tony Meyer
anadelonbrin at users.sourceforge.net
Wed Oct 27 04:36:27 CEST 2004
Update of /cvsroot/spambayes/spambayes/contrib
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv16272/contrib
Modified Files:
sb_culler.py
Log Message:
Update with the changes that Andrew Dalke posted to comp.lang.python (c.l.p) today.
Weed out duplicates.
Add SpamAssassin header checking.
Add whitelisting of delivered-to header.
Add ability to continue after KeyboardInterrupt.
Only restart network after 21 errors, not 1.
During delay, let user quit, immediately refilter, or delay for a given time.
Print a little indicator while filtering.
Make subject logging able to recover from header parsing errors.
Reload whitelist on demand.
Index: sb_culler.py
===================================================================
RCS file: /cvsroot/spambayes/spambayes/contrib/sb_culler.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -C2 -d -r1.2 -r1.3
*** sb_culler.py 27 Oct 2004 02:25:07 -0000 1.2
--- sb_culler.py 27 Oct 2004 02:36:25 -0000 1.3
***************
*** 19,24 ****
--- 19,29 ----
be done by editing the code.
+ The virus identification and POP3 manipulation code is based on Kevin
+ Altis' virus killer code, which I've been gratefully using for the
+ last several months.
+
Written by Andrew Dalke, November 2003.
Released into the public domain on 2003/11/22.
+ Updated 2004/10/26
== NO copyright protection asserted for this code. Share and enjoy! ==
***************
*** 26,35 ****
"""
! import sets, traceback
import poplib
import posixpath
! from email import Header
from spambayes import mboxutils, hammie
DO_ACTIONS = 1
VERBOSE_LEVEL = 1
--- 31,43 ----
"""
! import sets, traceback, md5, os
import poplib
import posixpath
! from email import Header, Utils
from spambayes import mboxutils, hammie
+ import socket
+ socket.setdefaulttimeout(10)
+
DO_ACTIONS = 1
VERBOSE_LEVEL = 1
***************
*** 113,128 ****
class WhiteListFrom:
"""Test: Read a list of email addresses to use a 'from' whitelist"""
def __init__(self, filename):
lines = [line.strip().lower() for line in
! open(filename).readlines()]
self.addresses = sets.Set(lines)
def __call__(self, mi, log):
frm = mi.msg["from"]
status = (frm is not None) and (frm.lower() in self.addresses)
if status:
! log.pass_test("'from' white list")
return "it is in 'from' white list"
return False
--- 121,186 ----
+ class Duplicate:
+ def __init__(self):
+ self.unique = {}
+ def __call__(self, mi, log):
+ digest = md5.md5(mi.text).digest()
+ if digest in self.unique:
+ log.pass_test(SPAM)
+ return "duplicate"
+ self.unique[digest] = 1
+ return False
+
+ class IllegalDeliveredTo:
+ def __init__(self, names):
+ self.names = names
+ def __call__(self, mi, log):
+ fields = mi.msg.get_all("Delivered-To")
+ if fields is None:
+ return False
+
+ for field in fields:
+ field = field.lower()
+ for name in self.names:
+ if name in field:
+ return False
+ log.pass_test(SPAM)
+ return "sent to random email"
+
+ class SpamAssassin:
+ def __init__(self, level = 8):
+ self.level = level
+ def __call__(self, mi, log):
+ if ("*" * self.level) in mi.msg.get("X-Spam-Status", ""):
+ log.pass_test(SPAM)
+ return "assassinated!"
+ return False
+
class WhiteListFrom:
"""Test: Read a list of email addresses to use a 'from' whitelist"""
def __init__(self, filename):
+ self.filename = filename
+ self._mtime = 0
+ self._load_if_needed()
+
+ def _load(self):
lines = [line.strip().lower() for line in
! open(self.filename).readlines()]
self.addresses = sets.Set(lines)
+
+ def _load_if_needed(self):
+ mtime = os.path.getmtime(self.filename)
+ if mtime != self._mtime:
+ print "Reloading", self.filename
+ self._mtime = mtime
+ self._load()
def __call__(self, mi, log):
+ self._load_if_needed()
frm = mi.msg["from"]
+ realname, frm = Utils.parseaddr(frm)
status = (frm is not None) and (frm.lower() in self.addresses)
if status:
! log.pass_test(SPAM)
return "it is in 'from' white list"
return False
***************
*** 212,216 ****
def _log_subject(mi, log):
encoded_subject = mi.msg.get('subject')
! subject, encoding = Header.decode_header(encoded_subject)[0]
if encoding is None or encoding == 'iso-8859-1':
s = subject
--- 270,278 ----
def _log_subject(mi, log):
encoded_subject = mi.msg.get('subject')
! try:
! subject, encoding = Header.decode_header(encoded_subject)[0]
! except Header.HeaderParseError:
! log.info("%s Subject cannot be parsed" % (mi.i,))
! return
if encoding is None or encoding == 'iso-8859-1':
s = subject
***************
*** 230,233 ****
--- 292,297 ----
for i in range(1, count+1):
+ if (i-1) % 10 == 0:
+ print " == %d/%d ==" % (i, count)
# Kevin's code used -1, but -1 doesn't work for one of
# my POP accounts, while a million does.
***************
*** 298,303 ****
try:
# Note this this example uses the default password. YMMV.
! urllib.urlopen("http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=2")
! urllib.urlopen("http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=1")
except KeyboardInterrupt:
raise
--- 362,367 ----
try:
# Note this this example uses the default password. YMMV.
! urllib.urlopen("http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=2").read()
! urllib.urlopen("http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=1").read()
except KeyboardInterrupt:
raise
***************
*** 329,343 ****
filters = Filters()
# A list of everyone who has emailed me this year.
# Keep their messages on the server.
filters.add(WhiteListFrom("good_emails.txt"), KEEP)
! # My mailing lists. Edited to make it slightly harder
! # for spammers to read this description and figure
! # out how to spam me.
! filters.add(WhiteListSubstrings("subject",
! ['[Twisted]', 'CompChem:', '[Bioperl]',
! '[BioPy]', '[SALSA CLUB]', '[Open-bio]',
! '[StarshipCrew]']), KEEP)
# Get rid of anything which smells like an exectuable.
--- 393,427 ----
filters = Filters()
+ duplicate = Duplicate()
+ filters.add(duplicate, AppendFile("spam2.mbox"))
+
# A list of everyone who has emailed me this year.
# Keep their messages on the server.
filters.add(WhiteListFrom("good_emails.txt"), KEEP)
! # My mailing lists.
! filters.add(WhiteListSubstrings("subject", [
! 'ABCD:',
! '[Python-announce]',
! '[Python]',
! '[Bioinfo]',
! '[EuroPython]',
! ]),
! KEEP)
!
! filters.add(WhiteListSubstrings("to", [
! "president at whitehouse.gov",
! "ceo at big.com",
! ]),
! KEEP)
!
! names = ["john", "", "jon", "johnathan"]
! valid_emails = ([name + "@lectroid.com" for name in names] +
! [name + "@bigboote.org" for name in names] +
! ["buckeroo.bonzai at aol.earth"])
!
! filters.add(IllegalDeliveredTo(valid_emails), DELETE)
! filters.add(SpamAssassin(), AppendFile("spam2.mbox"))
!
# Get rid of anything which smells like an exectuable.
***************
*** 349,356 ****
filters.add(IsSpam(h, 0.90), AppendFile("spam.mbox"))
! # These are my POP3 accounts. (or not ;)
server_configs = [("mail.example.com",
! "dalke", "password"),
! ("mail2.spam.com", "dalke", "1234"), ]
# The main culling loop.
--- 433,441 ----
filters.add(IsSpam(h, 0.90), AppendFile("spam.mbox"))
! # These are my POP3 accounts.
server_configs = [("mail.example.com",
! "user at example.com", "password"),
! ("popserver.big.com", "ceo", "12345"), ]
!
# The main culling loop.
***************
*** 361,367 ****
--- 446,455 ----
while 1:
error_flag = False
+ duplicate.unique.clear() # Hack!
for server, user, pwd in server_configs:
try:
log = filter_server( (server, user, pwd), filters)
+ except KeyboardInterrupt:
+ raw_input("Press enter to continue. ")
except StandardError:
raise
***************
*** 406,416 ****
error_count += 1
! if error_count > 20:
restart_network()
error_count = 0
! wait(3*60)
!
!
if __name__ == "__main__":
--- 494,520 ----
error_count += 1
! if error_count > 0:
restart_network()
error_count = 0
! delay = 10 * 60
! while delay:
! try:
! wait(delay)
! break
! except KeyboardInterrupt:
! print
! while 1:
! cmd = raw_input("enter, delay, or quit? ")
! if cmd in ("q", "quit"):
! raise SystemExit(0)
! elif cmd == "":
! delay = 0
! break
! elif cmd.isdigit():
! delay = int(cmd)
! break
! else:
! print "Unknown command."
if __name__ == "__main__":
More information about the Spambayes-checkins
mailing list