How to implement an async message bus
Nagy László Zsolt
gandalf at shopzeus.com
Thu Oct 15 09:16:54 EDT 2015
> async def waitfor(self, keys, timeout=None):
>     """Wait for keys.
>
>     :arg keys: An iterable of immutable keys.
>     :arg timeout: Raise TimeoutError if nothing hits the
>         bus for this amount of time.
>         None means: wait indefinitely. It should be a
>         datetime.timedelta object.
>     """
>     # Register for keys
>     if not keys:
>         raise Exception("Need some keys to wait for...")
>
>     waiter = ?????????????????????????????
>
>     for key in keys:
>         if key in self._waiters:
>             self._waiters[key].append(waiter)
>         else:
>             self._waiters[key] = [waiter]
>     try:
>         # Add timeout and wake me up if nothing comes in.
>         if timeout:
>             self._add_timeout(timeout,
>                 functools.partial(waiter.throw, BusTimeoutError))
>         return await waiter
>     finally:
>         for key in keys:
>             if key in self._waiters:
>                 self._waiters[key].remove(waiter)

The waiter should be an awaitable that suspends execution and is resumed
when waiter.throw() or waiter.send() is called explicitly.
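
For comparison, here is a minimal sketch of that suspend-and-resume
behaviour with a Future playing the waiter. It assumes asyncio rather than
Tornado, and a Future has no send() or throw(): set_result() and
set_exception() stand in for them here. The key, the value and the delays
are made up for illustration and are not part of the bus code above.

```python
import asyncio


async def wait_for_value():
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()
    # Hypothetical stand-in for "something hit the bus": resolve the
    # future a little later, which resumes the coroutine awaiting it.
    loop.call_later(0.01, waiter.set_result, ("some-key", "some-value"))
    key, value = await waiter  # suspended here until set_result() runs
    return key, value


async def wait_for_timeout():
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()
    # Hypothetical stand-in for the timeout callback: fail the future,
    # which raises the exception at the await below.
    loop.call_later(0.01, waiter.set_exception, TimeoutError("nothing hit the bus"))
    try:
        await waiter
    except TimeoutError:
        return "timed out"
    return "unexpected result"


delivered = asyncio.run(wait_for_value())
timed_out = asyncio.run(wait_for_timeout())
```

In this sketch the event loop never sees a bare value: awaiting the Future
hands the loop an object it already knows how to wait on.
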
Here are the options for awaitable objects
(https://www.python.org/dev/peps/pep-0492/#await-expression):
* A native coroutine object - of course this is out of the question
  because I'm programming in pure Python.
* Objects defined with CPython C API with a tp_as_async.am_await
  function - out of the question for the same reason.
* An object with an __await__ method returning an iterator - does not
  help me, because it leaves me with a missing iterator implementation
  with the same problem in its next().
* A generator based object returned from a function decorated by
  types.coroutine() - this might work.
I have tried this:

    @types.coroutine
    def _create_waiter():
        # Suspend execution of this coroutine and resume when
        # send() or throw() is called on it.
        _key, _value = yield
        return _key, _value

    waiter = _create_waiter()

However, I got this exception:
  File "C:/Python/Projects/_test/main.py", line 104, in post
    username = await login_notifier.waitfor(self.sid, timeout=datetime.timedelta(seconds=10))
  File "C:\Python\Projects\_test\asyncbus.py", line 63, in waitfor
    return await waiter
  File "C:\Python\Projects\_test\asyncbus.py", line 50, in _create_waiter
    _key, _value = yield
  File "C:\Python35\lib\site-packages\tornado-4.3.dev1-py3.5-win-amd64.egg\tornado\gen.py", line 999, in run
    value = future.result()
  File "C:\Python35\lib\site-packages\tornado-4.3.dev1-py3.5-win-amd64.egg\tornado\concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "C:\Python35\lib\site-packages\tornado-4.3.dev1-py3.5-win-amd64.egg\tornado\gen.py", line 1081, in handle_yield
    self.future = convert_yielded(yielded)
  File "C:\Python35\lib\functools.py", line 743, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
  File "C:\Python35\lib\site-packages\tornado-4.3.dev1-py3.5-win-amd64.egg\tornado\gen.py", line 1213, in convert_yielded
    raise BadYieldError("yielded unknown object %r" % (yielded,))
tornado.gen.BadYieldError: yielded unknown object None
Apparently, the following statement:

    _key, _value = yield

yielded None to the ioloop event loop runner, and the runner wanted to do
something with it. But why?
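
A likely explanation, sketched with only the standard library: await
delegates the way yield from does, so a bare yield inside a
types.coroutine generator does not park the coroutine on its own. The
yielded value travels up the whole await chain to whoever drives the
outermost coroutine. Under Tornado that driver is the runner in gen.py
seen in the traceback, which tries to convert whatever it receives into a
Future. In the sketch below the driver is played by hand, and the
("k", "v") pair is made up for illustration.

```python
import types


@types.coroutine
def _create_waiter():
    # A bare yield hands None to whoever is driving the coroutine chain.
    _key, _value = yield
    return _key, _value


async def waitfor():
    # await passes the inner yield straight through to the driver.
    return await _create_waiter()


coro = waitfor()

# Start the coroutine by hand, as an event loop runner would.
yielded = coro.send(None)  # runs up to the bare yield; yielded is None

# Resume it with a value; the coroutine finishes and raises StopIteration.
try:
    coro.send(("k", "v"))
except StopIteration as stop:
    result = stop.value  # the pair returned by _create_waiter()
```

Driven by hand like this, the waiter works: send() resumes it. The catch
is that the first object the driver receives is None, and a runner that
expects a Future-like object has nothing it can do with that.
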