forking smtpd

Eric S. Johansson esj at harvee.org
Fri Aug 20 10:58:20 EDT 2004


I was working on a filter for Postfix and, instead of taking the "fork a 
new Python process on every message" route, I decided to use the SMTP 
interface and fork after the Python process has already started.

Obviously, I needed some way to receive and process SMTP. smtpd.py 
works OK if you are willing to live with single-threaded, one-at-a-time 
filtering.  Because filtering takes a significant amount of time, I 
decided to extend smtpd to support a forking server.

I make no claims about the correctness or suitability of these 
modifications, or even that I understood wtf was going on with 
asyncore/asynchat.  It will also be interesting to see what improvements 
others have to suggest. But here's what I found:

It was relatively easy to create a subclass (forkSMTPServer) and, inside 
the accept handler, fork off the child process and release the 
connection in the parent.  I suspect that I haven't released the listen 
socket properly, but I will look into that.
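
For readers who want to see the fork-on-accept idea in isolation, here 
is a minimal sketch using plain sockets in Python 3 rather than 
smtpd/asyncore. It is not the patch itself: serve_one and greet are 
hypothetical names, and having the child close its copy of the listening 
socket is just one common way to deal with the listen-socket question, 
not something the patch is known to do.

```python
import os
import socket


def serve_one(listener, handler):
    """Accept one connection and handle it in a forked child.

    Returns the child's PID in the parent; the child never returns.
    """
    conn, addr = listener.accept()
    pid = os.fork()
    if pid == 0:
        # Child: the listening socket is not needed here.
        listener.close()
        try:
            handler(conn, addr)
        finally:
            conn.close()
            os._exit(0)  # leave without running the parent's cleanup
    # Parent: drop our copy of the connection; the child keeps its own.
    conn.close()
    return pid


def greet(conn, addr):
    # Hypothetical handler: send an SMTP-style banner and hang up.
    conn.sendall(b"220 localhost ready\r\n")
```

The parent would normally call serve_one in a loop and reap finished 
children from time to time so they do not linger as zombies.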

I also copied the asyncore polling loop into SMTPServer and modified it 
to support forking and to exit on the same signal (asyncore.ExitNow) as 
the rest of asyncore.
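
The child-reaping part of that loop can be sketched on its own. The 
patch calls os.waitpid(-1, os.WNOHANG) and treats errno 10 as "nothing 
to wait for"; on Linux, errno 10 is ECHILD, which Python 3 raises as 
ChildProcessError. The sketch below assumes Python 3 on a POSIX system, 
and reap_children is a hypothetical helper name, not part of smtpd.

```python
import os


def reap_children():
    """Collect every child that has already exited, without blocking.

    Returns a list of (pid, status) pairs.
    """
    reaped = []
    while True:
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            # errno ECHILD: this process has no children at all.
            break
        if pid == 0:
            # Children exist, but none has exited yet.
            break
        reaped.append((pid, status))
    return reaped
```

Calling something like this once per pass through the polling loop keeps 
the loop from blocking on children while still cleaning up after them.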

The last change I added was a close method on SMTPChannel and a 
process_close method on SMTPServer.  This lets you detect when a 
channel has closed and do something about it, such as ending a child 
process.
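
The shape of that close hook, stripped of the asyncore machinery, is 
roughly as follows. This is a sketch in Python 3 with stand-in classes: 
Server, Channel and CountingServer are hypothetical names used only to 
show the callback pattern, not the real smtpd classes.

```python
class Server:
    """Stand-in for SMTPServer; subclasses override process_close()."""

    def process_close(self):
        pass


class Channel:
    """Stand-in for SMTPChannel: tells its server when it closes."""

    def __init__(self, server):
        self._server = server
        self.closed = False

    def close(self):
        # Real code would close the underlying socket here.
        self.closed = True
        self._server.process_close()


class CountingServer(Server):
    """Example override: count how many channels have closed."""

    def __init__(self):
        self.closed_channels = 0

    def process_close(self):
        self.closed_channels += 1
```

In the forking server, the override is where a child process can decide 
that its one connection is finished and exit.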

So, in the end, a forking SMTP receiver looks like:

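import sys
import traceback
from time import sleep

import smtpd   # the patched smtpd.py, not the stock module

# log() is a logging helper defined elsewhere (not shown)

class filter(smtpd.forkSMTPServer):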

     def process_message(self, peer, mailfrom, rcpttos, data):

         sleep(5)
         log("postfix peer %s %s"%(peer))
         log("postfix mail from %s"%(mailfrom))
         log("postfix rcpt to %s"%(rcpttos))
         #<do something with message here>
         print "processing",self.parent_ID



f = filter(("localhost",40025))
try:

     #asyncore.loop(10)
     f.loop(10)
     print "exit from loop"
except KeyboardInterrupt:
     pass
except SystemExit:
     pass

except Exception, error:
     print str(error)
     etype, value, tb = sys.exc_info()
     exception_strings = traceback.format_exception(etype, value, tb)
     for i in exception_strings:
         # each string already ends with a newline
         sys.stdout.write(i)



line wrapped patches below:
270,274d269
<     def close(self):
<         print "outside close"
<         asynchat.async_chat.close(self)
<         self.__server.process_close()
<
321,385d315
<     def process_close (self):
<         """override this abstract method to handle the close of SMTP channel. """
<         pass
<
<     def loop (self, timeout=30.0, use_poll=0, map=None):
<         global f
<         if map is None:
<             map=asyncore.socket_map
<
<         if use_poll:
<             if hasattr (select, 'poll'):
<                 poll_fun = asyncore.poll3
<             else:
<                 poll_fun = asyncore.poll2
<         else:
<             poll_fun = asyncore.poll
<
<         while map:
<             try:
<                 status = os.waitpid(-1,os.WNOHANG)
<                 print "timeout loop %s %s"% (self.parent_ID, str(status))
<             except OSError, error:
<                 if error[0] == 10:
<                     status = (0,0)
<                 else:
<                     print self.parent_ID, error
<
<             if status[0] == 0:
<                 try:
<                     poll_fun (timeout, map)
<                 except asyncore.ExitNow:
<                     return
<
<
< class forkSMTPServer(SMTPServer):
<
<     def __init__(self, address):
<
<         self.parent_ID = None
<         SMTPServer.__init__(self, address,(0,0))
<         self.mychannel = None
<
<     def handle_accept(self):
<         conn, addr = self.accept()
<         self.parent_ID = os.fork()
<         if not self.parent_ID:
<             print >> DEBUGSTREAM, 'Incoming connection from %s' % repr(addr)
<             self.mychannel = SMTPChannel(self, conn, addr)
<         else:
<             # close off socket in parent and fake lack of connection
<             conn.close()
<             self.connected = 0
<             print "child fork ID", self.parent_ID
<
<     def process_close(self):
<
<         print "process my very own close "
<
<         # if we are a child process, we are done so exit
<         if not self.parent_ID:
<             print "exit stage left %s"% (self.parent_ID)
<             raise asyncore.ExitNow
<
<
<

-- 
Speech recognition in use.  It makes mistakes, I correct most



