SimpleXMLRPCServer

dcrespo dcrespo at gmail.com
Wed Oct 26 11:26:19 EDT 2005


Hello,

Here's my SimpleXMLRPCServer derived class for serving XMLRPC
functions. Note that this class enables you to shutdown the xml-rpc
server in the moment you want just accesing the QuitFlag variable of
the class. I would like to add SSL functionality to that. What do I
have to add or modify in it to do so?

class XMLRPCServer(SimpleXMLRPCServer):

    def __init__(self,socket,lock):
        self.bUsarFuncionesXMLRPCPrivadas = False
        SimpleXMLRPCServer.__init__(self, socket)
        SocketServer.ThreadingTCPServer.allow_reuse_address = True
        SocketServer.ThreadingTCPServer.request_queue_size = 100
        self.timeout = 0.1                   # Default timeout: 5.0
seconds
        self.lock = lock   # Should be a preexisting threading.RLock()
object
        self.lock.acquire()
        self.QuitFlag = 0
        self.lock.release()

    def get_request(self):
        socklist = [self.socket]
        while 1:
            # Select with a timeout, then poll a quit flag. An
alternate
            # approach would be to let the master thread "wake us up"
            # with a socket connection.
            ready = select.select(socklist, [], [], self.timeout)
            self.lock.acquire()
            time_to_quit = self.QuitFlag
            self.lock.release()
            if time_to_quit:
                raise TimeToQuit        # Get out now
            if ready[0]:        # A socket was ready to read
                return
SocketServer.ThreadingTCPServer.get_request(self)
            else:               # We timed out, no connection yet
                pass            # Just go back to the select()

    def _dispatch(self, method, params):
        try:
            func = getattr(self, 'xmlrpc_' + method)
        except AttributeError:
            #return 'La funcion no existe'
            raise Exception('La Funcion "%s" no existe' % method)
        else:
            return func(*params)

    def verify_request(self,request,client_address):

        if TrustedHost(client_address):
            return True
        return False

    def test_xmlrpc(self):
        return "xmlrpc works"

Thanks,

Daniel




More information about the Python-list mailing list