[Python-ideas] Loop manager syntax

Nick Coghlan ncoghlan at gmail.com
Wed Jul 29 07:00:00 CEST 2015


On 28 July 2015 at 23:28, Todd <toddrjen at gmail.com> wrote:
> Following the discussion of the new "async" keyword, I think it would be
> useful to provide a generic way to alter the behavior of loops.  My idea is
> to allow a user to take control over the operation of a "for" or "while"
> loop.
>
> The basic idea is similar to context managers, where an object implementing
> certain magic methods, probably "__for__" and "__while__", could be placed
> in front of a "for" or "while" statement, respectively.  This class would
> then be put in charge of carrying out the loop.  Due to the similarity to
> context managers, I am tentatively calling this a "loop manager".

Guido's original PEP 340 (what eventually became PEP 343's context
managers) is worth reading for background here:
https://www.python.org/dev/peps/pep-0340/

And then the motivation section in PEP 343 covers why he changed his
mind away from introducing a new general purpose looping construct and
proposed the simpler context management protocol instead:
https://www.python.org/dev/peps/pep-0343/

As such, rather than starting from the notion of a general purpose
loop manager, we're likely better off focusing specifically on the
parallelisation problem as Andrew suggests, and figuring out how we
might go about enabling parallel execution of the components of a
generator expression or container comprehension for at least the
following cases:

    * native coroutine (async/await)
    * concurrent.futures.ThreadPoolExecutor.map
    * concurrent.futures.ProcessPoolExecutor.map

Consider the following serial operation:

    result = sum(process(x, y, z) for x, y, z in seq)

If "process" is a time-consuming function, we may want to dispatch it
to different processes in order to exploit all cores. Currently that
looks like:

    with concurrent.futures.ProcessPoolExecutor() as pool:
        # Executor.map takes one iterable per positional argument,
        # so the sequence of triples is unzipped first
        result = sum(pool.map(process, *zip(*seq)))

If "process" is a blocking IO operation rather than a CPU-bound one,
we may decide to save some IPC overhead, and use local threads instead
(there's no default pool size for a thread executor):

    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
        result = sum(pool.map(process, *zip(*seq)))
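
To make that concrete, here's a small self-contained version of the
thread pool case, with a toy stand-in for "process" (the function body
and sample data are invented purely for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

# Toy stand-in for the blocking "process" function in the example
def process(x, y, z):
    return x * y + z

seq = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]

with ThreadPoolExecutor(max_workers=10) as pool:
    # Executor.map accepts one iterable per positional parameter,
    # so the sequence of triples is unzipped with zip(*seq)
    result = sum(pool.map(process, *zip(*seq)))
```

The same code works unchanged with ProcessPoolExecutor, provided
"process" is defined at module level so it can be pickled.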

And if we're working with natively asynchronous algorithms:

    result = sum(await asyncio.gather(
        *(process_async(x, y, z) for x, y, z in seq)))

That's what parallel dispatch of a loop with independent iterations
already looks like today, with the key requirement being that you name
the operation performed on each iteration (or use a lambda expression
in the case of concurrent.futures).

PEP 492 deliberately postponed the question of "What does an
asynchronous comprehension look like?", because it wasn't clear what
either the syntax *or* semantics should be, and as the above example
shows, it's already fairly tidy if you're working with an already
defined coroutine.

Given the current suite level spelling for the concurrent.futures
case, one could easily imagine a syntax like:

    result = sum(process(x, y, z) with pool for x, y, z in seq)

That translated to:

    def _parallel_genexp(pool, seq):
        futures = []
        with pool:
            for x, y, z in seq:
                futures.append(pool.__submit__(
                    lambda x=x, y=y, z=z: process(x, y, z)))
            for future in futures:
                yield future.result()

    result = sum(_parallel_genexp(pool, seq))

Container comprehensions would replace the "yield future.result()"
with "expr_result.append(item)", "expr_result.add(item)" or
"expr_result[key] = value" as usual.
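
For example, the list comprehension variant of the expansion might
look like the sketch below, using today's Executor.submit in place of
the proposed __submit__ protocol method (the "process" function and
sample data are again just placeholders):

```python
from concurrent.futures import ThreadPoolExecutor

def process(x, y, z):
    return x + y + z

def _parallel_listcomp(pool, seq):
    # List comprehension analogue of the generator expansion:
    # "yield future.result()" becomes "expr_result.append(item)"
    futures = []
    with pool:
        for x, y, z in seq:
            futures.append(pool.submit(process, x, y, z))
        expr_result = []
        for future in futures:
            expr_result.append(future.result())
    return expr_result

result = _parallel_listcomp(ThreadPoolExecutor(max_workers=4),
                            [(1, 2, 3), (4, 5, 6)])
```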

To avoid destroying the executor with each use, a "persistent pool"
wrapper could be added that delegated __submit__, but changed
__enter__ and __exit__ into no-ops.
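
A minimal sketch of such a wrapper, again spelled with today's submit
rather than the hypothetical __submit__, might be:

```python
from concurrent.futures import ThreadPoolExecutor

class PersistentPool:
    """Hypothetical wrapper: delegates task submission to the wrapped
    executor, but makes __enter__/__exit__ no-ops so the executor
    survives repeated 'with' blocks."""
    def __init__(self, pool):
        self._pool = pool
    def submit(self, fn, *args, **kwargs):  # stands in for __submit__
        return self._pool.submit(fn, *args, **kwargs)
    def __enter__(self):
        return self
    def __exit__(self, *exc_info):
        return False  # deliberately does NOT shut the executor down

pool = PersistentPool(ThreadPoolExecutor(max_workers=2))
with pool:
    f = pool.submit(pow, 2, 10)
with pool:  # still usable: the first block didn't shut it down
    g = pool.submit(pow, 3, 3)
```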

Native coroutine syntax could then potentially be added using the
async keyword already introduced in PEP 492, where:

    result = sum(process(x, y, z) with async for x, y, z in seq)

May mean something like:

    async def _async_genexp(seq):
        futures = []
        async for x, y, z in seq:
            async def _iteration(x=x, y=y, z=z):
                return process(x, y, z)
            futures.append(asyncio.ensure_future(_iteration()))
        return await asyncio.gather(*futures)

    result = sum(await _async_genexp(seq))
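
For comparison, that expansion can be exercised today, without any new
syntax, along these lines (process_async and the sample data are
placeholders; asyncio.gather wants the coroutines as separate
positional arguments, hence the unpacking):

```python
import asyncio

async def process_async(x, y, z):
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return x * y * z

async def main(seq):
    # Unpack the generator expression into gather's argument list
    return sum(await asyncio.gather(
        *(process_async(x, y, z) for x, y, z in seq)))

result = asyncio.run(main([(1, 2, 2), (3, 1, 1)]))
```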

Regards,
Nick.

-- 
Nick Coghlan   |   ncoghlan at gmail.com   |   Brisbane, Australia
