SMTP Test Rig

Hughes, Chad O chad.hughes at pnl.gov
Fri Jun 10 14:14:16 EDT 2005


Thanks very much; it works great and does exactly what I need for a test
I have to perform.  I started teaching myself the SMTP protocol last
night and it does not look too hard.  But this saves me a great deal of
time and effort.  I hope you can find a good place to host it, as it is
the only example I have seen yet.
Chad

-----Original Message-----
From: Tim Williams [mailto:listserver at tdw.net] 
Sent: Friday, June 10, 2005 5:35 AM
To: Hughes, Chad O; Jesse Noller
Subject: SMTP Test Rig


Chad, Jesse.   The SMTP test rig receives SMTP emails and can be configured
to write them to the screen, save them to a file - or both.

Have fun


#
"""
SMTPRIG.py  (SMTP Test Rig)

Version 1.0

Copyright (C) Tim Williams (tdw at tdw.net) 10th June 2005
#
# This software is provided "AS IS" without any warranty
# explicit or implied.
#
# Redistribution of this code is allowed provided the
# above copyright information remains intact.
#
# This code was developed out of the need for a very basic
# test SMTP server, and later expanded to add logging to
# screen and/or the ability to save the email to a text file.
#
# This server has the bare minimum SMTP functionality required
# to implement the above requirements. It has no error checking
# or rfc822 compliance checks; it will receive anything from anyone
# no matter how bad their SMTP dialogue is.
#
# This server has only been tested in a windows environment
#
# This server can receive multiple concurrent emails (multithreaded inbound)
#
# This server CAN NOT RELAY email to another server (unless you add that
# bit yourself)
#
# Please feel free to send comments or questions to tdw at tdw.net
#
"""

from socket import *
from SocketServer import *
from Queue import Queue
from random import randrange
from os import path, makedirs
import thread

# ---- configuration -------------------------------------------------------
server_port = 2525      # the port the server will run on
file_path = 'txtmails'  # path for saved files

write_to_screen = 1     # 1 to enable, 0 to disable
write_to_file = 1       # 1 to enable, 0 to disable

# Make sure the directory for saved messages exists.  An OSError is only
# ignored when the directory is really there afterwards (the usual
# "[Errno 17] File exists" case); any other failure is re-raised at
# start-up instead of surfacing later when a message is saved.
try:
    makedirs(file_path)
except OSError:
    if not path.isdir(file_path):
        raise

q_print = Queue()  # queue for printing to console.

class SMTPServer(ThreadingMixIn, TCPServer):
    """TCP server combined with ThreadingMixIn for the SMTP test rig."""

    # other functions exist here in a live server

    def some_function(self):
        """Placeholder standing in for the methods of a live server."""
        return None

#end of class SMTPServer

