Real-world use of concurrent.futures

Ian Kelly ian.g.kelly at gmail.com
Thu May 8 16:44:28 EDT 2014


On May 8, 2014 12:57 PM, "Andrew McLean" <lists at andros.org.uk> wrote:
> So far so good. However, I thought this would be an opportunity to
> explore concurrent.futures and to see whether it offered any benefits
> over the more explicit approach discussed above. The problem I am having
> is that all the discussions I can find of the use of concurrent.futures
> show use with toy problems involving just a few tasks. The URL
> downloader in the documentation is typical; it proceeds as follows:
>
> 1. Get an instance of concurrent.futures.ThreadPoolExecutor
> 2. Submit a few tasks to the executor
> 3. Iterate over the results using concurrent.futures.as_completed
>
> That's fine, but I suspect that isn't a helpful pattern if I have a very
> large number of tasks. In my case I could run out of memory if I tried
> submitting all of the tasks to the executor before processing any of the
> results.

I thought that ThreadPoolExecutor.map would handle this transparently if
you passed it a lazy iterable such as a generator.  From my testing though,
that seems not to be the case; with a generator of 100000 items and a pool
of 2 workers, the entire generator was consumed before any results were
returned.
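
A minimal sketch of that kind of test, for anyone who wants to reproduce
it. The names counting_source and consumed_before_first_result are made up
for illustration, and the item count is smaller than in my test. It records
how many items have been pulled from the generator at the point where
map() has returned but no result has been requested yet.

```python
from concurrent.futures import ThreadPoolExecutor


def counting_source(n, counter):
    """Yield 0..n-1, bumping counter[0] each time an item is pulled."""
    for i in range(n):
        counter[0] += 1
        yield i


def consumed_before_first_result(n=1000, workers=2):
    """Return (items pulled before any result was requested, first result)."""
    counter = [0]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = pool.map(lambda x: x * 2, counting_source(n, counter))
        # map() has returned; nothing has been read from `results` yet.
        consumed = counter[0]
        first = next(results)
    return consumed, first
```

With default arguments to map(), the first value returned equals n: the
whole generator is drained while map() is still submitting tasks.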

> I'm guessing what I want to do is, submit tasks in batches of perhaps a
> few hundred, iterate over the results until most are complete, then
> submit some more tasks and so on. I'm struggling to see how to do this
> elegantly without a lot of messy code just there to do "bookkeeping".
> This can't be an uncommon scenario. Am I missing something, or is this
> just not a job suitable for futures?

I don't think it needs to be "messy". Something like this should do the
trick, I think:

from concurrent.futures import FIRST_COMPLETED, wait
from itertools import islice

def batched_pool_runner(f, iterable, pool, batch_size):
    """Yield completed futures, keeping about batch_size tasks pending."""
    it = iter(iterable)
    # Submit the first batch of tasks.
    futures = set(pool.submit(f, x) for x in islice(it, batch_size))
    while futures:
        # Block until at least one pending task finishes.
        done, futures = wait(futures, return_when=FIRST_COMPLETED)
        # Replenish submitted tasks up to the number that completed.
        futures.update(pool.submit(f, x) for x in islice(it, len(done)))
        # Hand the finished futures to the caller; call .result() on each.
        yield from done
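
A rough usage sketch, to check that the input really is consumed lazily.
The function is repeated here only so the snippet runs on its own; demo,
source and the squaring task are hypothetical stand-ins for the real
workload. Note that the generator yields futures in completion order, not
input order, so the caller has to call result() on each one.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice


def batched_pool_runner(f, iterable, pool, batch_size):
    it = iter(iterable)
    futures = set(pool.submit(f, x) for x in islice(it, batch_size))
    while futures:
        done, futures = wait(futures, return_when=FIRST_COMPLETED)
        futures.update(pool.submit(f, x) for x in islice(it, len(done)))
        yield from done


def demo(n=1000, batch_size=10):
    """Return (items pulled when the first future arrived, sum of results)."""
    pulled = [0]

    def source():
        # Count how many inputs the runner has taken so far.
        for i in range(n):
            pulled[0] += 1
            yield i

    with ThreadPoolExecutor(max_workers=2) as pool:
        runner = batched_pool_runner(lambda x: x * x, source(), pool, batch_size)
        first = next(runner)
        pulled_after_first = pulled[0]
        total = first.result() + sum(fut.result() for fut in runner)
    return pulled_after_first, total
```

When the first future comes back, the runner has taken the initial batch
plus one replacement per completed task, so at most 2 * batch_size inputs
have been pulled rather than all n.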