How to implement an async message bus

Nagy László Zsolt gandalf at shopzeus.com
Thu Oct 15 09:16:54 EDT 2015


>     async def waitfor(self, keys, timeout=None):
>         """Wait for keys.
>
>                 :arg keys: An iterable of immutable keys.
>                 :arg timeout: Raise TimeoutError if nothing hits the bus
>                     for this amount of time. None means: wait indefinitely.
>                     It should be a datetime.timedelta object.
>         """
>         # Register for keys
>         if not keys:
>             raise Exception("Need some keys to wait for...")
>
>         waiter = ?????????????????????????????
>
>         for key in keys:
>             if key in self._waiters:
>                 self._waiters[key].append(waiter)
>             else:
>                 self._waiters[key] = [waiter]
>         try:
>             # Add timeout and wake me up if nothing comes in.
>             if timeout:
>                 self._add_timeout(timeout,
>                     functools.partial(waiter.throw, BusTimeoutError))
>             return await waiter
>         finally:
>             for key in keys:
>                 if key in self._waiters:
>                     self._waiters[key].remove(waiter)

The waiter should be an awaitable that suspends execution and is resumed
when waiter.throw() or waiter.send() is called explicitly.

Here are the options for awaitable objects
(https://www.python.org/dev/peps/pep-0492/#await-expression):

  * A native coroutine object - of course this is out of the question
    because I'm programming in pure Python.
  * Objects defined with CPython C API with a tp_as_async.am_await
    function - out of the question for the same reason
  * An object with an __await__ method returning an iterator - does not
    help me, because it leaves me with a missing iterator implementation
    with the same problem in its next().
  * A generator based object returned from a function decorated by
    types.coroutine() - this might work
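
For comparison, the third option can be sketched without writing an iterator by hand, if __await__ simply delegates to a future. This is only a sketch under assumptions: it uses asyncio.Future and the asyncio event loop rather than Tornado, and the Waiter class with its wake() and fail() methods is a hypothetical name, not part of the bus code above.

```python
import asyncio


class Waiter:
    """Hypothetical awaitable whose __await__ delegates to an asyncio.Future."""

    def __init__(self, loop):
        self._future = loop.create_future()

    def wake(self, key, value):
        # Resume whoever is awaiting this waiter with a (key, value) pair.
        if not self._future.done():
            self._future.set_result((key, value))

    def fail(self, exc):
        # Resume the awaiting coroutine by raising exc inside it.
        if not self._future.done():
            self._future.set_exception(exc)

    def __await__(self):
        # The future already knows how to suspend and resume under the
        # event loop, so its iterator is reused here.
        return self._future.__await__()


async def demo():
    loop = asyncio.get_running_loop()
    waiter = Waiter(loop)
    # Simulate a message hitting the bus shortly after we start waiting.
    loop.call_later(0.01, waiter.wake, "sid-1", "alice")
    return await waiter


result = asyncio.run(demo())
```

With this shape a timeout callback would call waiter.fail(...) instead of waiter.throw(...), so the event loop, not the caller, resumes the suspended coroutine.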


I have tried this:

    @types.coroutine
    def _create_waiter():
        # Suspend execution of this coroutine and resume when send() or
        # throw() is called on it.
        _key, _value = yield
        return _key, _value

    waiter = _create_waiter()

However, I got this exception:

File "C:/Python/Projects/_test/main.py", line 104, in post
username = await login_notifier.waitfor(self.sid,
timeout=datetime.timedelta(seconds=10))
File "C:\Python\Projects\_test\asyncbus.py", line 63, in waitfor
return await waiter
File "C:\Python\Projects\_test\asyncbus.py", line 50, in _create_waiter
_key, _value = yield
File
"C:\Python35\lib\site-packages\tornado-4.3.dev1-py3.5-win-amd64.egg\tornado\gen.py",
line 999, in run
value = future.result()
File
"C:\Python35\lib\site-packages\tornado-4.3.dev1-py3.5-win-amd64.egg\tornado\concurrent.py",
line 232, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File
"C:\Python35\lib\site-packages\tornado-4.3.dev1-py3.5-win-amd64.egg\tornado\gen.py",
line 1081, in handle_yield
self.future = convert_yielded(yielded)
File "C:\Python35\lib\functools.py", line 743, in wrapper
return dispatch(args[0].__class__)(*args, **kw)
File
"C:\Python35\lib\site-packages\tornado-4.3.dev1-py3.5-win-amd64.egg\tornado\gen.py",
line 1213, in convert_yielded
raise BadYieldError("yielded unknown object %r" % (yielded,))
tornado.gen.BadYieldError: yielded unknown object None

Apparently, the following statement:

_key, _value = yield

yielded None to the ioloop event loop runner, and it wanted to do something
with it. But why?
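
A minimal sketch of what seems to be happening, assuming the usual PEP 492 semantics in which await delegates like yield from: a bare yield inside a types.coroutine generator travels up through every enclosing await to whatever drives the outermost coroutine. Below, the driver is written by hand and waitfor() is a hypothetical stand-in for the bus method, so receiving None is harmless. A framework runner such as Tornado's instead expects each yielded object to be something it can turn into a future.

```python
import types


@types.coroutine
def _create_waiter():
    # A bare yield hands None to whoever is driving the coroutine chain.
    _key, _value = yield
    return _key, _value


async def waitfor():
    # Hypothetical stand-in for the bus method: it only awaits the waiter.
    return await _create_waiter()


coro = waitfor()

# Drive the coroutine by hand, playing the role of the event loop runner.
yielded = coro.send(None)  # runs until the bare yield; yielded is None

try:
    # Resume the suspended waiter with the pair it is waiting for.
    coro.send(("sid-1", "alice"))
except StopIteration as stop:
    # The coroutine finished; its return value rides on StopIteration.
    result = stop.value
```

The value that comes out of the first send() is exactly what the event loop runner sees, which matches the "yielded unknown object None" in the traceback.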

