How to implement an async message bus

Nagy László Zsolt gandalf at shopzeus.com
Thu Oct 15 07:25:34 EDT 2015


I'm new to Python 3.5 async / await syntax. I would like to create a
class - let's call it AsyncBus - that can be used to listen for keys,
and send messages for keys asynchronously.

class BusTimeoutError(Exception):
    pass

class AsyncBus(object):
    def __init__(self, add_timeout):
        """add_timeout should be a function that uses the current event
        loop implementation to call back"""
        ....

    async def notify(self, key, message):
        """Notify a single waiter about a key.  Return whether somebody
        was waiting for it."""
        ....

    async def notifyall(self, key, message):
        """Notify all waiters. Return the number of waiters notified."""
        ....

    async def waitfor(self, keys, timeout=None):
        """Wait until a message comes in for any of the given key(s).
Raise BusTimeoutError after the given timedelta."""
        ....


Internally, the waitfor method would use add_timeout to resume itself
and raise BusTimeoutError after the given timeout, and this should be
the only place where a periodic (system time based) sleep occurs. The
notify/notifyall methods would also resume execution in the pending
waitfor() call(s), but they would hand the message to the waiter
instead of raising an exception.
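
To make the intended mechanism concrete, here is a minimal sketch of a
single waiter that is resumed either by a message or by a timer. It
assumes the event loop is asyncio and lets loop.call_later play the role
of add_timeout; wait_once, _expire and demo are hypothetical names that
are not part of the class above.

```python
import asyncio


class BusTimeoutError(Exception):
    """Raised when the waiter has been waiting for too long."""


def _expire(waiter):
    # Timer callback: fail the waiter unless a message already arrived.
    if not waiter.done():
        waiter.set_exception(BusTimeoutError())


async def wait_once(waiter, timeout_seconds):
    """Suspend until the waiter gets a result or the timer fails it."""
    loop = asyncio.get_running_loop()
    # Assumption: call_later stands in for add_timeout here.
    handle = loop.call_later(timeout_seconds, _expire, waiter)
    try:
        return await waiter
    finally:
        handle.cancel()  # harmless if the timer has already fired


async def demo():
    loop = asyncio.get_running_loop()

    # Case 1: a message arrives before the timeout.
    delivered = loop.create_future()
    loop.call_later(0.01, delivered.set_result, ("key", "hello"))
    got = await wait_once(delivered, 1.0)

    # Case 2: nothing arrives, so the timer raises BusTimeoutError.
    silent = loop.create_future()
    try:
        await wait_once(silent, 0.01)
        timed_out = False
    except BusTimeoutError:
        timed_out = True
    return got, timed_out


result = asyncio.run(demo())
```

No periodic wakeup happens while the coroutine is suspended: the loop
touches the waiter only when the message is set or when the single timer
fires.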

Here is an example use case:

  * Write a chat server, where all the users are running the chat client
    in a browser
  * The browser sends a long-polling AJAX request to the server, which
    returns any new messages immediately, or blocks for at most
    timeout=10 seconds before returning without any message. This long
    poll would be called in an infinite loop in the browser. Internally,
    long poll requests would end in bus.waitfor() calls on the server.
  * When the user sends a new message to another user, then
    bus.notifyall() is awaited. notifyall() awakens all bus.waitfor()
    calls, delivers the message to all clients, and finally gives back
    the number of clients notified to the sender of the message. The
    sender can see how many clients got the message.

I have examined code for long polling in other projects, and I have
found that most of them use add_timeout to check for new messages
periodically. I do not want to follow this practice.

  * Let's say there are 1000 clients connected.
  * If I use a lower timeout (say 0.1 seconds) for periodic checks, then
    the server will be woken up 1000 times in every 0.1 seconds, that
    is, on average once every 0.0001 seconds. It will do nothing useful
    in 99.99% of that time. That seems to be bad.
  * If I use a higher timeout (say 10 seconds), then messages won't be
    delivered for an average of 5 seconds, which is also bad.
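
The arithmetic behind these two cases can be checked with a few lines.
This is only a back-of-the-envelope sketch; polling_cost is a
hypothetical helper, and it assumes messages arrive at a uniformly
random moment within a polling interval.

```python
def polling_cost(clients, interval_seconds):
    """Return (wakeups per second, mean gap between wakeups, mean delay)."""
    wakeups_per_second = clients / interval_seconds
    mean_gap = interval_seconds / clients
    # A message arriving at a uniformly random point inside the interval
    # waits, on average, half an interval for the next check.
    mean_delay = interval_seconds / 2
    return wakeups_per_second, mean_gap, mean_delay


fast = polling_cost(1000, 0.1)   # frequent checks: many wakeups
slow = polling_cost(1000, 10)    # rare checks: long delivery delay
```

With 1000 clients, the 0.1 second interval gives about 10,000 wakeups
per second, while the 10 second interval gives a mean delivery delay of
5 seconds.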

So messages should NOT be delivered by periodic checks. They should be
delivered from events triggered by incoming messages. In other words:
when a new message comes in, it should wake up the clients waiting for
messages (for the given user) and deliver the message instantaneously.

The question is: how to write the AsyncBus class? Here is what I have
tried, but I don't know how to create the waiter object at the bottom.


import functools


class BusTimeoutError(Exception):
    """Raised when the waiter has been waiting for too long."""
    pass


class AsyncBus(object):
    """Asynchronous notification bus."""

    def __init__(self, add_timeout):
        self._waiters = {}
        self._add_timeout = add_timeout

    async def notify(self, key, message):
        """Notify a single waiter. Return whether there was a waiter
        waiting for the key."""
        if key in self._waiters:
            self._waiters[key][0].send((key, message))
            return True
        else:
            return False

    async def notifyall(self, key, message):
        """Notify all waiters. Return the number of waiters notified."""
        if key in self._waiters:
            # Get all waiters
            waiters = self._waiters[key]
            for waiter in waiters:
                # Send the message to the waiter
                waiter.send((key, message))
            return len(waiters)
        else:
            return 0

    async def waitfor(self, keys, timeout=None):
        """Wait for keys.

        :arg keys: An iterable of immutable keys.
        :arg timeout: Raise BusTimeoutError if nothing hits the bus
            for this amount of time. None means: wait indefinitely.
            It should be a datetime.timedelta object.
        """
        # Register for keys
        if not keys:
            raise Exception("Need some keys to wait for...")

        waiter = ?????????????????????????????

        for key in keys:
            if key in self._waiters:
                self._waiters[key].append(waiter)
            else:
                self._waiters[key] = [waiter]
        try:
            # Add timeout and wake me up if nothing comes in.
            if timeout:
                self._add_timeout(
                    timeout, functools.partial(waiter.throw, BusTimeoutError))
            return await waiter
        finally:
            for key in keys:
                if key in self._waiters:
                    self._waiters[key].remove(waiter)
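
One possible direction for the missing waiter, sketched under the
assumption that the event loop is asyncio: each waiter could be an
asyncio.Future, resolved with set_result() by notify/notifyall and
failed with set_exception() by the timer. FutureBus, _deliver, _expire
and demo are hypothetical names, and loop.call_later replaces the
add_timeout argument; this is not a tested, definitive implementation.

```python
import asyncio
import datetime


class BusTimeoutError(Exception):
    """Raised when the waiter has been waiting for too long."""


class FutureBus:
    """Hypothetical AsyncBus variant where each waiter is an asyncio.Future."""

    def __init__(self):
        self._waiters = {}  # key -> list of pending futures

    @staticmethod
    def _deliver(waiter, key, message):
        # A waiter registered under several keys may already be resolved.
        if waiter.done():
            return False
        waiter.set_result((key, message))
        return True

    @staticmethod
    def _expire(waiter):
        # Timer callback: raise BusTimeoutError inside the pending waitfor().
        if not waiter.done():
            waiter.set_exception(BusTimeoutError())

    async def notify(self, key, message):
        """Notify a single waiter. Return whether one was waiting."""
        for waiter in list(self._waiters.get(key, ())):
            if self._deliver(waiter, key, message):
                return True
        return False

    async def notifyall(self, key, message):
        """Notify all waiters. Return the number of waiters notified."""
        waiters = list(self._waiters.get(key, ()))
        return sum(self._deliver(w, key, message) for w in waiters)

    async def waitfor(self, keys, timeout=None):
        """Wait for a message on any key; timeout is a timedelta or None."""
        keys = set(keys)
        if not keys:
            raise ValueError("Need some keys to wait for...")
        loop = asyncio.get_running_loop()
        waiter = loop.create_future()
        for key in keys:
            self._waiters.setdefault(key, []).append(waiter)
        handle = None
        if timeout is not None:
            handle = loop.call_later(
                timeout.total_seconds(), self._expire, waiter)
        try:
            return await waiter
        finally:
            if handle is not None:
                handle.cancel()
            # Unregister, dropping keys that have no waiters left.
            for key in keys:
                self._waiters[key].remove(waiter)
                if not self._waiters[key]:
                    del self._waiters[key]


async def demo():
    bus = FutureBus()
    ten_seconds = datetime.timedelta(seconds=10)
    clients = [asyncio.create_task(bus.waitfor(["alice"], ten_seconds))
               for _ in range(3)]
    await asyncio.sleep(0)  # let the three waiters register
    notified = await bus.notifyall("alice", "hi")
    received = await asyncio.gather(*clients)
    try:
        await bus.waitfor(["bob"], datetime.timedelta(milliseconds=10))
        timed_out = False
    except BusTimeoutError:
        timed_out = True
    return notified, received, timed_out, dict(bus._waiters)


notified, received, timed_out, leftover = asyncio.run(demo())
```

In this sketch the only timer is the one call_later per waitfor() call;
delivery itself is driven entirely by notify/notifyall.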