class SMTPRequestHandler(StreamRequestHandler):
    """Conduct a minimal SMTP dialogue with one connected client.

    HELO, MAIL, RCPT, NOOP, RSET and HELP are acknowledged with a 250
    reply without any validation.  DATA switches the handler into body
    mode, where every line is collected until a line holding a single '.'
    arrives; the message is then saved to disk when the module-level
    ``write_to_file`` flag is set.  QUIT ends the conversation.  All
    traffic in both directions is echoed through ``qprint``.
    """

    # Number of empty command lines tolerated before the connection is
    # dropped with a 550 reply.
    max_empty_lines = 9

    def handle(self):
        """Serve the connection until it ends.

        The conversation stops on QUIT, when the client disconnects, when
        reading from the socket fails (for example on a timeout), or when
        more than ``max_empty_lines`` empty command lines were received.
        """
        qprint('*-' * 30)
        self.ip, self.port = self.client_address
        self.port = str(self.port)
        self.ip = str(self.ip)
        qprint("<=====> Incoming connection from ", self.ip, ' Port:',
               self.port)
        self.terminated = 0
        self.receiving_data = 0
        self.SendResponse('220 ** Welcome **')
        self.msg = []
        self.msg_count = 1
        empty_line = 0
        self.msg_id()

        commands = {'HELO': self.__OK,            # supported commands
                    'QUIT': self.__QUIT,
                    'MAIL': self.__OK,
                    'RCPT': self.__OK,
                    'NOOP': self.__OK,
                    'RSET': self.__OK,
                    'HELP': self.__OK,
                    'DATA': self.__DATA,
                    }

        while not self.terminated:
            try:
                raw_line = self.rfile.readline()
            except Exception:
                # A timeout or socket error means no further dialogue is
                # possible; stop instead of answering a made-up command.
                qprint(self.port + '<== (read failed, closing connection)')
                break
            if not raw_line:
                # readline() only returns an empty string at end of stream,
                # i.e. the client has closed the connection.
                qprint(self.port + '<== (connection closed by client)')
                break

            self.inputline = raw_line.rstrip()
            qprint(self.port + '<== ' + self.inputline)

            if self.receiving_data:
                self.receive_body()
                continue

            request = self.inputline.upper().split()
            if request and request[0] in commands:      # eg: HELO
                commands[request[0]](request)
                continue

            self.SendResponse("500 Unknown Command " + self.inputline)
            if self.inputline == '':                    # empty commandline
                empty_line += 1
                if empty_line > self.max_empty_lines:   # max empty lines
                    self.SendResponse('550 Too many empty lines')
                    break                   # end of smtp conversation

    def receive_body(self):
        """Process one line received while in DATA mode.

        A line holding only '.' ends the message: it is written to disk if
        ``write_to_file`` is set, the line buffer is emptied, a fresh
        message id is generated for the next message and a 250 reply is
        sent.  Any other line is appended to the buffer, with one leading
        '.' removed (SMTP transparency / dot-unstuffing).
        """
        if self.inputline == '.':
            self.receiving_data = 0
            # Save under the id generated for this message before
            # advancing to the id of the next one.
            if write_to_file:
                self.write_file()
            # Always start the next message with an empty buffer, even
            # when nothing was written to disk.
            self.msg = []
            self.msg_count += 1
            self.msg_id()
            self.SendResponse('250 2.1.0 OK: Data received')
        else:
            line = self.inputline
            if line.startswith('.'):
                # The sender doubles a leading dot so the line is not
                # mistaken for the end-of-data marker; undo that here.
                line = line[1:]
            self.msg.append(line)

    def __OK(self, request):
        """Acknowledge a command with a plain 250 reply."""
        self.SendResponse('250 2.1.0 OK')

    def __DATA(self, request):
        """Switch to body mode and invite the client to send the message."""
        self.receiving_data = 1
        self.SendResponse('354 End data with <CR><LF>.<CR><LF>')

    def __QUIT(self, request):
        """Say goodbye and flag the conversation as finished."""
        self.terminated = 1
        self.SendResponse('221 2.1.0 Bye')
        qprint('*-' * 30)
        qprint(' ')

    def SendResponse(self, msg):
        """Echo *msg* to the print queue and send it to the client with CRLF."""
        qprint(self.port + '==> ' + msg)
        msg = msg + '\r\n'
        self.wfile.write(msg)
        self.wfile.flush()

    def msg_id(self):
        """Build ``self.msg_name`` from a random number, the client port
        and the running message count."""
        self.msg_name = (str(randrange(1, 999)) + self.port
                         + str(self.msg_count))

    def write_file(self):
        """Save the collected lines to '<file_path>/<msg_name>.txt'.

        The lines are joined with CRLF and the buffer is emptied before the
        file is written.  The file is closed even if the write fails.
        """
        msg = '\r\n'.join(self.msg)
        self.msg = []
        file_loc = path.join(file_path, self.msg_name) + '.txt'
        out = open(file_loc, "wb")
        try:
            out.write(msg)
        finally:
            out.close()
# end of class SMTPRequestHandler

def qprint(*args):
    """Queue the concatenation of the string arguments for console output.

    Nothing is queued when the module-level write_to_screen flag is off.
    """
    if not write_to_screen:
        return
    line = ''.join(args)
    q_print.put(line)

def printThread():
    """Print queued console lines forever, blocking while the queue is empty."""
    while True:
        line = q_print.get(True)
        print(line)

# new thread for print queue processing
thread.start_new_thread(printThread, ())

def StartSMTPServer():
    """Print a start-up banner and serve SMTP on server_port until stopped.

    The default socket timeout is set to 30 seconds for the whole process
    before the server is created.  The listening socket is closed when
    serving stops for any reason; whatever exception stopped it (normally
    KeyboardInterrupt from CTRL-C) still propagates to the caller.
    """
    banner = '*' * 35
    print(banner)
    print('Server Starting, CTRL-C to end')
    print(banner)
    setdefaulttimeout(30)  # timeout incoming connections
    server = SMTPServer(('', server_port), SMTPRequestHandler)
    try:
        server.serve_forever()
    finally:
        # Release the listening socket instead of leaving it to process exit.
        server.server_close()
#end of def StartSMTPServer

# Start the server only when this file is run as a script, not on import.
if __name__ == '__main__':

    StartSMTPServer()





More information about the Python-list mailing list