Real-world use of concurrent.futures

Andrew McLean andrew at andros.org.uk
Tue May 13 16:09:28 EDT 2014


On 08/05/2014 21:44, Ian Kelly wrote:
> On May 8, 2014 12:57 PM, "Andrew McLean" <lists at andros.org.uk> wrote:
> > So far so good. However, I thought this would be an opportunity to
> > explore concurrent.futures and to see whether it offered any benefits
> > over the more explicit approach discussed above. The problem I am having
> > is that all the discussions I can find of the use of concurrent.futures
> > show use with toy problems involving just a few tasks. The url
> > downloader in the documentation is typical, it proceeds as follows:
> >
> > 1. Get an instance of concurrent.futures.ThreadPoolExecutor
> > 2. Submit a few tasks to the executor
> > 3. Iterate over the results using concurrent.futures.as_completed
> >
> > That's fine, but I suspect that isn't a helpful pattern if I have a very
> > large number of tasks. In my case I could run out of memory if I tried
> > submitting all of the tasks to the executor before processing any of the
> > results.
>
> I thought that ThreadPoolExecutor.map would handle this transparently
> if you passed it a lazy iterable such as a generator.  From my testing
> though, that seems not to be the case; with a generator of 100000
> items and a pool of 2 workers, the entire generator was consumed
> before any results were returned.
>
> > I'm guessing what I want to do is, submit tasks in batches of perhaps a
> > few hundred, iterate over the results until most are complete, then
> > submit some more tasks and so on. I'm struggling to see how to do this
> > elegantly without a lot of messy code just there to do "bookkeeping".
> > This can't be an uncommon scenario. Am I missing something, or is this
> > just not a job suitable for futures?
>
> I don't think it needs to be "messy". Something like this should do
> the trick, I think:
>
> from concurrent.futures import wait, FIRST_COMPLETED
> from itertools import islice
>
> def batched_pool_runner(f, iterable, pool, batch_size):
>     it = iter(iterable)
>     # Submit the first batch of tasks.
>     futures = {pool.submit(f, x) for x in islice(it, batch_size)}
>     while futures:
>         # Block until at least one task finishes; wait() returns
>         # the (done, not_done) sets of futures.
>         done, futures = wait(futures, return_when=FIRST_COMPLETED)
>         # Replenish submitted tasks up to the number that completed.
>         futures.update(pool.submit(f, x) for x in islice(it, len(done)))
>         # Yield completed futures; the caller retrieves values with .result().
>         yield from done

That worked very nicely, thank you. I think that would make a good
recipe, whether for the documentation or elsewhere. I suspect I'm not
the only person who would benefit from something to bridge the gap
between a toy example and something practical.
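
For the archive, here is a minimal sketch of how the recipe can be driven
in practice; fetch_url and the list of URLs below are illustrative
stand-ins rather than my actual task:

from concurrent.futures import ThreadPoolExecutor
import urllib.request

def fetch_url(url):
    # Placeholder task: download a URL and return its size in bytes.
    with urllib.request.urlopen(url, timeout=30) as response:
        return url, len(response.read())

# A lazy iterable of (placeholder) URLs; a list would work just as well.
urls = ("http://www.example.com/page%d" % i for i in range(100000))

with ThreadPoolExecutor(max_workers=10) as pool:
    for future in batched_pool_runner(fetch_url, urls, pool, batch_size=100):
        url, size = future.result()  # re-raises any exception from the task
        print(url, size)

Because only about batch_size futures are outstanding at any time, memory
use stays bounded however long the input iterable is.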

Andrew


