Adding a Par construct to Python?

jeremy at martinfamily.freeserve.co.uk
Tue May 19 14:15:52 EDT 2009


On 19 May, 10:24, Steven D'Aprano
<ste... at REMOVE.THIS.cybersource.com.au> wrote:
> On Mon, 18 May 2009 02:27:06 -0700, jeremy wrote:
> > Let me clarify what I think par, pmap, pfilter and preduce would mean
> > and how they would be implemented.
>
> [...]
>
> Just for fun, I've implemented a parallel-map function, and done a couple
> of tests. Comments, criticism and improvements welcome!
>
> import threading
> import Queue
> import random
> import time
>
> def f(arg):  # Simulate a slow function.
>     time.sleep(0.5)
>     return 3*arg-2
>
> class PMapThread(threading.Thread):
>     def __init__(self, clients):
>         super(PMapThread, self).__init__()
>         self._clients = clients
>     def start(self):
>         super(PMapThread, self).start()
>     def run(self):
>         while True:
>             try:
>                 data = self._clients.get_nowait()
>             except Queue.Empty:
>                 break
>             target, where, func, arg = data
>             result = func(arg)
>             target[where] = result
>
> class VerbosePMapThread(PMapThread):
>     # Subclass PMapThread so that run() actually processes the queue.
>     def __init__(self, clients):
>         super(VerbosePMapThread, self).__init__(clients)
>         print "Thread %s created at %s" % (self.getName(), time.ctime())
>     def start(self):
>         super(VerbosePMapThread, self).start()
>         print "Thread %s starting at %s" % (self.getName(), time.ctime())
>     def run(self):
>         super(VerbosePMapThread, self).run()
>         print "Thread %s finished at %s" % (self.getName(), time.ctime())
>
> def pmap(func, seq, verbose=False, numthreads=4):
>     size = len(seq)
>     results = [None]*size
>     if verbose:
>         print "Initiating threads"
>         thread = VerbosePMapThread
>     else:
>         thread = PMapThread
>     datapool = Queue.Queue(size)
>     for i in xrange(size):
>         # Queue the caller's func, not the global test function f.
>         datapool.put( (results, i, func, seq[i]) )
>     threads = [thread(datapool) for i in xrange(numthreads)]
>     if verbose:
>         print "All threads created."
>     for t in threads:
>         t.start()
>     # Block until all threads are done.
>     if verbose:
>         while any([t.isAlive() for t in threads]):
>             time.sleep(0.25)
>             print results
>     for t in threads:
>         t.join()  # wait without busy-looping
>     return results
>
> And here's the timing results:
>
> >>> from timeit import Timer
> >>> setup = "from __main__ import pmap, f; data = range(50)"
> >>> min(Timer('map(f, data)', setup).repeat(repeat=5, number=3))
> 74.999755859375
> >>> min(Timer('pmap(f, data)', setup).repeat(repeat=5, number=3))
> 20.490942001342773
>
> --
> Steven

Hi Steven,

I am impressed by this - it shows the potential speedup that pmap
could give, although as things stand the GIL would be a problem for
speeding up pure Python code. Do Jython and IronPython include the
threading module?
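
For anyone trying this on a current Python 3, the same idea can be
sketched with concurrent.futures.ThreadPoolExecutor from the standard
library. This is not Steven's code, only a minimal illustration: the
timed helper and the 0.05 second delay are assumptions made up for the
example. Because f only sleeps, and time.sleep releases the GIL, the
threaded version should take roughly a quarter of the serial time with
four threads. A CPU-bound pure Python f would not be expected to gain
anything on a standard GIL-enabled CPython.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def f(arg):
    # Simulate a slow, I/O-like function; time.sleep releases the GIL.
    time.sleep(0.05)
    return 3 * arg - 2


def pmap(func, seq, numthreads=4):
    # Executor.map yields results in input order, like the built-in map.
    with ThreadPoolExecutor(max_workers=numthreads) as pool:
        return list(pool.map(func, seq))


def timed(call):
    # Hypothetical helper: return (result, elapsed seconds) for a
    # zero-argument callable.
    start = time.perf_counter()
    result = call()
    return result, time.perf_counter() - start


if __name__ == "__main__":
    data = range(20)
    serial, t_serial = timed(lambda: list(map(f, data)))
    parallel, t_parallel = timed(lambda: pmap(f, data))
    print("same results:", serial == parallel)
    print("serial %.2fs, threaded %.2fs" % (t_serial, t_parallel))
```

The exact timings will vary from machine to machine, so none are
quoted here.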

Jeremy


