[Spambayes-checkins] spambayes pop3proxy.py,1.11,1.12

Richie Hindle richiehindle@users.sourceforge.net
Fri Nov 8 08:00:25 2002


Update of /cvsroot/spambayes/spambayes
In directory usw-pr-cvs1:/tmp/cvs-serv25390

Modified Files:
	pop3proxy.py 
Log Message:
 o The database is now saved (optionally) on exit, rather than after each
   message you train with.  There should be explicit save/reload commands,
   but they can come later.
 o It now keeps two mbox files of all the messages that have been used to
   train via the web interface - thanks to Just for the patch.
 o All the sockets now use async - the web interface used to freeze
   whenever the proxy was awaiting a response from the POP3 server.  That's
   now fixed.
 o It now copes with POP3 servers that don't issue a welcome greeting.
 o The training form now appears in the training results, so you can train
   on another message without having to go back to the Home page.


Index: pop3proxy.py
===================================================================
RCS file: /cvsroot/spambayes/spambayes/pop3proxy.py,v
retrieving revision 1.11
retrieving revision 1.12
diff -C2 -d -r1.11 -r1.12
*** pop3proxy.py	7 Nov 2002 22:27:02 -0000	1.11
--- pop3proxy.py	8 Nov 2002 08:00:20 -0000	1.12
***************
*** 47,50 ****
--- 47,74 ----
  
  
+ todo = """
+  o (Re)training interface - one message per line, quick-rendering table.
+  o Slightly-wordy index page; intro paragraph for each page.
+  o Once the training stuff is on a separate page, make the paste box
+    bigger.
+  o "Links" section (on homepage?) to project homepage, mailing list,
+    etc.
+  o "Home" link (with helmet!) at the end of each page.
+  o "Classify this" - just like Train.
+  o "Send me an email every [...] to remind me to train on new
+    messages."
+  o "Send me a status email every [...] telling how many mails have been
+    classified, etc."
+  o Deployment: Windows executable?  atlaxwin and ctypes?  Or just
+    webbrowser?
+  o Possibly integrate Tim Stone's SMTP code - make it use async, make
+    the training code update (rather than replace!) the database.
+  o Can it cleanly dynamically update its status display while having a
+    POP3 converation?  Hammering reload sucks.
+  o Add a command to save the database without shutting down, and one to
+    reload the database.
+  o Leave the word in the input field after a Word query.
+ """
+ 
  import sys, re, operator, errno, getopt, cPickle, cStringIO, time
  import socket, asyncore, asynchat, cgi, urlparse, webbrowser
***************
*** 92,95 ****
--- 116,120 ----
              self.factory(*args)
  
+ 
  class BrighterAsyncChat(asynchat.async_chat):
      """An asynchat.async_chat that doesn't give spurious warnings on
***************
*** 110,113 ****
--- 135,164 ----
  
  
+ class ServerLineReader(BrighterAsyncChat):
+     """An async socket that reads lines from a remote server and
+     simply calls a callback with the data.  The BayesProxy object
+     can't connect to the real POP3 server and talk to it
+     synchronously, because that would block the process."""
+     
+     def __init__(self, serverName, serverPort, lineCallback):
+         BrighterAsyncChat.__init__(self)
+         self.lineCallback = lineCallback
+         self.request = ''
+         self.set_terminator('\r\n')
+         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+         self.connect((serverName, serverPort))
+     
+     def collect_incoming_data(self, data):
+         self.request = self.request + data
+ 
+     def found_terminator(self):
+         self.lineCallback(self.request + '\r\n')
+         self.request = ''
+ 
+     def handle_close(self):
+         self.lineCallback('')
+         self.close()
+ 
+ 
  class POP3ProxyBase(BrighterAsyncChat):
      """An async dispatcher that understands POP3 and proxies to a POP3
***************
*** 126,134 ****
          BrighterAsyncChat.__init__(self, clientSocket)
          self.request = ''
          self.set_terminator('\r\n')
!         self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
!         self.serverSocket.connect((serverName, serverPort))
!         self.serverIn = self.serverSocket.makefile('r')  # For reading only
!         self.push(self.serverIn.readline())
  
      def onTransaction(self, command, args, response):
