[stdlib-sig] futures - a new package for asynchronous execution

Brett Cannon brett at python.org
Fri Jan 15 22:19:12 CET 2010


On Fri, Jan 15, 2010 at 02:50, Anh Hai Trinh <anh.hai.trinh at gmail.com> wrote:
> Hello all,
>
> I'd like to point out an alternative module with respect to
> asynchronous computation: `stream` (which I wrote) supports
> ThreadPool, ProcessPool and Executor with a simpler API and
> implementation.
>
> My module takes a list-processing oriented view in which a
> ThreadPool/ProcessPool is simply a way of working with each stream
> element concurrently and output results possibly in out of order.
>
> A trivial example is:
>
>  from stream import map
>  range(10) >> ThreadPool(map(lambda x: x*x)) >> sum
>  # returns 285

I have not looked at the code at all, but the overloading of binary
shift is not going to be viewed as a good thing. I realize there is an
analogy to C++ streams, but typically Python's stdlib frowns upon
overloading operators like this beyond what a newbie would think an
operator is meant to do.

-Brett

>
>
> The URLs retrieving example is:
>
>  import urllib2
>  from stream import ThreadPool
>
>  URLs = [
>     'http://www.cnn.com/',
>     'http://www.bbc.co.uk/',
>     'http://www.economist.com/',
>     'http://nonexistant.website.at.baddomain/',
>     'http://slashdot.org/',
>     'http://reddit.com/',
>     'http://news.ycombinator.com/',
>  ]
>
>  def retrieve(urls, timeout=10):
>     for url in urls:
>        yield url, urllib2.urlopen(url, timeout=timeout).read()
>
>  if __name__ == '__main__':
>     retrieved = URLs >> ThreadPool(retrieve, poolsize=len(URLs))
>     for url, content in retrieved:
>        print '%r is %d bytes' % (url, len(content))
>     for url, exception in retrieved.failure:
>        print '%r failed: %s' % (url, exception)
>
>
> Note that the main argument to ThreadPool is an iterator-processing
> function: one that takes an iterator and returns an iterator. A
> ThreadPool/Process simply distributes the input to workers running
> such function and gathers their output as a single stream.
>
> One important different between `stream` and `futures` is the order of
> returned results.  The pool object itself is an iterable and the
> returned iterator's `next()` calls unblocks as soon as there is an
> output value.  The order of output is the order of job completion,
> whereas for `futures.run_to_results()`, the order of the returned
> iterator is based on the submitted FutureList --- this means if the
> first item takes a long time to complete, subsequent processing of the
> output can not benefit from other results already available.
>
> The other difference is that there is no absolutely no abstraction but
> two bare iterables for client code to deal with: one iterable over the
> results, and one iterable over the failure; both are thread-safe.
>
> If delicate job control is necessary, an Executor can be used. It is
> implemented on top of the pool, and offers submit(*items) which
> returns job ids to be used for cancel() and status().  Jobs can be
> submitted and canceled concurrently.
>
> The documentation is available at <http://www.trinhhaianh.com/stream.py>.
>
> The code repository is located at <http://github.com/aht/stream.py>.
> The implementation of ThreadPool, ProcessPool and Executor is little
> more than 300 lines of code.
>
>
> Peace,
>
> --
> // aht
> http://blog.onideas.ws
> _______________________________________________
> stdlib-sig mailing list
> stdlib-sig at python.org
> http://mail.python.org/mailman/listinfo/stdlib-sig
>


More information about the stdlib-sig mailing list