[Medusa-dev] Improving main loop
Gregory P. Smith
greg at electricrain.com
Thu Dec 12 10:35:39 EST 2002
A lot of what you describe sounds like stuff available in the "Twisted"
library rather than medusa. You might want to have a look.
http://twistedmatrix.com/
-g
On Wed, Dec 11, 2002 at 05:41:18PM +0300, Alexander Semenov wrote:
> Hi,
>
> I am working on a TCP/IP server. It must serve about 100 client connections
> and make two connections to an upstream server to pull some information.
> Medusa looks like an ideal solution for it. Performance is excellent; I think
> that is because the server is single-threaded.
>
> But I want some additional functionality from asyncore/asynchat/medusa:
> 1. Timers. I need callbacks in my channels that are called every N
> seconds, something like the standard module sched tied into asyncore.loop().
> I need timers to send heartbeat messages and to look for clients that have
> timed out.
> 2. Autoreconnect. I want my outgoing connections to the upstream server
> to reconnect if they become broken. My application is a server, but it must
> maintain client connections to the upstream server, and it must run forever.
> 3. Simplify the channel protocol. Why do we write an accepting server for each
> channel type? They all look the same. I think most channels need
> four callbacks: On_Init(self), On_Connected(self), On_Disconnected(self)
> and On_DataArrived(self, data), plus timer signals. The whole network API
> for channels is Send(self, data) and Close(self). A channel can be started
> in two modes: Listen(addr, channel) and Connect(addr, channel).
>
> Maybe somebody has succeeded in implementing these features in medusa?
> I made some quick hacks and now have a working server, but I dislike it. Then I
> attempted to write my own framework, borrowing ideas from asyncore, asynchat
> and sched. For now it is very slow, and I have no idea why.
>
> I have attached it. Maybe somebody will look at it and send me some
> criticism/comments?
> Why is it so slow? How can I implement this functionality in medusa? What
> calls/callbacks have I forgotten in my protocol (described in paragraph 3)?
>
> Sorry for my terrible English,
> Alexander Semenov.
> '''TCPDispatcher - tcp/ip server which dispatchs channels
> Ideas stealed from asyncore, asynchat and sched
>
> User must derive his channels from Channel. It reacts on
> def On_Init(self): pass
> def On_Connected(self): pass
> def On_Disconnected(self): pass
> def On_DataArrived(self, data): pass
> and can use:
> def Close(self):
> def Send(self, data):
> then, user can install his channel with
> Listen()
> Connect()
> and finally, run loop:
> Serve()
> '''
>
> import time
> import socket
> from bisect import insort
> from select import select, error
> from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, EISCONN, EINTR
>
>
> TIMEOUT = 15 # Timeout for connection in seconds
>
> _sockets = {}
> _crontab = []
>
>
> class Channel:
> def __init__(self, sock, addr):
> self._addr = addr
> self._newsocket(sock)
> self._outbuff = []
> self._inbuff = []
> self._terminator = None
> self.On_Init()
>
> def __repr__(self):
> ret = 'to %s:%d' % self._addr
> if self._connected: ret = 'connected '+ret
> ret = '<%s %s at %#x>' % (self.__class__.__name__, ret, id(self))
> return ret
>
> def _newsocket(self, sock=0):
> global _sockets
>
> try: self._socket.close()
> except: pass
> try: del _sockets[self._socket.fileno()]
> except: pass
>
> self._connected = 1
> if not sock:
> sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> self._connected = 0
> self._socket = sock
> _sockets[sock.fileno()] = self
> self._disconnected = time.time() - TIMEOUT
>
> def _killtimers(self):
> global _crontab
> _crontab = [t for t in _crontab if not t[2].im_self is self]
>
> def handle_read_event(self):
> if not self._connected:
> self._connected = 1
> self.On_Connected()
> try:
> data = self._socket.recv(4096)
> except socket.error:
> data = ''
> if data:
> if not self._terminator:
> self.On_DataArrived(data)
> return
> pos = data.find(self._terminator[-1:])
> if pos < 0:
> self._inbuff.append(data)
> return
> data = ''.join(self._inbuff)+data
> while 1:
> pos = data.find(self._terminator)
> if pos < 0:
> break
> else:
> l = len(self._terminator)
> self.On_DataArrived(data[:pos+l])
> data = data[pos+l:]
> self._inbuff = [data]
> else:
> data = ''.join(self._inbuff)
> if data: self.On_DataArrived(data)
> self._connected = 0
> self.On_Disconnected()
> try: del _sockets[self._socket.fileno()]
> except: pass
> try: self._socket.close()
> except: pass
> if self._reconnectable:
> self._newsocket()
> else:
> self._killtimers()
>
> def handle_write_event(self):
> if not self._connected:
> self._connected = 1
> self.On_Connected()
> out = ''.join(self._outbuff)
> sent = self._socket.send(out)
> out = out[sent:]
> if out:
> self._outbuff = [out]
> else:
> self._outbuff = []
>
> # API for Cahannel
> def Close(self):
> global _sockets
>
> try:
> self._socket.send(''.join(self._outbuff))
> except:
> pass
> self._killtimers()
> self._reconnectable = 0
> try:
> del _sockets[self._socket.fileno()]
> except KeyError:
> pass
> except socket.error:
> pass
> try:
> self._socket.close()
> except:
> pass
>
> def Send(self, data):
> self._outbuff.append(data)
>
> def SetTerminator(self, term=None):
> self._terminator = term
>
> # Channel callbacks
> def On_Init(self): pass
> def On_Connected(self): pass
> def On_Disconnected(self): pass
> def On_DataArrived(self, data): pass
>
>
> def Shutdown():
> global _sockets, _crontab
> #XXX close connections here
> for srv in _sockets.values():
> try:
> srv._socket.close()
> except:
> pass
> _sockets = _crontab = {}
>
> class Listen:
> def __init__(self, host, port, channel):
> global _sockets
>
> self._addr = (host, port)
> self._channel = channel
> self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> self._socket.setblocking(0)
> _sockets[self._socket.fileno()] = self
> self._outbuff = []
> try:
> self._socket.setsockopt(
> socket.SOL_SOCKET, socket.SO_REUSEADDR,
> self._socket.getsockopt(socket.SOL_SOCKET,
> socket.SO_REUSEADDR) | 1
> )
> except: pass
> self._socket.bind(self._addr)
> self._reconnectable = 0
> self._socket.listen(5)
>
> def __repr__(self):
> return '<Listener for %s on %s:%d at %#x>' % (
> self._channel.__name__, self._addr[0], self._addr[1], id(self))
>
> def handle_read_event(self):
> srv = self._channel(*self._socket.accept())
> srv._connected = 1
> srv._reconnectable = 0
> srv.On_Connected()
>
>
> def Connect(host, port, channel):
> global _sockets
>
> srv = channel(0, (host, port))
> srv._connected, srv._disconnected, srv._reconnectable = 0, 0, 1
>
>
> def Schedule(time_, repeat, func, *args, **kwargs):
> global _crontab
> if not time_:
> time_ = time.time()+repeat
> insort(_crontab, (time_, repeat, func, args, kwargs))
>
> def process_timers():
> global _crontab
>
> while _crontab:
> if _crontab[0][0]<time.time():
> time_, repeat, func, args, kwargs = _crontab.pop(0)
> if repeat:
> Schedule(time_+repeat, repeat, func, *args, **kwargs)
> func(*args, **kwargs)
> else:
> return _crontab[0][0]-time.time()
> return 0
>
> def watch_connections():
> global _sockets
>
> dead = []
> for fd, srv in _sockets.items():
> if not srv._reconnectable or \
> srv._connected or \
> time.time()-srv._disconnected < TIMEOUT:
> continue
> err = srv._socket.connect_ex(srv._addr)
> if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
> continue
> if err in (0, EISCONN):
> srv._connected = 1
> srv.On_Connected()
> else:
> dead.append(srv)
> for srv in dead:
> srv._newsocket()
>
> def Serve(): # Main loop
> global _sockets, _crontab
>
> while _sockets or _crontab:
>
> # Connect new and reconnect broken channels
> watch_connections()
>
> # Do timer events
> delay = process_timers()
>
> wsocks = [fd for fd, srv in _sockets.items() if srv._outbuff]
>
> try:
> rsocks, wsocks, _ = select(_sockets.keys(), wsocks, [], delay)
> except error, err:
> if err[0] == EINTR: continue
> if err[0] == 10038: continue
> raise
>
> [_sockets[rsock].handle_read_event() for rsock in rsocks]
> [_sockets[wsock].handle_write_event() for wsock in wsocks]
> _______________________________________________
> Medusa-dev mailing list
> Medusa-dev@python.org
> http://mail.python.org/mailman/listinfo/medusa-dev
--
Some mistakes are too much fun to make only once.
More information about the Medusa-dev
mailing list