asyncio with map&reduce flavor and without flooding the event loop

Maxime Steisel maximesteisel at gmail.com
Wed Aug 6 13:03:38 EDT 2014


2014-08-03 16:01 GMT+02:00 Valery Khamenya <khamenya at gmail.com>:
> Hi all
>
> [snip]
>
> Consider a task like crawling the web starting from some web-sites. Each
> site leads to generation of new downloading tasks in exponential(!)
> progression. However we don't want neither to flood the event loop nor to
> overload our network. We'd like to control the task flow. This is what I
> achieve well with modification of nice Maxime's solution proposed here:
> https://mail.python.org/pipermail/python-list/2014-July/675048.html
>
> Well, but I'd need as well a very natural thing, kind of map() & reduce() or
> functools.reduce() if we are on python3 already. That is, I'd need to call a
> "summarizing" function for all the downloading tasks completed on links from
> a page. This is where i fail :(

Hi Valery,

With the modified as_completed, you can write map and reduce
primitives quite naturally.

It could look like this:

========

import asyncio

def async_map(corofunc, *iterables):
    """
    Equivalent to map(corofunc, *iterables), except that corofunc
    must be a coroutine function and is executed asynchronously.

    This is not a coroutine, just a normal generator yielding Task
    instances.
    """
    for args in zip(*iterables):
        yield asyncio.async(corofunc(*args))

@asyncio.coroutine
def async_reduce(corofunc, futures, initial=0):
    """
    Equivalent to functools.reduce(corofunc, [f.result() for f in futures]),
    except that corofunc must be a coroutine function and future results
    can be evaluated out of order.

    This function is a coroutine.
    """
    result = initial
    # as_completed here is the modified version (with max_workers)
    # from the thread linked above, not asyncio.as_completed.
    for f in as_completed(futures, max_workers=50):
        new_value = (yield from f)
        result = (yield from corofunc(result, new_value))
    return result

========
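For illustration, here is a self-contained sketch of how a map step and a reduce step combine for the crawling use case. Everything in it is hypothetical: `fetch_length` and `add` stand in for real downloading and summarizing coroutines, and a plain `asyncio.Semaphore` substitutes for the modified as_completed's max_workers so it runs on stock asyncio. It uses the newer async/await syntax (Python 3.5+); on 3.4 you would write `@asyncio.coroutine`, `yield from`, and `loop.run_until_complete` instead of `asyncio.run`.

```python
import asyncio

async def fetch_length(url):
    """Hypothetical downloader: pretend to fetch url, return a value."""
    await asyncio.sleep(0)          # simulate network I/O
    return len(url)

async def add(acc, value):
    """Hypothetical 'summarizing' coroutine for the reduce step."""
    await asyncio.sleep(0)
    return acc + value

async def crawl(urls, max_workers=5):
    # Flow control: cap concurrent downloads with a semaphore,
    # the same idea as max_workers in the modified as_completed.
    sem = asyncio.Semaphore(max_workers)

    async def bounded_fetch(url):
        async with sem:
            return await fetch_length(url)

    # map step: one task per URL
    tasks = [asyncio.ensure_future(bounded_fetch(u)) for u in urls]

    # reduce step: fold results together as each task completes,
    # in completion order rather than submission order
    result = 0
    for fut in asyncio.as_completed(tasks):
        result = await add(result, await fut)
    return result

total = asyncio.run(crawl(["http://a.example", "http://bb.example"]))
print(total)  # 16 + 17 = 33
```

Because the fold runs out of order, the summarizing coroutine should be commutative and associative (like the addition here) for the result to be deterministic.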

Best,

Maxime
