[Medusa-dev] Improving main loop

Gregory P. Smith greg at electricrain.com
Thu Dec 12 10:35:39 EST 2002


A lot of what you describe sounds like stuff available in the "Twisted"
library rather than medusa.  You might want to have a look.

http://twistedmatrix.com/

-g

On Wed, Dec 11, 2002 at 05:41:18PM +0300, Alexander Semenov wrote:
> Hi,
> 
> I am working on a TCP/IP server. It must serve about 100 client connections
> and make two connections to an upstream server to pull some info.
> Medusa looks like an ideal solution for it. Performance is perfect; I think
> that is because the server is single-threaded.
> 
> But I want some additional functionality from asyncore/asynchat/medusa:
> 1. Timers. I need callbacks in my channels which are called every given
>    number of seconds. Something like the standard module sched tied to
>    asyncore.loop(). I need timers to send heartbeat messages and to look
>    for clients which have timed out.
> 2. Autoreconnect. I want my outgoing connections to the upstream server
>    to reconnect if they become broken. My application is a server, but it
>    must maintain a client connection to the upstream server, and it must
>    run forever.
> 3. Simplify the channel protocol. Why do we write an accepting server for
>    each channel type? They all look the same. I think most channels need
>    four callbacks: On_Init(self), On_Connected(self), On_Disconnected(self)
>    and On_DataArrived(self, data), plus timer signals. The whole network API
>    for channels is Send(self, data) and Close(self). A channel can be started
>    in two modes: Listen(addr, channel) and Connect(addr, channel).
> 
> Maybe somebody has succeeded in implementing these features in medusa?
> I made quick hacks and now have a working server, but I dislike it. Then I
> attempted to write my own framework, stealing ideas from asyncore, asynchat
> and sched. Now it is very slow. I have no idea why.
> 
> I attached it. Maybe somebody will look at it and send me some
> criticism/comments?
> Why is it so slow? How can I implement this functionality in medusa? What
> calls/callbacks did I forget in my protocol (described in paragraph 3)?
> 
> Sorry for terrible English,
> Alexander Semenov.

