How to implement an async message bus

Ian Kelly ian.g.kelly at gmail.com
Thu Oct 15 14:53:08 EDT 2015


On Thu, Oct 15, 2015 at 11:16 AM, Nagy László Zsolt
<gandalf at shopzeus.com> wrote:
> In order to schedule a callback in the future, you would have to have a
> standard event loop interface for scheduling. We do have a base class
> asyncio.BaseEventLoop, but tornado's ioloop is NOT a descendant of it.
> (It is based on tornado.util.Configurable, which is based on
> "object"). BaseEventLoop.call_later and
> tornado.ioloop.IOLoop.add_timeout are similar, but they are also quite
> different. I'm confused. How do they work together? In order to have
> a timeout on the asyncio.Condition.wait, the Condition object would have
> to know how to wait for the given amount of time. But asyncio.call_later
> and tornado ioloop.add_timeout are fundamentally different. Aren't they?
> If I use multiple io loops (which is uncommon but possible), then how
> would the Condition object know what loop should be used for the callback?

Sorry, I didn't realize that you were using tornado and not asyncio.
Presumably it should be on whatever loop the wait call was awaited in,
but I have no idea how compatible asyncio constructs are with tornado
event loops.

>> It looks like you're assuming that the waiter object will be a
>> coroutine and trying to call its send method directly rather than
>> going through the event loop?
> Well, not exactly. When a message comes in, an external event is
> generated (in this case: starts with a network I/O interrupt that
> continues as an event inside tornado's ioloop). So the call chain DOES
> start in the event loop, and it will eventually call the send() method
> of the waiter directly. The call will continue to run in code that
> writes the response back to the client waiting for it and finish that
> request. Yes, send() is called explicitly. But calling
> generator.send(None) is equivalent to next(generator), and we do that
> all the time. Why is that a bad idea? I do not see anywhere in the
> documentation that send() should not be called. Maybe I'm wrong.

I think it's a bad idea because then you're using the generator
synchronously (at which point it might as well just be a function)
rather than as an asynchronous coroutine. For example:

def waiter():
    key, message = yield
    do_something(key, message)

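w = waiter()
next(w)  # advance to the first yield; a fresh generator only accepts None
w.send((key, message))  # resumes waiter; StopIteration is raised when it returns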

This is really not very different from:

def waiter(key, message):
    do_something(key, message)

waiter(key, message)

What if do_something needs to make an async call? Of course it can
always just make a call using a callback, but that's so last decade.
If you want to express it as a coroutine, then you have to use the
first form and then have it yield futures back to the event loop.
Problem: any futures yielded from here aren't going to go to the event
loop, but to the caller of send. That caller then has to loop on the
send call, yielding any futures it gets back to the event loop, and
then sending the results back on to waiter again. At this point,
you've basically implemented await and you might as well have just
used await instead of send in the first place.
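To make that loop concrete, here is a minimal sketch of what the caller of
send ends up writing. The names drive and resolve are hypothetical, and the
tuple yielded by waiter merely stands in for a future; a real driver would
hand the yielded object to the event loop instead of resolving it inline.

```python
def waiter():
    key, message = yield              # wait for the incoming message
    reply = yield ("fetch", key)      # stand-in for yielding a future
    return (key, message, reply)


def drive(gen, first_value, resolve):
    """Hypothetical driver: resolve each yielded request and send the
    result back in, until the generator returns."""
    next(gen)                         # advance to the first yield
    try:
        request = gen.send(first_value)
        while True:
            # Whatever waiter yields comes back HERE, to the caller of
            # send, not to any event loop.
            request = gen.send(resolve(request))
    except StopIteration as stop:
        return stop.value


result = drive(waiter(), ("k", "m"), lambda request: request[1].upper())
```

The drive function is a small, synchronous imitation of what await and the
event loop already do between them.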

> The thing I want to achieve is "suspend me, give back execution to MY
> CURRENT event loop, and resume me when somebody wants to resume me
> explicitly (from the same event loop)". This is a basic feature, and I
> thought it could be implemented in a way that is independent of the
> event loop implementation. (Just like async, await, yield, send and
> throw are independent - they can be used with twisted, tornado or
> BaseEventLoop etc.)

I think that resuming a generator asynchronously is always going to be
dependent on the event loop. Async and await technically don't require
an event loop to be used, but they're kind of useless without one.
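As a small illustration of that last point, a coroutine that never awaits
anything pending can be stepped by hand with send, with no event loop
involved. This is only a sketch (the name answer is made up, and it assumes
Python 3.5 or later):

```python
async def answer():
    return 42


coro = answer()
try:
    coro.send(None)            # run the coroutine until it finishes
except StopIteration as stop:
    result = stop.value        # the return value travels on StopIteration
```

As soon as the coroutine awaits something that is not yet done, somebody
has to resume it later, and that somebody is the event loop.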

>> I think a better approach would be to make the waiter a Future and
>> signal it by setting its result. Something like this, as a rough
>> sketch:
>>
>> async def waitfor(self, keys, timeout=None):
>>     waiter = asyncio.Future()
>>     for key in keys:
>>         self._waiters[key].add(waiter)
>>     handle = None
>>     if timeout:
>>         handle = asyncio.get_event_loop().call_later(
>>             timeout, self._handle_timeout, waiter)
> Will this work if I use tornado's ioloop? I'm sorry but I do not see
> what the difference is between tornado ioloop.add_timeout and
> asyncio.call_later. Are they fundamentally different? Or can I just use
> asyncio.call_later and expect that it will work with any event loop
> implementation?

I wouldn't expect asyncio's call_later to work with a tornado event
loop, but you should be able to use the tornado equivalent
(IOLoop.add_timeout) to pretty much the same effect.

>>     try:
>>         return await waiter
>>     finally:
>>         # TODO: Use a context manager to add and remove the keys.
>>         for key in keys:
>>             self._waiters[key].discard(waiter)
>>         if handle:
>>             handle.cancel()
>>
>> def notify(self, key, message):
>>     if key in self._waiters and self._waiters[key]:
>>         waiter = next(iter(self._waiters[key]))
>>         waiter.set_result((key, message))
> I think this is what I needed. I'm going to try this tomorrow.

Yes, putting aside the asyncio/tornado distinction, I think a Future
will still solve the problem for you.
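For reference, the pieces of the sketch quoted above can be put together
into something runnable on plain asyncio. This is an illustration under
assumptions, not a tested design: the class name MessageBus, the body of
_handle_timeout and the demo coroutine are made up here, it assumes
Python 3.7 or later (asyncio.run, asyncio.get_running_loop), and it
schedules the timeout with the loop's call_later method. It says nothing
about how this behaves on a tornado ioloop.

```python
import asyncio
from collections import defaultdict


class MessageBus:
    def __init__(self):
        self._waiters = defaultdict(set)   # key -> set of pending futures

    async def waitfor(self, keys, timeout=None):
        loop = asyncio.get_running_loop()
        waiter = loop.create_future()
        for key in keys:
            self._waiters[key].add(waiter)
        handle = None
        if timeout is not None:
            # call_later is a method of the loop, not of the asyncio module
            handle = loop.call_later(timeout, self._handle_timeout, waiter)
        try:
            return await waiter
        finally:
            for key in keys:
                self._waiters[key].discard(waiter)
            if handle is not None:
                handle.cancel()

    def _handle_timeout(self, waiter):
        # Assumed behaviour: a timeout surfaces as asyncio.TimeoutError.
        if not waiter.done():
            waiter.set_exception(asyncio.TimeoutError())

    def notify(self, key, message):
        # Wake one pending waiter for this key, if there is one.
        for waiter in self._waiters.get(key, ()):
            if not waiter.done():
                waiter.set_result((key, message))
                return True
        return False


async def demo():
    bus = MessageBus()
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, bus.notify, "a", "hello")
    got = await bus.waitfor(["a", "b"], timeout=1.0)
    try:
        await bus.waitfor(["c"], timeout=0.01)   # nobody notifies "c"
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return got, timed_out


result = asyncio.run(demo())
```

Because notify only sets the future's result, the waiting coroutine is
resumed by the event loop rather than by a direct send call.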


