How to implement an async message bus
Nagy László Zsolt
gandalf at shopzeus.com
Thu Oct 15 07:25:34 EDT 2015
I'm new to Python 3.5 async / await syntax. I would like to create a
class - let's call it AsyncBus - that can be used to listen for keys,
and send messages for keys asynchronously.

class BusTimeoutError(Exception):
    pass

class AsyncBus(object):
    def __init__(self, add_timeout):
        """add_timeout should be a function that uses the current event
        loop implementation to call back"""
        ...

    async def notify(self, key, message):
        """Notify a single waiter about a key. Return if somebody was
        waiting for it."""
        ...

    async def notifyall(self, key, message):
        """Notify all waiters. Return the number of waiters notified."""
        ...

    async def waitfor(self, keys, timeout=None):
        """Wait until a message comes in for any of the given key(s).
        Raise BusTimeoutError after the given timedelta."""
        ...

Internally, the waitfor method would use the add_timeout to resume
itself and raise the BusTimeoutError exception after the given timeout,
and this should be the only place where a periodic (system time based)
sleep would occur. The notify/notifyall methods would also resume
execution in waitfor() call(s), but they would provide the message for
the waiter instead of raising an exception.
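
To make the two ways of resuming a waiter concrete, here is a minimal sketch. It assumes the event loop is asyncio (Python 3.7 or later for asyncio.get_running_loop and asyncio.run) and stands in loop.call_later for add_timeout; the names wait_on, fail_if_pending and demo are hypothetical, made up for this illustration only.

```python
import asyncio


class BusTimeoutError(Exception):
    """Raised when nothing arrived before the deadline."""


def fail_if_pending(fut):
    # Timer callback: raise inside the waiter unless a message got there first.
    if not fut.done():
        fut.set_exception(BusTimeoutError())


async def wait_on(fut):
    # Suspends here, without polling, until the future gets a result or an error.
    return await fut


async def demo():
    loop = asyncio.get_running_loop()

    # Case 1: a notifier supplies the message before the timer fires.
    fut = loop.create_future()
    timer = loop.call_later(1.0, fail_if_pending, fut)
    loop.call_soon(fut.set_result, ("key", "hello"))
    delivered = await wait_on(fut)
    timer.cancel()  # the timeout is no longer needed

    # Case 2: nobody notifies, so the timer raises in the waiter.
    fut = loop.create_future()
    loop.call_later(0.01, fail_if_pending, fut)
    try:
        await wait_on(fut)
        timed_out = False
    except BusTimeoutError:
        timed_out = True
    return delivered, timed_out
```

The only system-time-based wakeup here is the single call_later timer per waiter; the message itself arrives through set_result.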
Here is an example use case:
* Write a chat server, where all the users are running the chat client
in a browser
* The browser sends a long-polling AJAX request to the server, which
returns any new messages immediately, or blocks for at most
timeout=10 seconds before returning without any message. This long
poll would be called in an infinite loop in the browser. Internally,
long poll requests would end in bus.waitfor() calls on the server.
* When the user sends a new message to another user, then
bus.notifyall() is awaited. notifyall() awakens all bus.waitfor()
calls, delivers the message to all clients, and finally gives back
the number of clients notified to the sender of the message. The
sender can see how many clients got the message.
I have examined code for long polling in other projects, and I have
found that most of them use add_timeout to check for new messages
periodically. I do not want to follow this practice.
* let's say there are 1000 clients connected.
* if I use a lower timeout (say 0.1 seconds) for periodic checks, then
the server will be woken up 1000 times every 0.1 seconds, on average
once every 0.0001 seconds. It will do nothing useful in 99.99% of that
time. That seems to be bad.
* if I use a higher timeout (say 10 seconds) then messages won't be
delivered for an average of 5 seconds which is also bad.
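
The arithmetic behind the two bullets above can be checked with a small helper. The function name polling_cost is hypothetical, and the half-interval delay assumes messages arrive at uniformly random moments within a polling interval.

```python
def polling_cost(clients, interval):
    """Return (wakeups per second, mean gap between wakeups, mean delivery delay).

    clients  -- number of connected clients, each polling on its own timer
    interval -- polling period in seconds
    """
    wakeups_per_second = clients / interval
    mean_gap = interval / clients   # average time between two wakeups
    mean_delay = interval / 2       # a message waits half an interval on average
    return wakeups_per_second, mean_gap, mean_delay


# 1000 clients polling every 0.1 s: about 10,000 wakeups per second.
fast = polling_cost(1000, 0.1)
# 1000 clients polling every 10 s: messages wait about 5 s on average.
slow = polling_cost(1000, 10)
```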
So messages should NOT be delivered by periodic checks. They should be
delivered from events triggered by incoming messages. In other words:
when a new message comes in, it should wake up the clients waiting for
messages (for the given user) and deliver the message instantaneously.
Question is: how to write the AsyncBus class? Here is what I have tried
- but I don't know how to create the waiter object at the bottom.

import functools

class BusTimeoutError(Exception):
    """Raised when the waiter has been waiting for too long."""
    pass

class AsyncBus(object):
    """Asynchronous notification bus."""

    def __init__(self, add_timeout):
        self._waiters = {}
        self._add_timeout = add_timeout

    async def notify(self, key, message):
        """Notify a single waiter. Return if there was a waiter waiting
        for the key."""
        if key in self._waiters:
            self._waiters[key][0].send((key, message))
            return True
        else:
            return False

    async def notifyall(self, key, message):
        """Notify all waiters. Return the number of waiters notified."""
        if key in self._waiters:
            # Get all waiters
            waiters = self._waiters[key]
            for waiter in waiters:
                # Send the message to the waiter
                waiter.send((key, message))
            return len(waiters)
        else:
            return 0

    async def waitfor(self, keys, timeout=None):
        """Wait for keys.

        :arg keys: An iterable of immutable keys.
        :arg timeout: Raise BusTimeoutError if nothing hits the bus
            for this amount of time.
            None means: wait indefinitely. It should be a
            datetime.timedelta object.
        """
        # Register for keys
        if not keys:
            raise Exception("Need some keys to wait for...")
        waiter = ?????????????????????????????
        for key in keys:
            if key in self._waiters:
                self._waiters[key].append(waiter)
            else:
                self._waiters[key] = [waiter]
        try:
            # Add timeout and wake me up if nothing comes in.
            if timeout:
                self._add_timeout(timeout,
                    functools.partial(waiter.throw, BusTimeoutError))
            return await waiter
        finally:
            for key in keys:
                if key in self._waiters:
                    self._waiters[key].remove(waiter)
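
One possible shape for the missing waiter, offered only as a sketch under assumptions: the event loop is asyncio (Python 3.7 or later), the waiter is an asyncio.Future, and asyncio.wait_for replaces the add_timeout callback, so the constructor takes no argument here. That is a different design from the one above, not a completion of it, and the demo coroutine is a hypothetical usage example.

```python
import asyncio
import datetime


class BusTimeoutError(Exception):
    """Raised when the waiter has been waiting for too long."""


class AsyncBus:
    """Sketch: each waiter is an asyncio.Future registered under its keys."""

    def __init__(self):
        self._waiters = {}  # key -> list of futures waiting on that key

    def _pending(self, key):
        # A future waiting on several keys may already be resolved via another key.
        return [w for w in self._waiters.get(key, []) if not w.done()]

    async def notify(self, key, message):
        """Wake the first pending waiter for key. Return True if there was one."""
        for waiter in self._pending(key):
            waiter.set_result((key, message))
            return True
        return False

    async def notifyall(self, key, message):
        """Wake every pending waiter for key. Return how many were woken."""
        waiters = self._pending(key)
        for waiter in waiters:
            waiter.set_result((key, message))
        return len(waiters)

    async def waitfor(self, keys, timeout=None):
        """Wait for a message on any key; timeout is a timedelta or None."""
        keys = set(keys)
        if not keys:
            raise ValueError("Need some keys to wait for...")
        waiter = asyncio.get_running_loop().create_future()
        for key in keys:
            self._waiters.setdefault(key, []).append(waiter)
        seconds = timeout.total_seconds() if timeout is not None else None
        try:
            # No periodic checks: the task sleeps until set_result or the deadline.
            return await asyncio.wait_for(waiter, seconds)
        except asyncio.TimeoutError:
            raise BusTimeoutError() from None
        finally:
            # Unregister, dropping keys that have no waiters left.
            for key in keys:
                self._waiters[key].remove(waiter)
                if not self._waiters[key]:
                    del self._waiters[key]


async def demo():
    bus = AsyncBus()
    first = asyncio.ensure_future(bus.waitfor(["alice"]))
    second = asyncio.ensure_future(bus.waitfor(["alice", "bob"]))
    await asyncio.sleep(0)  # give both waiters a chance to register
    count = await bus.notifyall("alice", "hi")
    got = [await first, await second]
    try:
        await bus.waitfor(["bob"], datetime.timedelta(milliseconds=10))
        timed_out = False
    except BusTimeoutError:
        timed_out = True
    return count, got, timed_out
```

In this sketch notify and notifyall never block, so they could be plain methods; they are kept as coroutines only to match the interface described above.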