Adding a Par construct to Python?

Iain King iainking at gmail.com
Wed May 20 09:01:31 EDT 2009


On May 19, 10:24 am, Steven D'Aprano
<ste... at REMOVE.THIS.cybersource.com.au> wrote:
> On Mon, 18 May 2009 02:27:06 -0700, jeremy wrote:
> > Let me clarify what I think par, pmap, pfilter and preduce would mean
> > and how they would be implemented.
>
> [...]
>
> Just for fun, I've implemented a parallel-map function, and done a couple
> of tests. Comments, criticism and improvements welcome!
>
> import threading
> import Queue
> import random
> import time
>
> def f(arg):  # Simulate a slow function.
>     time.sleep(0.5)
>     return 3*arg-2
>
> class PMapThread(threading.Thread):
>     def __init__(self, clients):
>         super(PMapThread, self).__init__()
>         self._clients = clients
>     def start(self):
>         super(PMapThread, self).start()
>     def run(self):
>         while True:
>             try:
>                 data = self._clients.get_nowait()
>             except Queue.Empty:
>                 break
>             target, where, func, arg = data
>             result = func(arg)
>             target[where] = result
>
> class VerbosePMapThread(PMapThread):
>     def __init__(self, clients):
>         super(VerbosePMapThread, self).__init__(clients)
>         print "Thread %s created at %s" % (self.getName(), time.ctime())
>     def start(self):
>         super(VerbosePMapThread, self).start()
>         print "Thread %s starting at %s" % (self.getName(), time.ctime())
>     def run(self):
>         super(VerbosePMapThread, self).run()
>         print "Thread %s finished at %s" % (self.getName(), time.ctime())
>
> def pmap(func, seq, verbose=False, numthreads=4):
>     size = len(seq)
>     results = [None]*size
>     if verbose:
>         print "Initiating threads"
>         thread = VerbosePMapThread
>     else:
>         thread = PMapThread
>     datapool = Queue.Queue(size)
>     for i in xrange(size):
>         datapool.put( (results, i, func, seq[i]) )
>     threads = [thread(datapool) for i in xrange(numthreads)]
>     if verbose:
>         print "All threads created."
>     for t in threads:
>         t.start()
>     # Block until all threads are done.
>     while any([t.isAlive() for t in threads]):
>         time.sleep(0.25)  # sleep on every pass to avoid busy-waiting
>         if verbose:
>             print results
>     return results
>
> And here's the timing results:
>
> >>> from timeit import Timer
> >>> setup = "from __main__ import pmap, f; data = range(50)"
> >>> min(Timer('map(f, data)', setup).repeat(repeat=5, number=3))
> 74.999755859375
> >>> min(Timer('pmap(f, data)', setup).repeat(repeat=5, number=3))
> 20.490942001342773
>
> --
> Steven

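For what it's worth, those timings are about what four threads should give. Each of the 50 calls sleeps for 0.5 s, and timeit runs each statement 3 times per repeat, so:

```latex
\begin{aligned}
T_{\text{map}}  &= 3 \times 50 \times 0.5\,\text{s} = 75\,\text{s} \\
T_{\text{pmap}} &\approx 3 \times \lceil 50/4 \rceil \times 0.5\,\text{s} = 3 \times 13 \times 0.5\,\text{s} = 19.5\,\text{s}
\end{aligned}
```

The extra second or so in the measured 20.49 s is presumably thread start-up and polling overhead.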
I was going to write something like this, but you've beaten me to it :)
Mine is slightly different, though: rather than having pmap collate
everything and then return it, have it yield results as and when it
gets them and stop iterating when it's done. Rename it to par to keep
the OP happy, and you should get something like what he initially
requested (I think):

total = 0
for score in par(f, data):
    total += score
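Here's a rough sketch of what I mean, written for Python 3 rather than the 2.x used above. It rests on a few assumptions of my own: `par` takes the same (func, seq, numthreads) arguments as Steven's pmap, results come back in the order they finish rather than in input order, and an exception raised by func is re-raised inside the caller's loop.

```python
import queue
import threading
import time


def par(func, seq, numthreads=4):
    """Yield func(x) for each x in seq, in the order the results finish.

    Sketch only: output order is completion order, not input order.
    """
    items = list(seq)
    tasks = queue.Queue()
    for item in items:          # fill the work queue before any thread starts
        tasks.put(item)
    results = queue.Queue()

    def worker():
        while True:
            try:
                item = tasks.get_nowait()
            except queue.Empty:
                return          # no work left, so this thread exits
            try:
                results.put((True, func(item)))
            except Exception as exc:
                results.put((False, exc))   # hand the error to the consumer

    threads = [threading.Thread(target=worker, daemon=True)
               for _ in range(numthreads)]
    for t in threads:
        t.start()
    # Every item puts exactly one entry on `results`, so counting them
    # tells us when to stop iterating; no need to poll the threads.
    for _ in range(len(items)):
        ok, value = results.get()   # blocks until some worker finishes
        if not ok:
            raise value
        yield value
    for t in threads:
        t.join()


def f(arg):  # Simulate a slow function, as in Steven's example.
    time.sleep(0.1)
    return 3*arg - 2

data = range(50)
total = 0
for score in par(f, data):
    total += score
print(total)  # 3575
```

Completion order is fine for a sum like this, since addition doesn't care about order. If the OP needs results in input order, each worker would have to tag its result with the item's index, and the generator would have to buffer results that arrive early.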


Iain



More information about the Python-list mailing list