How to implement an async message bus

Nagy László Zsolt gandalf at shopzeus.com
Fri Oct 16 02:30:39 EDT 2015


> Sorry, I didn't realize that you were using tornado and not asyncio.
> Presumably it should be on whatever loop the wait call was awaited in,
> but I have no idea how compatible asyncio constructs are with tornado
> event loops.
Exactly. I had the same doubts.
> I think it's a bad idea because then you're using the generator
> synchronously (at which point it might as well just be a function)
> rather than as an asynchronous coroutine. For example:
>
> def waiter():
>     key, message = yield
>     do_something(key, message)
>
> waiter().send((key, message))
>
> This is really not very different from:
>
> def waiter(key, message):
>     do_something(key, message)
>
> waiter(key, message)

I wanted there to be a big difference: giving execution back to the event
loop at the "key, message = yield" statement. But apparently it is not
possible to give control back to the event loop with a bare yield; I
would have to yield a Future. (As I said: I'm new to async programming
and it was a beginner's mistake.)
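
To make the point about yielding a Future concrete, here is a minimal
sketch using asyncio only. The names waiter, demo and log are made up for
illustration, and asyncio.run / asyncio.get_running_loop assume Python 3.7
or later. Awaiting the Future is what suspends the coroutine and lets the
loop run other work until somebody sets the result:

```python
import asyncio


async def waiter(fut, log):
    # Awaiting the Future suspends this coroutine and returns control
    # to the event loop until someone calls fut.set_result().
    log.append("waiting")
    key, message = await fut
    log.append(("resumed", key, message))


async def demo():
    log = []
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    task = asyncio.ensure_future(waiter(fut, log))
    await asyncio.sleep(0)       # let waiter run up to its await
    log.append("loop is free")   # we run while waiter is still suspended
    fut.set_result(("key1", "hello"))  # explicit resume, same loop
    await task
    return log
```

Running asyncio.run(demo()) should show "loop is free" logged between the
waiter suspending and resuming.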


>
> What if do_something needs to make an async call? Of course it can
> always just make a call using a callback, but that's so last decade.
> If you want to express it as a coroutine, then you have to use the
> first form and then have it yield futures back to the event loop.
> Problem: any futures yielded from here aren't going to go to the event
> loop, but to the caller of send. That caller then has to loop on the
> send call, yielding any futures it gets back to the event loop, and
> then sending the results back on to waiter again. At this point,
> you've basically implemented await and you might as well have just
> used await instead of send in the first place.
Good point. :-)
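
The "loop on the send call" argument can be sketched as follows. This is
only an illustration under my own assumptions: drive and demo are
hypothetical names, and asyncio.sleep(0, result=...) stands in for any
async call that do_something might make. The driver has to forward every
yielded awaitable to the event loop and send the result back in, which is
what await already does:

```python
import asyncio


def waiter(log):
    # Generator-based waiter: receives a message via send(), then
    # yields an awaitable to whoever is driving it.
    key, message = yield
    result = yield asyncio.sleep(0, result=message.upper())
    log.append((key, result))


async def drive(gen, first_value):
    # Hand-rolled driver: pass each yielded awaitable to the event loop
    # and send its result back into the generator.
    next(gen)  # advance to the first bare yield
    try:
        awaitable = gen.send(first_value)
        while True:
            result = await awaitable
            awaitable = gen.send(result)
    except StopIteration:
        pass  # generator finished


async def demo():
    log = []
    await drive(waiter(log), ("key1", "hello"))
    return log
```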
>
>> The thing I want to achieve is "suspend me, give back execution to MY
>> CURRENT event loop, and resume me when somebody wants to resume me
>> explicitly (from the same event loop)". This is a basic feature, and I
>> thought it could be implemented in a way that is independent of the
>> event loop implementation. (Just like async, await, yield, send and
>> throw are independent - they can be used with twisted, tornado or
>> BaseEventLoop etc.)
> I think that resuming a generator asynchronously is always going to be
> dependent on the event loop. Async and await technically don't require
> an event loop to be used, but they're kind of useless without one.
Yes, they need an event loop to be useful. I just wanted them to be
independent of the concrete implementation. This is why we have abstract
classes like BaseEventLoop, right? I would like to express my idea in a
way that uses standard (asyncio) synchronization only, and will work
with any event loop implementation.

>
>>> I think a better approach would be to make the waiter a Future and
>>> signal it by setting its result. Something like this, as a rough
>>> sketch:
>>>
>>> async def waitfor(self, keys, timeout=None):
>>>     waiter = asyncio.Future()
>>>     for key in keys:
>>>         self._waiters[key].add(waiter)
>>>     handle = None
>>>     if timeout:
>>>         handle = asyncio.call_later(timeout, self._handle_timeout, waiter)
>> Will this work if I use tornado's ioloop? I'm sorry but I do not see
>> what is the difference between tornado ioloop.add_timeout and
>> asyncio.call_later. Are they fundamentally different? Or can I just use
>> asyncio.call_later and expect that it will work with any event loop
>> implementation?
> I wouldn't expect asyncio.call_later to work with a tornado event
> loop, but you should be able to use the tornado equivalent to pretty
> much the same effect.
And this is why I was confused! asyncio has asyncio.sleep, while
tornado has gen.sleep (
http://www.tornadoweb.org/en/stable/gen.html#tornado.gen.sleep ).
An asyncio loop has "call_later", but tornado's has "add_timeout". This is
why I had the add_timeout parameter in my AsyncBus class: I tried to make
it independent of the event loop implementation. The standard way to get
the current event loop is asyncio.get_event_loop(). But then again, we
have tornado.ioloop.IOLoop.current(). And they are totally different:


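import asyncio
import tornado.ioloop

ioloop = tornado.ioloop.IOLoop.current()

def f():
    # Print both "current" loops from inside a tornado callback.
    print(asyncio.get_event_loop())
    print(ioloop)

# A numeric deadline is an absolute time, so add the delay to ioloop.time().
ioloop.add_timeout(ioloop.time() + 1, f)
ioloop.start()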

Result is:

<_WindowsSelectorEventLoop running=False closed=False debug=False>
<tornado.platform.select.SelectIOLoop object at 0x0000000000772F60>

It is so confusing! tornado has added support for async/await in version
4.3, but they do not use the standard library, and they do not call
asyncio.set_event_loop. Maybe I could write a wrapper object around
tornado's ioloop, and call asyncio.set_event_loop(), but that is not
something a normal end user of the library should do, right?

I like Python because it is so high level, and so expressive. But I just
can't figure out how to write my AsyncBus class in a way that works with
asyncio event loops and tornado's ioloop as well.
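
For the asyncio side at least, the waitfor sketch quoted above can be
completed roughly like this. It is a sketch, not a definitive AsyncBus:
the notify method and the (None, None) timeout result are my own
assumptions, and call_later is taken from the running loop object because
it is a method of the event loop rather than a module-level asyncio
function. It assumes Python 3.7 or later and does not address tornado's
ioloop:

```python
import asyncio
from collections import defaultdict


class AsyncBus:
    def __init__(self):
        self._waiters = defaultdict(set)  # key -> set of pending Futures

    async def waitfor(self, keys, timeout=None):
        keys = list(keys)
        loop = asyncio.get_running_loop()
        waiter = loop.create_future()
        for key in keys:
            self._waiters[key].add(waiter)
        handle = None
        if timeout:
            handle = loop.call_later(timeout, self._handle_timeout, waiter)
        try:
            return await waiter
        finally:
            # Always unregister, whether notified, timed out or cancelled.
            if handle is not None:
                handle.cancel()
            for key in keys:
                self._waiters[key].discard(waiter)

    def _handle_timeout(self, waiter):
        # Assumption: a timeout is reported as (None, None).
        if not waiter.done():
            waiter.set_result((None, None))

    def notify(self, key, message):
        # Hypothetical sender side: wake everyone waiting on this key.
        for waiter in list(self._waiters.get(key, ())):
            if not waiter.done():
                waiter.set_result((key, message))


async def demo():
    bus = AsyncBus()
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, bus.notify, "b", "hello")
    got = await bus.waitfor(["a", "b"], timeout=1)
    timed_out = await bus.waitfor(["a"], timeout=0.01)
    return got, timed_out
```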

It is very annoying that async, await, yield, Future, Condition, Lock
and almost all other async constructs work with any event loop; but
simple things like "sleep for 10 seconds" are totally incompatible. But
I guess I'll have to live with that for now.
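
One way to live with it, in the same spirit as the add_timeout parameter
mentioned above, is to pass the sleep function in instead of naming it.
This is only a sketch: countdown is a made-up example, and the idea that
tornado.gen.sleep could be passed in the same way under tornado's ioloop
is an untested assumption on my part:

```python
import asyncio


async def countdown(n, delay, sleep=asyncio.sleep):
    # `sleep` is any callable that returns an awaitable; the coroutine
    # itself never names a concrete event loop API.
    ticks = []
    for i in range(n, 0, -1):
        ticks.append(i)
        await sleep(delay)
    return ticks
```

With asyncio this would be called as asyncio.run(countdown(3, 0.1)); the
caller decides which sleep implementation matches its event loop.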

Thanks,

   Laszlo