--- 177,189 ----
          BrighterAsyncChat.__init__(self, clientSocket)
          self.request = ''
+         self.response = ''
          self.set_terminator('\r\n')
!         self.command = ''           # The POP3 command being processed...
!         self.args = ''              # ...and its arguments
!         self.isClosing = False      # Has the server closed the socket?
!         self.seenAllHeaders = False # For the current RETR or TOP
!         self.startTime = 0          # (ditto)
!         self.serverSocket = ServerLineReader(serverName, serverPort, 
!                                              self.onServerLine)
  
      def onTransaction(self, command, args, response):
***************
*** 139,152 ****
          raise NotImplementedError
  
!     def isMultiline(self, command, args):
!         """Returns True if the given request should get a multiline
          response (assuming the response is positive).
          """
!         if command in ['USER', 'PASS', 'APOP', 'QUIT',
!                        'STAT', 'DELE', 'NOOP', 'RSET', 'KILL']:
              return False
!         elif command in ['RETR', 'TOP']:
              return True
!         elif command in ['LIST', 'UIDL']:
              return len(args) == 0
          else:
--- 194,237 ----
          raise NotImplementedError
  
!     def onServerLine(self, line):
!         """A line of response has been received from the POP3 server."""
!         isFirstLine = not self.response
!         self.response = self.response + line
!         
!         # Is this line that terminates a set of headers?
!         self.seenAllHeaders = self.seenAllHeaders or line in ['\r\n', '\n']
!         
!         # Has the server closed its end of the socket?
!         if not line:
!             self.isClosing = True
!         
!         # If we're not processing a command, just echo the response.
!         if not self.command:
!             self.push(self.response)
!             self.response = ''
!         
!         # Time out after 30 seconds for message-retrieval commands if
!         # all the headers are down.  The rest of the message will proxy
!         # straight through.
!         if self.command in ['TOP', 'RETR'] and \
!            self.seenAllHeaders and time.time() > self.startTime + 30:
!             self.onResponse()
!             self.response = ''
!         # If that's a complete response, handle it.
!         elif not self.isMultiline() or line == '.\r\n' or \
!            (isFirstLine and line.startswith('-ERR')):
!             self.onResponse()
!             self.response = ''
!     
!     def isMultiline(self):
!         """Returns True if the request should get a multiline
          response (assuming the response is positive).
          """
!         if self.command in ['USER', 'PASS', 'APOP', 'QUIT',
!                             'STAT', 'DELE', 'NOOP', 'RSET', 'KILL']:
              return False
!         elif self.command in ['RETR', 'TOP']:
              return True
!         elif self.command in ['LIST', 'UIDL']:
              return len(args) == 0
          else:
***************
*** 155,204 ****
              return False
  
-     def readResponse(self, command, args):
-         """Reads the POP3 server's response and returns a tuple of
-         (response, isClosing, timedOut).  isClosing is True if the
-         server closes the socket, which tells found_terminator() to
-         close when the response has been sent.  timedOut is set if a
-         TOP or RETR request was still arriving after 30 seconds, and
-         tells found_terminator() to proxy the remainder of the response.
-         """
-         responseLines = []
-         startTime = time.time()
-         isMulti = self.isMultiline(command, args)
-         isClosing = False
-         timedOut = False
-         isFirstLine = True
-         seenAllHeaders = False
-         while True:
-             line = self.serverIn.readline()
-             if not line:
-                 # The socket's been closed by the server, probably by QUIT.
-                 isClosing = True
-                 break
-             elif not isMulti or (isFirstLine and line.startswith('-ERR')):
-                 # A single-line response.
-                 responseLines.append(line)
-                 break
-             elif line == '.\r\n':
-                 # The termination line.
-                 responseLines.append(line)
-                 break
-             else:
-                 # A normal line - append it to the response and carry on.
-                 responseLines.append(line)
-                 seenAllHeaders = seenAllHeaders or line in ['\r\n', '\n']
- 
-             # Time out after 30 seconds for message-retrieval commands
-             # if all the headers are down - found_terminator() knows how
-             # to deal with this.
-             if command in ['TOP', 'RETR'] and \
-                seenAllHeaders and time.time() > startTime + 30:
-                 timedOut = True
-                 break
- 
-             isFirstLine = False
- 
-         return ''.join(responseLines), isClosing, timedOut
- 
      def collect_incoming_data(self, data):
          """Asynchat override."""
