How to implement an async message bus

Nagy László Zsolt gandalf at shopzeus.com
Thu Oct 15 13:16:28 EDT 2015


>>     async def waitfor(self, keys, timeout=None):
>>         """Wait until a message comes in for any of the given key(s).
>>         Raise BusTimeoutError after the given timedelta."""
>>         ...
>>
>>
>> Internally, the waitfor method would use the add_timeout to resume itself
>> and raise the BusTimeoutError exception after the given timeout, and this
>> should be the only place where a periodic (system time based) sleep would
>> occur. The notify/notifyall methods would also resume execution in waitfor()
>> call(s), but they would provide the message for the waiter instead of
>> raising an exception.
> My first instinct is to suggest that you not reinvent the wheel and
> point you at the asyncio.Condition class. However, it apparently
> doesn't support setting a timeout on wait, which seems odd since the
> threading.Condition class that it's based on does. You could use
> asyncio.wait to wait for it with a timeout, but that wouldn't remove
> the waiter from the Condition. Maybe this would be a useful feature
> request + patch.
I can submit a feature request, but I cannot patch the CPython
implementation. :-)
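A minimal sketch of the asyncio.wait_for route, assuming Python 3.7+ (for asyncio.run). Unlike asyncio.wait, which leaves pending awaitables running, asyncio.wait_for cancels the inner Condition.wait() when the timeout expires. In CPython, a cancelled Condition.wait() removes its own waiter and re-acquires the lock before re-raising, so nothing is left behind in the Condition. BusTimeoutError is borrowed from the quoted docstring; wait_with_timeout is a hypothetical helper name.

```python
import asyncio


class BusTimeoutError(Exception):
    """Name borrowed from the quoted waitfor() docstring."""


async def wait_with_timeout(cond, timeout):
    # Condition.wait() must be called while holding the condition's lock.
    async with cond:
        try:
            # wait_for() cancels the inner wait() on timeout; a cancelled
            # Condition.wait() cleans up its own waiter before re-raising.
            await asyncio.wait_for(cond.wait(), timeout)
        except asyncio.TimeoutError:
            raise BusTimeoutError(f"nothing within {timeout} s") from None


async def main():
    cond = asyncio.Condition()
    timed_out = False
    try:
        await wait_with_timeout(cond, 0.05)  # nobody notifies: times out
    except BusTimeoutError:
        timed_out = True

    async def notifier():
        await asyncio.sleep(0.01)
        async with cond:
            cond.notify()

    task = asyncio.ensure_future(notifier())
    await wait_with_timeout(cond, 1.0)  # returns once notifier() fires
    await task
    return timed_out, True


print(asyncio.run(main()))  # (True, True)
```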

In order to schedule a callback in the future, you need a standard
event loop interface for scheduling. We do have a base class,
asyncio.BaseEventLoop, but tornado's ioloop is NOT a descendant of it.
(It is based on tornado.util.Configurable, which is based on "object".)
BaseEventLoop.call_later and tornado.ioloop.IOLoop.add_timeout are
similar, but they are also quite different. I'm confused. How do they
work together? In order to have a timeout on asyncio.Condition.wait, the
Condition object would have to know how to wait for the given amount of
time. But BaseEventLoop.call_later and tornado's IOLoop.add_timeout are
fundamentally different, aren't they? If I use multiple I/O loops (which
is uncommon but possible), how would the Condition object know which
loop should be used for the callback?
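The two calls differ mainly in how the time is given: asyncio's loop.call_later(delay, callback, *args) takes a relative delay in seconds and loop.call_at(when, callback, *args) takes an absolute loop.time() value, while Tornado's IOLoop.add_timeout(deadline, callback, ...) accepts either an absolute time or a datetime.timedelta. The sketch below is a hypothetical add_timeout-style adapter built on asyncio, assuming Python 3.7+; it is not Tornado's implementation. It sidesteps the multiple-loop question by calling asyncio.get_running_loop(), i.e. it schedules on whichever loop is running the caller.

```python
import asyncio
import datetime


def add_timeout(deadline, callback, *args):
    """Hypothetical adapter mimicking Tornado's add_timeout() on asyncio.

    `deadline` is either an absolute loop.time() value or a
    datetime.timedelta relative to now. Returns an asyncio.TimerHandle.
    """
    loop = asyncio.get_running_loop()  # the loop running *this* caller
    if isinstance(deadline, datetime.timedelta):
        return loop.call_later(deadline.total_seconds(), callback, *args)
    return loop.call_at(deadline, callback, *args)


async def main():
    loop = asyncio.get_running_loop()
    fired = []
    add_timeout(datetime.timedelta(seconds=0.01), fired.append, "relative")
    add_timeout(loop.time() + 0.02, fired.append, "absolute")
    handle = add_timeout(datetime.timedelta(seconds=0.01), fired.append, "cancelled")
    handle.cancel()  # a cancelled TimerHandle never runs its callback
    await asyncio.sleep(0.05)
    return fired


print(asyncio.run(main()))  # ['relative', 'absolute']
```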


>>     async def notify(self, key, message):
>>         """Notify a single waiter. Return True if there was a waiter
>>         waiting for the key."""
>>         if key in self._waiters:
>>             self._waiters[key][0].send((key, message))
>>             return True
>>         else:
>>             return False
> It looks like you're assuming that the waiter object will be a
> coroutine and trying to call its send method directly rather than
> going through the event loop? 
Well, not exactly. When a message comes in, an external event is
generated (in this case, it starts with a network I/O interrupt that
continues as an event inside tornado's ioloop). So the call chain DOES
start in the event loop, and it eventually calls the send() method of
the waiter directly. The call then continues to run in code that writes
the response back to the waiting client and finishes that request. Yes,
send() is called explicitly. But calling generator.send(None) is
equivalent to next(generator), and we do that all the time. Why is that
a bad idea? I do not see anywhere in the documentation that send()
should not be called. Maybe I'm wrong.
> That seems like a bad idea. In asyncio,
> a coroutine is something that you can await or schedule with
> loop.create_task(). Don't try to use those low-level methods.
But here, you refer to BaseEventLoop.create_task(). I cannot use it
with tornado. :-( In Tornado, most tasks are not created explicitly.
They are started when an external interrupt (an HTTP request) comes in.
There is also support for "add_timeout", but I guess it does not use
BaseEventLoop.call_later. Frankly, I don't know how their
implementations differ.
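To see why calling send() yourself is fragile under asyncio, here is a sketch that drives a coroutine by hand, the way a Task would. It assumes Python 3.7+ and asyncio's standard Future; Tornado's own coroutine runner is not shown. `await fut` suspends the coroutine by yielding the Future to its driver. A value sent in while the Future is still pending is not delivered as the result; asyncio raises RuntimeError instead. The message only arrives once the Future has a result. waiter and drive_by_hand are illustrative names.

```python
import asyncio


async def waiter(fut):
    # `await fut` suspends this coroutine by yielding `fut` itself to
    # whatever is driving the coroutine (normally an asyncio Task).
    return await fut


def drive_by_hand():
    loop = asyncio.new_event_loop()
    broken = delivered = None
    try:
        # 1) Push a message in with send() while the Future is still pending.
        fut = loop.create_future()
        coro = waiter(fut)
        assert coro.send(None) is fut      # first step yields the Future
        try:
            coro.send(("key", "message"))  # the value is not delivered...
        except RuntimeError as exc:
            broken = str(exc)              # ...asyncio rejects the resume

        # 2) Complete the Future first, then resume: the result arrives.
        fut = loop.create_future()
        coro = waiter(fut)
        coro.send(None)
        fut.set_result(("key", "message"))
        try:
            coro.send(None)
        except StopIteration as stop:
            delivered = stop.value         # the coroutine's return value
    finally:
        loop.close()
    return broken, delivered


print(drive_by_hand())  # ("await wasn't used with future", ('key', 'message'))
```

In normal use the Task, not your code, makes these send() calls. Completing the Future is what tells the Task to resume the coroutine.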

The thing I want to achieve is "suspend me, give back execution to MY
CURRENT event loop, and resume me when somebody explicitly wants to
resume me (from the same event loop)". This is a basic feature, and I
thought it could be implemented in a way that is independent of the
event loop implementation (just like async, await, yield, send and
throw are independent: they can be used with twisted, tornado,
BaseEventLoop, etc.).
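On an asyncio event loop, "suspend me and resume me explicitly" maps onto a Future. Awaiting it hands control back to the loop that is currently running, and setting its result schedules the waiting coroutine to continue on that same loop. A minimal sketch, assuming Python 3.7+; Suspension is a hypothetical helper name, not an asyncio class.

```python
import asyncio


class Suspension:
    """Hypothetical helper: `await s` suspends the caller until s.resume()."""

    def __init__(self):
        # The Future belongs to the loop running the code that creates it,
        # so the resumed coroutine continues on that same loop.
        self._future = asyncio.get_running_loop().create_future()

    def __await__(self):
        # Delegate to the Future; awaiting it yields control to the loop.
        return self._future.__await__()

    def resume(self, value=None):
        # Only schedules the waiting coroutine; it does not run it here.
        if not self._future.done():
            self._future.set_result(value)


async def main():
    s = Suspension()

    async def sleeper():
        return await s  # suspended here until resume() is called

    task = asyncio.ensure_future(sleeper())
    await asyncio.sleep(0)  # let sleeper() reach its await
    s.resume("wake up")
    return await task


print(asyncio.run(main()))  # wake up
```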

> I think a better approach would be to make the waiter a Future and
> signal it by setting its result. Something like this, as a rough
> sketch:
>
> async def waitfor(self, keys, timeout=None):
>     waiter = asyncio.Future()
>     for key in keys:
>         self._waiters[key].add(waiter)
>     handle = None
>     if timeout:
>         handle = asyncio.call_later(timeout, self._handle_timeout, waiter)
Will this work if I use tornado's ioloop? I'm sorry, but I do not see
the difference between tornado's ioloop.add_timeout and
asyncio.call_later. Are they fundamentally different? Or can I just use
asyncio.call_later and expect it to work with any event loop
implementation?
>     try:
>         return await waiter
>     finally:
>         # TODO: Use a context manager to add and remove the keys.
>         for key in keys:
>             self._waiters[key].discard(waiter)
>         if handle:
>             handle.cancel()
>
> def notify(self, key, message):
>     if key in self._waiters and self._waiters[key]:
>         waiter = next(iter(self._waiters[key]))
>         waiter.set_result((key, message))
I think this is what I needed. I'm going to try this tomorrow.
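A self-contained version of the sketch quoted above, assuming Python 3.7+ and an asyncio event loop. It makes several changes to the sketch:

- asyncio has no module-level call_later(), so the timer is scheduled with loop.call_later() on the running loop.
- _waiters is a collections.defaultdict(set), so .add() works.
- The timeout sets BusTimeoutError on the Future. The _handle_timeout body was not shown, so this is an assumption.
- notify() skips Futures that are already done (for example, timed out but not yet cleaned up), because set_result() on a done Future raises InvalidStateError.

MessageBus and notifyall() are illustrative names taken from the thread.

```python
import asyncio
from collections import defaultdict


class BusTimeoutError(Exception):
    """Raised by waitfor() when no message arrives in time."""


class MessageBus:
    def __init__(self):
        # key -> set of Futures currently waiting for a message on that key
        self._waiters = defaultdict(set)

    async def waitfor(self, keys, timeout=None):
        """Wait for a message on any of `keys`; return (key, message)."""
        loop = asyncio.get_running_loop()
        waiter = loop.create_future()
        for key in keys:
            self._waiters[key].add(waiter)
        handle = None
        if timeout is not None:
            # call_later() is a loop method, not a module-level function.
            handle = loop.call_later(timeout, self._handle_timeout, waiter)
        try:
            return await waiter
        finally:
            for key in keys:  # unregister, dropping empty sets
                waiters = self._waiters.get(key)
                if waiters is not None:
                    waiters.discard(waiter)
                    if not waiters:
                        del self._waiters[key]
            if handle is not None:
                handle.cancel()

    @staticmethod
    def _handle_timeout(waiter):
        if not waiter.done():
            waiter.set_exception(BusTimeoutError())

    def notify(self, key, message):
        """Wake one waiter for `key`; return True if one was waiting."""
        for waiter in self._waiters.get(key, ()):
            if not waiter.done():
                waiter.set_result((key, message))
                return True
        return False

    def notifyall(self, key, message):
        """Wake every waiter for `key`; return how many were woken."""
        count = 0
        for waiter in self._waiters.get(key, ()):
            if not waiter.done():
                waiter.set_result((key, message))
                count += 1
        return count


async def main():
    bus = MessageBus()
    results = []
    task = asyncio.ensure_future(bus.waitfor(["a", "b"], timeout=1.0))
    await asyncio.sleep(0)  # let waitfor() register its Future
    results.append(bus.notify("b", "hello"))
    results.append(await task)
    try:
        await bus.waitfor(["c"], timeout=0.01)
    except BusTimeoutError:
        results.append("timed out")
    results.append(bus.notify("a", "nobody"))  # waiter was cleaned up
    return results


print(asyncio.run(main()))  # [True, ('b', 'hello'), 'timed out', False]
```

Because waiters are kept in sets, notify() wakes an arbitrary waiter rather than the oldest one. A collections.deque per key would give FIFO order if that matters.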