> '''TCPDispatcher - tcp/ip server which dispatchs channels
> Ideas stealed from asyncore, asynchat and sched
> 
> User must derive his channels from Channel. It reacts on 
>     def On_Init(self): pass
>     def On_Connected(self): pass
>     def On_Disconnected(self): pass
>     def On_DataArrived(self, data): pass
> and can use:
>     def Close(self): 
>     def Send(self, data): 
> then, user can install his channel with
>     Listen()
>     Connect()
> and finally, run loop:
>     Serve()
> '''
> 
> import time
> import socket
> from bisect import insort
> from select import select, error
> from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, EISCONN, EINTR
> 
> 
> TIMEOUT = 15 # Timeout for connection in seconds
> 
> _sockets = {}
> _crontab = []
> 
> 
> class Channel:
>     def __init__(self, sock, addr):
>         self._addr = addr
>         self._newsocket(sock)
>         self._outbuff = []
>         self._inbuff = []
>         self._terminator = None
>         self.On_Init()
> 
>     def __repr__(self):
>         ret = 'to %s:%d' % self._addr
>         if self._connected: ret = 'connected '+ret
>         ret = '<%s %s at %#x>' % (self.__class__.__name__, ret, id(self))
>         return ret
> 
>     def _newsocket(self, sock=0):
>         global _sockets
> 
>         try: self._socket.close()
>         except: pass
>         try: del _sockets[self._socket.fileno()]
>         except: pass
> 
>         self._connected = 1
>         if not sock:
>             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>             self._connected = 0
>         self._socket = sock
>         _sockets[sock.fileno()] = self
>         self._disconnected = time.time() - TIMEOUT
> 
>     def _killtimers(self):
>         global _crontab
>         _crontab = [t for t in _crontab if not t[2].im_self is self]
> 
>     def handle_read_event(self):
>         if not self._connected:
>             self._connected = 1
>             self.On_Connected()
>         try:
>             data = self._socket.recv(4096)
>         except socket.error:
>             data = ''
>         if data: 
>             if not self._terminator:
>                 self.On_DataArrived(data)
>                 return
>             pos = data.find(self._terminator[-1:])
>             if pos < 0:
>                 self._inbuff.append(data)
>                 return
>             data = ''.join(self._inbuff)+data
>             while 1:
>                 pos = data.find(self._terminator)
>                 if pos < 0: 
>                     break
>                 else:
>                     l = len(self._terminator)
>                     self.On_DataArrived(data[:pos+l])
>                     data = data[pos+l:]
>             self._inbuff = [data]
>         else:
>             data = ''.join(self._inbuff)
>             if data: self.On_DataArrived(data)
>             self._connected = 0
>             self.On_Disconnected()
>             try: del _sockets[self._socket.fileno()]
>             except: pass
>             try: self._socket.close()
>             except: pass
>             if self._reconnectable:
>                 self._newsocket()
>             else:
>                 self._killtimers()
> 
>     def handle_write_event(self):
>         if not self._connected:
>             self._connected = 1
>             self.On_Connected()
>         out = ''.join(self._outbuff)
>         sent = self._socket.send(out)
>         out = out[sent:]
>         if out:
>             self._outbuff = [out]
>         else:
>             self._outbuff = []
> 
>     # API for Channel
>     def Close(self): 
>         global _sockets
> 
>         try:
>             self._socket.send(''.join(self._outbuff))
>         except:
>             pass
>         self._killtimers()
>         self._reconnectable = 0
>         try:
>             del _sockets[self._socket.fileno()]
>         except KeyError:
>             pass
>         except socket.error:
>             pass
>         try:
>             self._socket.close()
>         except:
>             pass
> 
>     def Send(self, data): 
>         self._outbuff.append(data)
> 
>     def SetTerminator(self, term=None):
>         self._terminator = term
> 
>     # Channel callbacks
>     def On_Init(self): pass
>     def On_Connected(self): pass
>     def On_Disconnected(self): pass
>     def On_DataArrived(self, data): pass
> 
> 
> def Shutdown():
>     global _sockets, _crontab
>     #XXX close connections here
>     for srv in _sockets.values():
>         try:
>             srv._socket.close()
>         except:
>             pass
>     _sockets = _crontab = {}
> 
> class Listen:
>     def __init__(self, host, port, channel):
>         global _sockets
> 
>         self._addr = (host, port)
>         self._channel = channel
>         self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>         self._socket.setblocking(0)
>         _sockets[self._socket.fileno()] = self
>         self._outbuff = []
>         try:
>             self._socket.setsockopt(
>                 socket.SOL_SOCKET, socket.SO_REUSEADDR,
>                 self._socket.getsockopt(socket.SOL_SOCKET, 
>                     socket.SO_REUSEADDR) | 1
>             )
>         except: pass
>         self._socket.bind(self._addr)
>         self._reconnectable = 0
>         self._socket.listen(5)
> 
>     def __repr__(self):
>         return '<Listener for %s on %s:%d at %#x>' % (
>             self._channel.__name__, self._addr[0], self._addr[1], id(self))
> 
>     def handle_read_event(self):
>         srv = self._channel(*self._socket.accept())
>         srv._connected = 1
>         srv._reconnectable = 0
>         srv.On_Connected()
> 
> 
> def Connect(host, port, channel):
>     global _sockets
> 
>     srv = channel(0, (host, port))
>     srv._connected, srv._disconnected, srv._reconnectable = 0, 0, 1
> 
> 
> def Schedule(time_, repeat, func, *args, **kwargs):
>     global _crontab
>     if not time_: 
>         time_ = time.time()+repeat
>     insort(_crontab, (time_, repeat, func, args, kwargs))
> 
> def process_timers():
>     global _crontab    
> 
>     while _crontab:
>         if _crontab[0][0]<time.time():
>             time_, repeat, func, args, kwargs = _crontab.pop(0)
>             if repeat:
>                 Schedule(time_+repeat, repeat, func, *args, **kwargs)
>             func(*args, **kwargs)
>         else:
>             return _crontab[0][0]-time.time()
>     return 0
> 
> def watch_connections():
>     global _sockets
> 
>     dead = []
>     for fd, srv in _sockets.items():
>         if not srv._reconnectable or \
>            srv._connected or \
>            time.time()-srv._disconnected < TIMEOUT:
>             continue
>         err = srv._socket.connect_ex(srv._addr)
>         if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
>             continue
>         if err in (0, EISCONN):
>             srv._connected = 1
>             srv.On_Connected()
>         else:
>             dead.append(srv)
>     for srv in dead:
>         srv._newsocket()
> 
> def Serve(): # Main loop
>     global _sockets, _crontab
> 
>     while _sockets or _crontab:
> 
>         # Connect new and reconnect broken channels
>         watch_connections()
> 
>         # Do timer events
>         delay = process_timers()
> 
>         wsocks = [fd for fd, srv in _sockets.items() if srv._outbuff]
> 
>         try:
>             rsocks, wsocks, _ = select(_sockets.keys(), wsocks, [], delay)
>         except error, err:
>             if err[0] == EINTR: continue
>             if err[0] == 10038: continue
>             raise
> 
>         [_sockets[rsock].handle_read_event() for rsock in rsocks]
>         [_sockets[wsock].handle_write_event() for wsock in wsocks]

> _______________________________________________
> Medusa-dev mailing list
> Medusa-dev@python.org
> http://mail.python.org/mailman/listinfo/medusa-dev


-- 
Some mistakes are too much fun to make only once.



More information about the Medusa-dev mailing list