[Python-ideas] The async API of the future: yield-from

Guido van Rossum guido at python.org
Sun Oct 14 16:36:38 CEST 2012


On Fri, Oct 12, 2012 at 10:05 PM, Greg Ewing
<greg.ewing at canterbury.ac.nz> wrote:
[Long sections snipped, all very clear]
> Guido van Rossum wrote:

>> (6) Spawning off multiple async subtasks
>>
>> Futures:
>>   f1 = subtask1(args1)  # Note: no yield!!!
>>   f2 = subtask2(args2)
>>   res1, res2 = yield f1, f2
>>
>> Yield-from:
>>   ??????????
>>
>> *** Greg, can you come up with a good idiom to spell concurrency at
>> this level? Your example only has concurrency in the philosophers
>> example, but it appears to interact directly with the scheduler, and
>> the philosophers don't return values. ***
>
>
> I don't regard the need to interact directly with the scheduler
> as a problem. That's because in the world I envisage, there would
> only be *one* scheduler, for much the same reason that there can
> really only be one async event handling loop in any given program.
> It would be part of the standard library and have a well-known
> API that everyone uses.
>
> If you don't want things to be that way, then maybe this is a
> good use for yielding things to the scheduler. Yielding a generator
> could mean "spawn this as a concurrent task".
>
> You could go further and say that yielding a tuple of generators
> means to spawn them all concurrently, wait for them all to
> complete and send back a tuple of the results. The yield-from
> code would then look pretty much the same as the futures code.

Sadly it looks like

  r = yield from (f1(), f2())

ends up interpreting the tuple as the iterator, and you end up with

  r = (f1(), f2())

(i.e., a tuple of generators) rather than the desired

  r = ((yield from f1()), (yield from f2()))
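
A small check of the two spellings, as a sketch for Python 3.3 or later. The names f1, f2, naive, desired and drive are made up for the illustration, and drive() is only a stand-in for whatever would be running the task. Strictly speaking, with the tuple form the generator objects are yielded outward one at a time, still unstarted, and the yield-from expression itself evaluates to None; either way nothing in f1 or f2 runs.

```python
import inspect

def f1():
    return 1
    yield  # unreachable; its presence makes f1 a generator function

def f2():
    return 2
    yield  # unreachable; same trick as in f1

def naive():
    # The tuple is treated as a plain iterable: each element is yielded
    # to the caller, and the expression's value is None.
    r = yield from (f1(), f2())
    return r

def desired():
    # Each generator is delegated to in turn and its return value collected.
    r = ((yield from f1()), (yield from f2()))
    return r

def drive(gen):
    """Run gen to completion; return (values it yielded, its return value)."""
    yielded = []
    try:
        while True:
            yielded.append(next(gen))
    except StopIteration as stop:
        return yielded, stop.value

naive_yielded, naive_result = drive(naive())
naive_states = [inspect.getgeneratorstate(g) for g in naive_yielded]
desired_yielded, desired_result = drive(desired())
```

Here naive_yielded holds the two generator objects, both still in state GEN_CREATED, while desired_result is the tuple (1, 2).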

> However, I'm inclined to think that this is too much functionality
> to build directly into the scheduler, and that it would be better
> provided by a class or function that builds on more primitive
> facilities.

Possibly. In NDB it is actually a very common operation which looks
quite elegant. But your solution below is fine (and helps by giving
people a specific entry in the documentation they can look up!)

> So it would look something like
>
> Yield-from:
>    task1 = subtask1(args1)
>    task2 = subtask2(args2)
>    res1, res2 = yield from par(task1, task2)
>
> where the implementation of par() is left as an exercise for
> the reader.

So, can par() be as simple as

def par(*args):
  results = []
  for task in args:
    result = yield from task
    results.append(result)
  return results

???

Or does it need to interact with the scheduler to ensure fairness?
(Not having built one of these, my intuition for how the primitives
fit together is still lacking, so excuse me for asking naive
questions.)
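
One way to probe the question is to trace the order in which steps run. The sketch below assumes a toy model in which a bare yield means "let something else run" and run() stands in for a scheduler; subtask, run and par_interleaved are hypothetical names, and par_interleaved ignores sent values and thrown exceptions, so it is not a proposal for the real thing.

```python
def subtask(name, steps, log):
    for i in range(steps):
        log.append((name, i))
        yield  # assumed meaning: give other tasks a chance to run
    return name + " done"

def par(*args):
    # The simple version from the question, unchanged.
    results = []
    for task in args:
        result = yield from task
        results.append(result)
    return results

def par_interleaved(*tasks):
    # Hypothetical alternative: step every pending task once per round.
    results = [None] * len(tasks)
    pending = dict(enumerate(tasks))
    while pending:
        for index, task in list(pending.items()):
            try:
                value = next(task)
            except StopIteration as stop:
                results[index] = stop.value
                del pending[index]
            else:
                yield value  # pass the task's yield up to the driver
    return results

def run(gen):
    """Toy driver: resume gen until it finishes, return its result."""
    try:
        while True:
            next(gen)
    except StopIteration as stop:
        return stop.value

seq_log = []
seq_results = run(par(subtask("a", 2, seq_log), subtask("b", 2, seq_log)))

mix_log = []
mix_results = run(par_interleaved(subtask("a", 2, mix_log),
                                  subtask("b", 2, mix_log)))
```

Under these assumptions the simple par() returns the right results, but seq_log shows task "a" running to completion before "b" takes its first step, because a generator does not start until something iterates it. The interleaved variant alternates the steps, which suggests that real concurrency needs either the scheduler's help or a par() that does its own stepping.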

Of course there's the question of what to do when one of the tasks
raises an error -- I haven't quite figured that out in NDB either, it
runs all the tasks to completion but the caller only sees the first
exception. I briefly considered having a "multi-exception" but it
felt too weird -- though I'm not married to that decision.
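
The "run everything, report the first exception" policy can be sketched on top of the sequential par() idea. This is an illustration only, not NDB's implementation; par_first_error, ok, bad and run are hypothetical names, and a failed task's slot in the results is simply filled with None.

```python
def par_first_error(*tasks):
    results = []
    first_error = None
    for task in tasks:
        try:
            results.append((yield from task))
        except Exception as exc:
            results.append(None)
            if first_error is None:
                first_error = exc  # remember only the first failure
    if first_error is not None:
        raise first_error
    return results

ran = []

def ok(name):
    ran.append(name)
    return name
    yield  # unreachable; makes this a generator function

def bad(name, exc):
    ran.append(name)
    raise exc
    yield  # unreachable; makes this a generator function

def run(gen):
    """Toy driver: resume gen until it finishes, return its result."""
    try:
        while True:
            next(gen)
    except StopIteration as stop:
        return stop.value

seen = None
try:
    run(par_first_error(ok("a"), bad("b", ValueError("b failed")),
                        bad("c", KeyError("c failed")), ok("d")))
except Exception as exc:
    seen = exc
```

All four tasks run, the caller sees only the ValueError from "b", and the KeyError from "c" is silently dropped, which is exactly the information loss a "multi-exception" would avoid.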

>> (7) Checking whether an operation is already complete
>>
>> Futures:
>>   if f.done(): ...
>
>
> I'm inclined to think that this is not something the
> scheduler needs to be directly concerned with. If it's
> important for one task to know when another task is completed,
> it's up to those tasks to agree on a way of communicating
> that information between them.
>
> Although... is there a way to non-destructively test whether
> a generator is exhausted? If so, this could easily be provided
> as a scheduler primitive.

Nick answered this affirmatively.
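
One standard-library way to do such a test (whether or not it is the one Nick had in mind) is inspect.getgeneratorstate(), available since Python 3.2. The helper name done() below is made up to mirror the Futures spelling.

```python
import inspect

def task():
    yield "step"
    return "result"

def done(gen):
    """Non-destructive check: has this generator finished?"""
    return inspect.getgeneratorstate(gen) == inspect.GEN_CLOSED

t = task()
states = [inspect.getgeneratorstate(t)]   # before the first next()
next(t)
states.append(inspect.getgeneratorstate(t))  # paused at the yield
done_while_suspended = done(t)
try:
    next(t)
except StopIteration:
    pass
states.append(inspect.getgeneratorstate(t))  # after it has returned
done_at_end = done(t)
```

The recorded states are GEN_CREATED, GEN_SUSPENDED and GEN_CLOSED, and none of the checks advances the generator.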

>> (8) Getting the result of an operation multiple times
>>
>> Futures:
>>
>>   f = async_op(args)
>>   # squirrel away a reference to f somewhere else
>>   r = yield f
>>   # ... later, elsewhere
>>   r = f.result()
>
>
> Is this really a big deal? What's wrong with having to store
> the return value away somewhere if you want to use it
> multiple times?

I suppose that's okay.

>> (9) Canceling an operation
>>
>> Futures:
>>   f.cancel()
>
>
> This would be another scheduler primitive.
>
> Yield-from:
>    cancel(task)
>
> This would remove the task from the ready list or whatever
> queue it's blocked on, and probably throw an exception into
> it to give it a chance to clean up.

Ah, of course. (I said I was asking newbie questions. Consider me your
first newbie!)
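
The "throw an exception into it" part can be sketched with the generator's own throw() method. The Cancelled exception and this cancel() helper are hypothetical, and the bookkeeping of removing the task from the scheduler's queues is left out.

```python
import inspect

class Cancelled(Exception):
    """Hypothetical exception used to tell a task it is being cancelled."""

log = []

def worker():
    try:
        while True:
            log.append("working")
            yield  # blocked here until resumed or cancelled
    finally:
        log.append("cleaning up")

def cancel(task):
    # Raise Cancelled at the point where the task is suspended. A task
    # that lets it propagate, or that returns, is finished either way.
    try:
        task.throw(Cancelled())
    except (Cancelled, StopIteration):
        pass

task = worker()
next(task)      # run up to the first yield
cancel(task)
state_after = inspect.getgeneratorstate(task)
```

After cancel() the log reads "working", "cleaning up" and the generator is closed. The built-in close() method does much the same thing with GeneratorExit.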

>> (10) Registering additional callbacks
>>
>> Futures:
>>   f.add_done_callback(callback)
>
>
> Another candidate for a higher-level facility, I think.
> The API might look something like
>
> Yield-from:
>    cbt = task_with_callbacks(task)
>    cbt.add_callback(callback)
>    yield from cbt.run()
>
> I may have a go at coming up with implementations for some of
> these things and send them in later posts.

Or better, add them to the tutorial. (Or an advanced tutorial, "common
async patterns". That would actually be a useful collection of use
cases for whatever we end up building.)
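
As a possible first entry in such a collection, the callback wrapper from Greg's outline might be sketched as follows. Only the three names task_with_callbacks, add_callback and run come from the outline; the behavior (callbacks receive the task's return value, in registration order) is an assumption, as are compute and the toy driver drive().

```python
class task_with_callbacks:
    def __init__(self, task):
        self.task = task
        self.callbacks = []

    def add_callback(self, callback):
        self.callbacks.append(callback)

    def run(self):
        # Delegate to the wrapped task, then notify the callbacks.
        result = yield from self.task
        for callback in self.callbacks:
            callback(result)
        return result

def compute():
    yield  # pretend to wait for something
    return 42

def drive(gen):
    """Toy driver: resume gen until it finishes, return its result."""
    try:
        while True:
            next(gen)
    except StopIteration as stop:
        return stop.value

seen = []
cbt = task_with_callbacks(compute())
cbt.add_callback(seen.append)
cbt.add_callback(lambda result: seen.append(result + 1))
final = drive(cbt.run())
```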

Here's another pattern that I can't quite figure out. It started when
Ben Darnell posted a link to Tornado's chat demo
(https://github.com/facebook/tornado/blob/master/demos/chat/chatdemo.py).
I didn't understand it and asked him offline what it meant.
Essentially, it's a barrier pattern where multiple tasks (each
representing a different HTTP request, and thus not all starting at
the same time) render a partial web page and then block until a new
HTTP request comes in that provides the missing info. (For technical
reasons they only do this once, and then the browsers re-fetch the
URL.) When the missing info is available, it must wake up all blocked
tasks and give them the new info.

I wrote a Futures-based version of this -- not the whole thing, but
the block-until-more-info-and-wakeup part. Here it is (read 'info' for
'messages'):

Each waiter executes this code when it is ready to block:

f = Future()  # Explicitly create a future!
waiters.add(f)
messages = yield f
<process messages and quit>

I'd write a helper for the first two lines:

def register():
  f = Future()
  waiters.add(f)
  return f

Then the waiter's code becomes:

messages = yield register()
<process messages and quit>

When new messages become available, the code just sends the same
results to all those Futures:

def wakeup(messages):
  for waiter in waiters:
    waiter.set_result(messages)
  waiters.clear()

(OO sauce left to the reader. :-)

If you wonder where the code is that hooks up the waiter.set_result()
call with the yield, that's done by the scheduler: when a task yields
a Future, it adds a callback to the Future that reschedules the task
when the Future's result is set.
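
Putting the pieces together, here is a runnable sketch of the whole wake-up path. The Future class and the start() function are minimal stand-ins written for this example, not NDB's or any library's API; register() and wakeup() are as above.

```python
class Future:
    """Minimal stand-in: a result slot plus done-callbacks."""
    def __init__(self):
        self._callbacks = []
        self._done = False
        self._result = None

    def add_done_callback(self, callback):
        if self._done:
            callback(self)
        else:
            self._callbacks.append(callback)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            callback(self)

    def result(self):
        return self._result

waiters = set()
received = []

def register():
    f = Future()
    waiters.add(f)
    return f

def wakeup(messages):
    for waiter in waiters:
        waiter.set_result(messages)
    waiters.clear()

def waiter_task(name):
    messages = yield register()
    received.append((name, messages))  # "process messages and quit"

def start(task, value=None):
    """Toy scheduler step: resume task; if it yields a Future, arrange
    to resume it again with the Future's result once that is set."""
    try:
        future = task.send(value)
    except StopIteration:
        return
    future.add_done_callback(lambda f: start(task, f.result()))

for name in ("a", "b", "c"):
    start(waiter_task(name))
blocked = len(waiters)
wakeup(["hello"])
```

All three tasks block on their own Future and all three receive the same list when wakeup() runs. Note that this wakeup() resumes tasks while iterating over the set, so a task that registered again right away would modify the set mid-loop; a real version would probably iterate over a copy.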

Edge cases:

- Were the waiter to lose interest, it could remove its Future from
the set of waiters, but no harm is done leaving it around either.
(NDB doesn't have this feature, but if you have a way to remove
callbacks, setting the result of a Future that nobody cares about has
no ill effect. You could even use a weak set...)

- It's possible to broadcast an exception to all waiters by using
waiter.set_exception().
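
Both edge cases can be sketched with concurrent.futures.Future as a stand-in for the Future class (an assumption made for the sake of a runnable example; it is not NDB's class). The name broadcast_error is hypothetical.

```python
import gc
import weakref
from concurrent.futures import Future  # stand-in Future implementation

waiters = weakref.WeakSet()

def register():
    f = Future()
    waiters.add(f)
    return f

def broadcast_error(exc):
    # Iterate over a snapshot so the set can safely change meanwhile.
    for waiter in list(waiters):
        waiter.set_exception(exc)
    waiters.clear()

kept = register()
dropped = register()
del dropped          # this waiter lost interest and let go of its Future
gc.collect()         # only needed on implementations without refcounting
remaining = len(waiters)

broadcast_error(RuntimeError("no more messages"))
error_seen = kept.exception()
```

With the weak set, the abandoned Future disappears from waiters by itself, and the one remaining waiter gets the RuntimeError instead of a result.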

-- 
--Guido van Rossum (python.org/~guido)