--- 240,243 ----
***************
*** 207,256 ****
      def found_terminator(self):
          """Asynchat override."""
-         # Send the request to the server and read the reply.
          if self.request.strip().upper() == 'KILL':
              self.serverSocket.sendall('QUIT\r\n')
              self.send("+OK, dying.\r\n")
              self.shutdown(2)
              self.close()
              raise SystemExit
!         self.serverSocket.sendall(self.request + '\r\n')
          if self.request.strip() == '':
              # Someone just hit the Enter key.
!             command, args = ('', '')
          else:
              splitCommand = self.request.strip().split(None, 1)
!             command = splitCommand[0].upper()
!             args = splitCommand[1:]
!         rawResponse, isClosing, timedOut = self.readResponse(command, args)
! 
          # Pass the request and the raw response to the subclass and
          # send back the cooked response.
!         cookedResponse = self.onTransaction(command, args, rawResponse)
!         self.push(cookedResponse)
!         self.request = ''
! 
!         # If readResponse() timed out, we still need to read and proxy
!         # the rest of the message.
!         if timedOut:
!             while True:
!                 line = self.serverIn.readline()
!                 if not line:
!                     # The socket's been closed by the server.
!                     isClosing = True
!                     break
!                 elif line == '.\r\n':
!                     # The termination line.
!                     self.push(line)
!                     break
!                 else:
!                     # A normal line.
!                     self.push(line)
! 
!         # If readResponse() or the loop above decided that the server
!         # has closed its socket, close this one when the response has
!         # been sent.
!         if isClosing:
              self.close_when_done()
  
  
  class BayesProxyListener(Listener):
--- 246,288 ----
      def found_terminator(self):
          """Asynchat override."""
          if self.request.strip().upper() == 'KILL':
              self.serverSocket.sendall('QUIT\r\n')
              self.send("+OK, dying.\r\n")
+             self.serverSocket.shutdown(2)
+             self.serverSocket.close()
              self.shutdown(2)
              self.close()
              raise SystemExit
!         
!         self.serverSocket.push(self.request + '\r\n')
          if self.request.strip() == '':
              # Someone just hit the Enter key.
!             self.command = self.args = ''
          else:
+             # A proper command.
              splitCommand = self.request.strip().split(None, 1)
!             self.command = splitCommand[0].upper()
!             self.args = splitCommand[1:]
!             self.startTime = time.time()
!         
!         self.request = ''
!         
!     def onResponse(self):
          # Pass the request and the raw response to the subclass and
          # send back the cooked response.
!         cooked = self.onTransaction(self.command, self.args, self.response)
!         self.push(cooked)
!         
!         # If onServerLine() decided that the server has closed its
!         # socket, close this one when the response has been sent.
!         if self.isClosing:
              self.close_when_done()
  
+         # Reset.
+         self.command = ''
+         self.args = ''
+         self.isClosing = False
+         self.seenAllHeaders = False
+ 
  
  class BayesProxyListener(Listener):
***************
*** 452,456 ****
               table { font: 90%% arial, swiss, helvetica }
               form { margin: 0 }
