How to implement an async message bus

Ian Kelly ian.g.kelly at gmail.com
Thu Oct 15 10:44:40 EDT 2015


On Thu, Oct 15, 2015 at 5:25 AM, Nagy László Zsolt <gandalf at shopzeus.com> wrote:
> I'm new to Python 3.5 async / await syntax. I would like to create a class -
> let's call it AsyncBus - that can be used to listen for keys, and send
> messages for keys ansynchronously.
>
> class BusTimeoutError(Exception):
>     pass
>
> class AsyncBus(object):
>     def __init__(self, add_timeout):
>         """add_timeout should be a function that uses the current event loop
> implementation to call back"
>         ....
>
>     async def notify(self, key, message):
>         """Notify a single waiter about a key.  Return if somebody was
> waiting for it."""
>         ....
>
>     async def notifyall(self, key, message):
>         """Notify all waiters. Return the number of waiters notified."""
>         ....
>
>     async def waitfor(self, keys, timeout=None):
>         """Wait until a message comes in for any of the given key(s). Raise
> BusTimeoutError after the given timedelta."""
>         ....
>
>
> Internally, the waitfor method would use the add_timeout to resume itself
> and raise the BusTimeoutError exception after the given timeout, and this
> should be the only place where a periodic (system time based) sleep would
> occur. The notify/notifyall methods would also resume execution in waitfor()
> call(s), but they would provide the message for the waiter instead of
> raising an exception.

My first instinct is to suggest that you not reinvent the wheel and
point you at the asyncio.Condition class. However, it apparently
doesn't support setting a timeout on wait, which seems odd since the
threading.Condition class that it's based on does. You could use
asyncio.wait to wait for it with a timeout, but that wouldn't remove
the waiter from the Condition. Maybe this would be a useful feature
request + patch.

> Question is: how to write the AsyncBus class? Here is what I have tried -
> but I don't know how to create the waiter object at the bottom.
>
>
> class BusTimeoutError(Exception):
>     """Raised when the waiter has been waiting for too long."""
>     pass
>
>
> class AsnycBus(object):
>     """Asynchronous notification bus."""
>
>     def __init__(self, add_timeout):
>         self._waiters = {}
>         self._add_timeout = add_timeout
>
>     async def notify(self, key, message):
>         """Notify a single waiter. Return if there was a waiter waiting for
> the key."""
>         if key in self._waiters:
>             self._waiters[key][0].send((key, message))
>             return True
>         else:
>             return False


It looks like you're assuming that the waiter object will be a
coroutine and trying to call its send method directly rather than
going through the event loop? That seems like a bad idea. In asyncio,
a coroutine is something that you can await or schedule with
loop.create_task(). Don't try to use those low-level methods.

I think a better approach would be to make the waiter a Future and
signal it by setting its result. Something like this, as a rough
sketch:

async def waitfor(self, keys, timeout=None):
    waiter = asyncio.Future()
    for key in keys:
        self._waiters[key].add(waiter)
    handle = None
    if timeout:
        handle = asyncio.call_later(timeout, self._handle_timeout, waiter)
    try:
        return await waiter
    finally:
        # TODO: Use a context manager to add and remove the keys.
        for key in keys:
            self._waiters[key].discard(waiter)
        if handle:
            handle.cancel()

def notify(self, key, message):
    if key in self._waiters and self._waiters[key]:
        waiter = next(iter(self._waiters[key]))
        waiter.set_result((key, message))
        return True
    return False

def _handle_timeout(self, waiter):
    waiter.set_exception(new BusTimeoutError())



More information about the Python-list mailing list