Asynchronous programming

Chris Angelico rosuav at gmail.com
Thu Aug 11 13:09:15 EDT 2016


On Fri, Aug 12, 2016 at 12:55 AM, Steven D'Aprano
<steve+python at pearwood.info> wrote:
> On Thu, 11 Aug 2016 02:41 pm, Chris Angelico wrote:
>
>> Consider these three ways of doing a database transaction:
>>
>> def synchronous(id):
>>     trn = conn.begin_transaction()
>>     trn.execute("select name from people where id=%d", (id,))
>>     name, = trn.fetchone()
>>     trn.execute("update people set last_seen=now() where id=%d", (id,))
>>     trn.commit()
>>     return name
>
> That makes perfect sense. Good old fashioned synchronous programming.

Let's assume in this case that we started with:

conn = synchronous_database_connection()

>> def callbacks_1(cb, id):
>>     conn.begin_transaction(callbacks_2, cb, id)
>> def callbacks_2(trn, cb, id):
>>     trn.execute("select name from people where id=%d", (id,),
>>                 callbacks_3, cb, id)
>> def callbacks_3(trn, cb, id):
>>     trn.fetchone(callbacks_4, cb, id)
>> def callbacks_4(trn, data, cb, id):
>>     name, = data
>>     trn.execute("update people set last_seen=now() where id=%d",
>>                 (id,), callbacks_5, cb, name)
>> def callbacks_5(trn, cb, name):
>>     trn.commit(callbacks_6, cb, name)
>> def callbacks_6(trn, cb, name):
>>     cb(name)
>
> Now you're surely pulling my leg. Your conn.begin_transaction has a
> completely different signature! (No arguments in the first case, three in
> this case.)

Let's assume that this one started with:

conn = callback_database_connection()

It's doing the same job as the 'conn' in the first example, but it's a
completely different API to cater to the fact that it has to handle
callbacks. You could use this API for synchronous calls by doing
something like this:

def begin_transaction(callback, *args):
    trn = real_conn.begin_transaction()
    callback(trn, *args)

The asynchronous version would end up saving callback and args
somewhere, triggering the operation, and having code somewhere that
processes the response. Supposing we're talking to PostgreSQL over a
socket (TCP or Unix domain), the response handler would be triggered
any time that socket becomes readable (i.e. via select() on the socket),
and it would decode the response, figure out which transaction is
being responded to (if there are multiple in flight), and send the
response on its way. Most likely the transaction would have some kind
of "current in-flight query" attribute (and would reject reentrant
calls - see, any form of async programming has to cope with
reentrancy), so that's where the callback would be stored.
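
To make that concrete, here's a very rough sketch of what the
callback-flavoured transaction object might do internally. Every name
in it (CallbackTransaction, send_query, _on_response) is invented for
illustration; a real driver would differ in the details:

class CallbackTransaction:
    def __init__(self, conn):
        self.conn = conn
        self.inflight = None  # (callback, args) for the current query, if any

    def execute(self, query, params, callback, *args):
        if self.inflight is not None:
            # Reject reentrant calls: one query in flight per transaction.
            raise RuntimeError("query already in flight")
        self.inflight = (callback, args)
        self.conn.send_query(self, query, params)  # fire it off, return at once

    def _on_response(self, result):
        # Called by the connection once select() says the socket is readable
        # and a complete reply has been decoded and matched to this transaction.
        callback, args = self.inflight
        self.inflight = None
        callback(self, *args)

The connection's side of the job is then just to watch its socket,
decode whatever arrives, and route each reply to the right
transaction's _on_response.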

>> def asynchronous(id):
>>     trn = yield from conn.begin_transaction()
>>     yield from trn.execute("select name from people where id=%d", (id,))
>>     name, = yield from trn.fetchone()
>>     yield from trn.execute("update people set last_seen=now() "
>>                            "where id=%d", (id,))
>>     yield from trn.commit()
>>     return name
>
> That ... looks wrong. You're taking something which looks like a procedure
> in the first case (trn.execute), so it probably returns None, and yielding
> over it. Even if that's not wrong, and it actually returned something which
> you ignored in the first case, it looks like you're mixing two distinct
> ways of using generators:
>
> - Generator as iterator ("yield x" or "yield from subiterator");
>   something which *sends* values out for the purpose of iteration.
>
> - Generator as coroutine ("y = yield x"); something which *receives*
>   values from the caller using the send() method.

Yeah, generators as coroutines are a bit weird. That's another good
reason for using the new async and await "keywords" (not technically
keywords yet), as it doesn't look as weird. But ultimately, it's doing
the same thing - the methods would look something like this:

def begin_transaction():
    # trigger the "begin transaction" query
    yield Awaitable("waiting for transaction...")
    # verify that the query was successful

The event loop attempts to step the "asynchronous" generator. It
yields from begin_transaction, which yields an Awaitable. The event
loop thus receives, from the generator, an object to be placed on the
queue. It's that simple.
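
A bare-bones sketch of that queueing, where an Awaitable is the only
thing the coroutine and the loop share (pending, step and complete are
names invented here, not any real library's API):

class Awaitable:
    def __init__(self, description):
        self.description = description

pending = {}  # Awaitable -> the generator that yielded it

def step(gen):
    try:
        waitfor = next(gen)    # run the coroutine to its next yield
    except StopIteration:
        return                 # coroutine finished
    pending[waitfor] = gen     # park it until the operation completes

def complete(waitfor):
    # Called when the underlying I/O (e.g. the BEGIN query) finishes.
    step(pending.pop(waitfor))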

Here's a very VERY simple, but complete, example of yield-based coroutines.

# Partially borrowed from example in Python docs:
# https://docs.python.org/3/library/selectors.html#examples
import selectors
import socket
import time

sel = selectors.DefaultSelector()
def eventloop():
    # Wait for any registered socket to become readable, then resume
    # whichever generator was parked on it.
    while "loop forever":
        for key, mask in sel.select():
            sel.unregister(key.fileobj)
            run_task(key.data)

def run_task(gen):
    # Step the generator to its next yield; whatever it yields is the
    # socket it wants to wait on, so register that socket with the
    # generator attached as its data.
    try:
        waitfor = next(gen)
        sel.register(waitfor, selectors.EVENT_READ, gen)
    except StopIteration:
        pass  # Generator finished; nothing more to schedule.

def mainsock():
    sock = socket.socket()
    sock.bind(('localhost', 1234))
    sock.listen(100)
    sock.setblocking(False)
    print("Listening on port 1234.")
    while "moar sockets":
        yield sock
        conn, addr = sock.accept()  # Should be ready
        print('accepted', conn, 'from', addr)
        conn.setblocking(False)
        run_task(client(conn))

def client(conn):
    while "moar data":
        yield conn
        data = conn.recv(1000)  # Should be ready
        if not data: break
        print("Got data")
        # At this point, you'd do something smart with the data.
        # But we don't. We just echo back.
        conn.send(data)  # Hope it won't block
        if b"quit" in data: break
    print('closing', conn)
    conn.close()

run_task(mainsock())
eventloop()


Aside from slapping a "yield sock" before accepting or reading from a
socket, it's exactly like synchronous code. Obviously a real example
would be able to yield other types of events too (most common would be
the clock, to handle an asynchronous time.sleep() equivalent), but
this is fully functional and potentially even useful.
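
For instance, the clock handling might look roughly like this. It's an
add-on, not part of the example above: the "yield a float to mean wake
me at that time" convention, the timers heap and the ticker() coroutine
are all inventions here.

import heapq, itertools, selectors, time

sel = selectors.DefaultSelector()
timers = []               # heap of (wake_time, tiebreak, generator)
_seq = itertools.count()  # tiebreak so equal wake times never compare generators

def run_task(gen):
    # As before, except that yielding a float now means "wake me at this
    # time" rather than "wait for this socket to become readable".
    try:
        waitfor = next(gen)
    except StopIteration:
        return
    if isinstance(waitfor, float):
        heapq.heappush(timers, (waitfor, next(_seq), gen))
    else:
        sel.register(waitfor, selectors.EVENT_READ, gen)

def eventloop():
    while "loop forever":
        # Never block past the earliest pending wake-up time.
        timeout = max(0, timers[0][0] - time.time()) if timers else None
        for key, mask in sel.select(timeout):
            sel.unregister(key.fileobj)
            run_task(key.data)
        # Resume anything whose wake-up time has arrived.
        while timers and timers[0][0] <= time.time():
            run_task(heapq.heappop(timers)[2])

def ticker():
    while "moar ticks":
        yield time.time() + 5.0   # roughly an asynchronous time.sleep(5)
        print("Another five seconds have passed")

Kicking off run_task(ticker()) alongside run_task(mainsock()) would then
print a line every five seconds or so while connections are still being
served.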

ChrisA