!              .banner { background: #c0e0ff; padding=5; padding-left: 15 }
               .header { font-size: 133%% }
               .content { margin: 15 }
--- 484,490 ----
               table { font: 90%% arial, swiss, helvetica }
               form { margin: 0 }
!              .banner { background: #c0e0ff; padding=5; padding-left: 15;
!                        border-top: 1px solid black;
!                        border-bottom: 1px solid black }
               .header { font-size: 133%% }
               .content { margin: 15 }
***************
*** 466,470 ****
                  <div class='banner'>
                  <img src='/helmet.gif' align='absmiddle'>
!                 <span class='header'>Spambayes proxy: %s</span></div>
                  <div class='content'>\n"""
  
--- 500,504 ----
                  <div class='banner'>
                  <img src='/helmet.gif' align='absmiddle'>
!                 <span class='header'>&nbsp;Spambayes proxy: %s</span></div>
                  <div class='content'>\n"""
  
***************
*** 475,481 ****
               <a href='http://www.spambayes.org/'>Spambayes.org</a></td>
               <td align='right' class='banner'>
!              <input type='submit' value='Shutdown now'>
               </td></tr></table></form>\n"""
  
      pageSection = """<table class='sectiontable' cellspacing='0'>
                    <tr><td class='sectionheading'>%s</td></tr>
--- 509,520 ----
               <a href='http://www.spambayes.org/'>Spambayes.org</a></td>
               <td align='right' class='banner'>
!              %s
               </td></tr></table></form>\n"""
  
+     shutdownDB = """<input type='submit' name='how' value='Shutdown'>"""
+     
+     shutdownPickle = shutdownDB + """&nbsp;&nbsp;
+             <input type='submit' name='how' value='Save &amp; shutdown'>"""
+ 
      pageSection = """<table class='sectiontable' cellspacing='0'>
                    <tr><td class='sectionheading'>%s</td></tr>
***************
*** 483,486 ****
--- 522,533 ----
                    &nbsp;<br>\n"""
      
+     summary = """POP3 proxy running on port <b>%(proxyPort)d</b>,
+               proxying to <b>%(serverName)s:%(serverPort)d</b>.<br>
+               Active POP3 conversations: <b>%(activeSessions)d</b>.<br>
+               POP3 conversations this session: <b>%(totalSessions)d</b>.<br>
+               Emails classified this session: <b>%(numSpams)d</b> spam,
+                 <b>%(numHams)d</b> ham, <b>%(numUnsure)d</b> unsure.
+               """
+     
      wordQuery = """<form action='/wordquery'>
                  <input name='word' type='text' size='30'>
***************
*** 488,491 ****
--- 535,550 ----
                  </form>"""
      
+     train = """<form action='/upload' method='POST'
+                 enctype='multipart/form-data'>
+             Either upload a message file: <input type='file' name='file'><br>
+             Or paste the whole message (incuding headers) here:<br>
+             <textarea name='text' rows='3' cols='60'></textarea><br>
+             Is this message
+             <input type='radio' name='which' value='ham'>Ham</input> or
+             <input type='radio'
+                    name='which' value='spam' checked>Spam</input>?<br>
+             <input type='submit' value='Train on this message'>
+             </form>"""
+     
      def __init__(self, clientSocket, bayes):
          BrighterAsyncChat.__init__(self, clientSocket)
***************
*** 502,506 ****
          """Asynchat override.
          Read and parse the HTTP request and call an on<Command> handler."""
!         requestLine, headers = self.request.split('\r\n', 1)
          try:
              method, url, version = requestLine.strip().split()
--- 561,565 ----
          """Asynchat override.
          Read and parse the HTTP request and call an on<Command> handler."""
!         requestLine, headers = (self.request+'\r\n').split('\r\n', 1)
          try:
              method, url, version = requestLine.strip().split()
***************
*** 547,551 ****
          
          if path == '/helmet.gif':
!             self.pushOKHeaders('image/gif')
              self.push(self.helmet)
          else:
--- 606,614 ----
          
          if path == '/helmet.gif':
!             # XXX Why doesn't Expires work?  Must read RFC 2616 one day.
!             inOneHour = time.gmtime(time.time() + 3600)
!             expiryDate = time.strftime('%a, %d %b %Y %H:%M:%S GMT', inOneHour)
!             extraHeaders = {'Expires': expiryDate}
!             self.pushOKHeaders('image/gif', extraHeaders)
              self.push(self.helmet)
          else:
***************
*** 554,558 ****
                  handler = getattr(self, 'on' + name)
              except AttributeError:
!                 self.pushError(404, "Not found: '%s'" % url)
              else:
                  # This is a request for a valid page; run the handler.
--- 617,621 ----
                  handler = getattr(self, 'on' + name)
              except AttributeError:
!                 self.pushError(404, "Not found: '%s'" % path)
              else:
                  # This is a request for a valid page; run the handler.
***************
*** 561,569 ****
                  handler(params)
                  timeString = time.asctime(time.localtime())
!                 self.push(self.footer % timeString)
      
!     def pushOKHeaders(self, contentType):
!         self.push("HTTP/1.0 200 OK\r\n")
          self.push("Content-Type: %s\r\n" % contentType)
          self.push("\r\n")
  
--- 624,641 ----
                  handler(params)
                  timeString = time.asctime(time.localtime())
!                 if status.useDB:
!                     self.push(self.footer % (timeString, self.shutdownDB))
!                 else:
!                     self.push(self.footer % (timeString, self.shutdownPickle))
      
!     def pushOKHeaders(self, contentType, extraHeaders={}):
!         timeNow = time.gmtime(time.time())
!         httpNow = time.strftime('%a, %d %b %Y %H:%M:%S GMT', timeNow)
!         self.push("HTTP/1.1 200 OK\r\n")
!         self.push("Connection: close\r\n")
          self.push("Content-Type: %s\r\n" % contentType)
+         self.push("Date: %s\r\n" % httpNow)
+         for name, value in extraHeaders.items():
+             self.push("%s: %s\r\n" % (name, value))
          self.push("\r\n")
  
***************
*** 583,616 ****
  
      def onHome(self, params):
!         summary = """POP3 proxy running on port <b>%(proxyPort)d</b>,
!                   proxying to <b>%(serverName)s:%(serverPort)d</b>.<br>
!                   Active POP3 conversations: <b>%(activeSessions)d</b>.<br>
!                   POP3 conversations this session:
!                     <b>%(totalSessions)d</b>.<br>
!                   Emails classified this session: <b>%(numSpams)d</b> spam,
!                     <b>%(numHams)d</b> ham, <b>%(numUnsure)d</b> unsure.
!                   """ % status.__dict__
!         
!         train = """<form action='/upload' method='POST'
!                     enctype='multipart/form-data'>
!                 Either upload a message file:
!                 <input type='file' name='file'><br>
!                 Or paste the whole message (incuding headers) here:<br>
!                 <textarea name='text' rows='3' cols='60'></textarea><br>
!                 Is this message
!                 <input type='radio' name='which' value='ham'>Ham</input> or
!                 <input type='radio'
!                        name='which' value='spam' checked>Spam</input>?<br>
!                 <input type='submit' value='Train on this message'>
!                 </form>"""
!         
!         body = (self.pageSection % ('Status', summary) +
!                 self.pageSection % ('Word query', self.wordQuery) +
!                 self.pageSection % ('Train', train))
          self.push(body)
  
      def onShutdown(self, params):
!         self.push("<p><b>Shutdown.</b> Goodbye.</p>")
!         self.push(' ')  # Acts as a flush for small buffers.
          self.shutdown(2)
          self.close()
--- 655,675 ----
  
      def onHome(self, params):
!         """Serve up the homepage."""
!         body = (self.pageSection % ('Status', self.summary % status.__dict__)+
!                 self.pageSection % ('Word query', self.wordQuery)+
!                 self.pageSection % ('Train', self.train))
          self.push(body)
  
      def onShutdown(self, params):
!         """Shutdown the server, saving the pickle if requested to do so."""
!         if params['how'].lower().find('save') >= 0:
!             if not status.useDB and status.pickleName:
!                 self.push("<b>Saving...</b>")
!                 self.push(' ')  # Acts as a flush for small buffers.
!                 fp = open(status.pickleName, 'wb')
!                 cPickle.dump(self.bayes, fp, 1)
!                 fp.close()
!         self.push("<b>Shutdown</b>. Goodbye.")
!         self.push(' ')
          self.shutdown(2)
          self.close()
***************
*** 618,625 ****
  
      def onUpload(self, params):
          message = params.get('file') or params.get('text')
          isSpam = (params['which'] == 'spam')
          # Append the message to a file, to make it easier to rebuild
!         # the database later.
          message = message.replace('\r\n', '\n').replace('\r', '\n')
          if isSpam:
--- 677,690 ----
  
      def onUpload(self, params):
+         """Train on an uploaded or pasted message."""
+         # Upload or paste?  Spam or ham?
          message = params.get('file') or params.get('text')
          isSpam = (params['which'] == 'spam')
+         
          # Append the message to a file, to make it easier to rebuild
!         # the database later.   This is a temporary implementation -
!         # it should keep a Corpus (from Tim Stone's forthcoming message
!         # management module) to manage a cache of messages.  It needs
!         # to keep them for the HTML retraining interface anyway.
          message = message.replace('\r\n', '\n').replace('\r', '\n')
          if isSpam:
***************
*** 627,642 ****
          else:
              f = open("_pop3proxyham.mbox", "a")
!         f.write("From ???@???\n")  # fake From line (XXX good enough?)
          f.write(message)
!         f.write("\n")
          f.close()
          self.bayes.learn(tokenizer.tokenize(message), isSpam, True)
!         self.push("""<p>Trained on your message. Saving database...</p>""")
!         self.push(" ")  # Flush... must find out how to do this properly...
!         if not status.useDB and status.pickleName:
!             fp = open(status.pickleName, 'wb')
!             cPickle.dump(self.bayes, fp, 1)
!             fp.close()
!         self.push("<p>Done.</p><p><a href='/'>Home</a></p>")
  
      def onWordquery(self, params):
--- 692,704 ----
          else:
              f = open("_pop3proxyham.mbox", "a")
!         f.write("From pop3proxy@spambayes.org Sat Jan 31 00:00:00 2000\n")
          f.write(message)
!         f.write("\n\n")
          f.close()
+ 
+         # Train on the message.
          self.bayes.learn(tokenizer.tokenize(message), isSpam, True)
!         self.push("<p>OK. Return <a href='/'>Home</a> or train another:</p>")
!         self.push(self.pageSection % ('Train another', self.train))
  
      def onWordquery(self, params):
***************
*** 656,660 ****
              info = "'%s' does not appear in the database." % word
          
!         body = (self.pageSection % ("Statistics for '%s':" % word, info) +
                  self.pageSection % ('Word query', self.wordQuery))
          self.push(body)
--- 718,722 ----
              info = "'%s' does not appear in the database." % word
          
!         body = (self.pageSection % ("Statistics for '%s'" % word, info) +
                  self.pageSection % ('Word query', self.wordQuery))
          self.push(body)
***************
*** 765,771 ****
          else:
              handler = self.handlers.get(command, self.onUnknown)
!             self.push(handler(command, args))
          self.request = ''
  
      def onStat(self, command, args):
          """POP3 STAT command."""
--- 827,839 ----
          else:
              handler = self.handlers.get(command, self.onUnknown)
!             self.push(handler(command, args))   # Or push_slowly for testing
          self.request = ''
  
+     def push_slowly(self, response):
+         """Useful for testing."""
+         for c in response:
+             self.push(c)
+             time.sleep(0.02)
+ 
      def onStat(self, command, args):
          """POP3 STAT command."""
***************
*** 777,781 ****
          """POP3 LIST command, with optional message number argument."""
          if args:
!             number = int(args)
              if 0 < number <= len(self.maildrop):
                  return "+OK %d\r\n" % len(self.maildrop[number-1])
--- 845,852 ----
          """POP3 LIST command, with optional message number argument."""
          if args:
!             try:
!                 number = int(args)
!             except ValueError:
!                 number = -1
              if 0 < number <= len(self.maildrop):
                  return "+OK %d\r\n" % len(self.maildrop[number-1])
***************
*** 803,811 ****
      def onRetr(self, command, args):
          """POP3 RETR command."""
!         return self._getMessage(int(args), 12345)
  
      def onTop(self, command, args):
          """POP3 RETR command."""
!         number, lines = map(int, args.split())
          return self._getMessage(number, lines)
  
--- 874,889 ----
      def onRetr(self, command, args):
          """POP3 RETR command."""
!         try:
!             number = int(args)
!         except ValueError:
!             number = -1
!         return self._getMessage(number, 12345)
  
      def onTop(self, command, args):
          """POP3 RETR command."""
!         try:
!             number, lines = map(int, args.split())
!         except ValueError:
!             number, lines = -1, -1
          return self._getMessage(number, lines)
  
***************
*** 863,867 ****
          while response.find('\n.\r\n') == -1:
              response = response + proxy.recv(1000)
!         assert response.find(options.hammie_header_name) != -1
  
      # Kill the proxy and the test server.
--- 941,945 ----
          while response.find('\n.\r\n') == -1:
              response = response + proxy.recv(1000)
!         assert response.find(options.hammie_header_name) >= 0
  
      # Kill the proxy and the test server.





More information about the Spambayes-checkins mailing list