RFC: my iterthreader module

Justin Azoff justin.azoff at gmail.com
Mon Jul 17 23:15:34 EDT 2006


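I have this iterthreader module that I've been working on for a while
now.  It is similar to itertools.imap, but it runs the function calls
in a pool of worker threads (two by default) and uses Queues to move
the data around.  Results come back in the order the threads finish
them rather than in input order, so each result is yielded together
with its input as an (input, output) pair.  A better name for it would
probably be ithreadmap, but anyway...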
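
The core mechanism (a fixed pool of worker threads, one Queue of work
items, one Queue of results, and sentinel values to shut everything
down) can be sketched as a single generator function.  This is a
Python 3 sketch under stated assumptions, not the module itself: the
names thread_map and _STOP are hypothetical, and here each worker
posts its own "exited" marker instead of a separate cleanup thread
joining the workers.

```python
import queue
import threading

# Hypothetical sentinel: "no more work" going in, "worker exited" coming out.
_STOP = object()


def thread_map(func, items, numthreads=2):
    """Yield (item, func(item)) pairs in the order the workers finish them."""
    work = queue.Queue()  # items waiting to be processed
    done = queue.Queue()  # (item, result) pairs, plus one _STOP per worker

    def worker():
        try:
            while True:
                item = work.get()
                if item is _STOP:
                    break
                done.put((item, func(item)))
        finally:
            # Always report that this worker exited, even if func raised,
            # so the consumer loop below cannot wait forever.
            done.put(_STOP)

    threads = [threading.Thread(target=worker) for _ in range(numthreads)]
    for t in threads:
        t.start()
    for item in items:
        work.put(item)
    for _ in threads:  # one stop sentinel per worker
        work.put(_STOP)

    finished = 0
    while finished < numthreads:
        result = done.get()
        if result is _STOP:
            finished += 1
        else:
            yield result
```

Because results arrive in completion order, a caller that needs them
keyed by input can collect them with dict(thread_map(func, items)),
assuming the inputs are hashable.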

The short explanation: if you have a loop like
for item in biglist:
    print "The value for %s is %s" % (item, slowfunc(item))
or
for item,val in ((item, slowfunc(item)) for item in biglist):
    print "The value for %s is %s" % (item, val)

you can simply rewrite it as

for item,val in iterthreader.Threader(slowfunc, biglist):
    print "The value for %s is %s" % (item, val)

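and it will hopefully run faster.  The usual GIL caveats still apply,
of course: the speedup comes when slowfunc spends its time waiting on
I/O (network, disk, subprocesses), because a thread blocked on I/O
releases the GIL, while CPU-bound pure-Python functions will not get
any faster.  Instead of passing func and data, you can also subclass
it and override handle_input and get_input, but I almost always just
call it in the above manner.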
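
The "hopefully faster" part hinges on the GIL: a thread blocked in I/O
or in time.sleep releases the GIL, so several such calls can wait at
the same time.  A small Python 3 sketch of that effect, using
time.sleep as a stand-in for a slow network call; slowfunc and
run_in_threads are hypothetical names, and for brevity it starts one
thread per item rather than using a fixed pool like Threader does.

```python
import threading
import time


def slowfunc(item):
    # Hypothetical stand-in for a slow network or disk call; like blocking
    # I/O, time.sleep releases the GIL while it waits.
    time.sleep(0.2)
    return item * 2


def run_in_threads(func, items):
    """Call func(item) for every item, one thread per item.

    Returns (results_dict, elapsed_seconds)."""
    results = {}

    def call(item):
        results[item] = func(item)  # each thread writes its own key

    threads = [threading.Thread(target=call, args=(item,)) for item in items]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, time.perf_counter() - start


# The four 0.2 s sleeps overlap, so this takes about 0.2 s rather than
# the 0.8 s a plain loop would need.
results, elapsed = run_in_threads(slowfunc, range(4))
```

Replacing the sleep with a pure-Python arithmetic loop would, under
CPython's GIL, take roughly as long as running the calls one after
another, since only one thread executes Python bytecode at a time.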

So, can anyone find any obvious problems with it?  I've been meaning to
re-post it [1] to the Python Cookbook, but I'd like to hear what
others think first.  I'm not aware of any other module that makes this
particular use of threading this simple.

[1] I _think_ I posted it before, but that may have just been in a
comment.

import threading
import Queue

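class Threader(object):
    """Map func over data using a pool of worker threads.

    Iterating yields (input, output) pairs in the order the workers
    finish them, which is not necessarily the input order."""

    def __init__(self, func=None, data=None, numthreads=2):
        if numthreads < 1:
            raise ValueError("numthreads should be greater than 0")

        # func and data override the hook methods below; test against
        # None so that an empty list still counts as data.
        if func is not None:
            self.handle_input = func
        if data is not None:
            self.get_input = lambda: data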

        self._numthreads=numthreads
        self.threads = []
        self.run()


    def __iter__(self):
        return self

    def next(self):
        still_running, input, output = self.DQ.get()
        if not still_running:
            raise StopIteration
        return input, output

    def get_input(self):
        raise NotImplementedError("You must implement get_input as a "
                                  "function that returns an iterable")

    def handle_input(self, input):
        raise NotImplementedError("You must implement handle_input as a "
                                  "function that returns anything")

    def _handle_input(self):
        # Worker loop: process items until a (False, None) sentinel arrives.
        while True:
            work_todo, input = self.Q.get()
            if not work_todo:
                break
            self.DQ.put((True, input, self.handle_input(input)))

    def cleanup(self):
        """Wait for the workers to exit, then tell the iterator to stop."""
        for t in self.threads:
            t.join()
        self.DQ.put((False,None,None))


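    def run(self):
        self.Q = Queue.Queue()    # work: (True, item), or (False, None) to stop
        self.DQ = Queue.Queue()   # results: (True, input, output), or end marker
        for x in range(self._numthreads):
            t = threading.Thread(target=self._handle_input)
            t.start()
            self.threads.append(t)

        # The whole input is queued here, before __init__ returns.
        try:
            for x in self.get_input():
                self.Q.put((True, x))
        except NotImplementedError, e:
            print e
        # One stop sentinel per worker thread.
        for x in range(self._numthreads):
            self.Q.put((False, None))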

        threading.Thread(target=self.cleanup).start()
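
Note that run() is called from __init__ and loops over all of
get_input() before the constructor returns, so the caller cannot read
the first result until every item has been queued, and with an
endless iterable the constructor never returns.  One possible
alternative, sketched here in Python 3 with hypothetical names
(thread_map_streaming, maxqueued), moves the feeding loop into its own
thread and bounds both queues so the producer cannot race ahead of the
consumer.

```python
import itertools
import queue
import threading

# Hypothetical sentinel: end of input for workers, "worker exited" for the consumer.
_STOP = object()


def thread_map_streaming(func, items, numthreads=2, maxqueued=10):
    """Yield (item, func(item)) pairs while a feeder thread is still reading items.

    Both queues are bounded, so if the consumer stops early the feeder and
    workers block on put() instead of reading the whole input."""
    work = queue.Queue(maxsize=maxqueued)
    done = queue.Queue(maxsize=maxqueued)

    def feeder():
        try:
            for item in items:
                work.put(item)  # blocks while the queue is full
        finally:
            # Always send the stop sentinels, even if iterating items raised.
            for _ in range(numthreads):
                work.put(_STOP)

    def worker():
        try:
            while True:
                item = work.get()
                if item is _STOP:
                    break
                done.put((item, func(item)))
        finally:
            done.put(_STOP)

    # Daemon threads: if an endless input is abandoned, the blocked feeder
    # and workers do not keep the interpreter alive at exit.
    threading.Thread(target=feeder, daemon=True).start()
    for _ in range(numthreads):
        threading.Thread(target=worker, daemon=True).start()

    finished = 0
    while finished < numthreads:
        result = done.get()
        if result is _STOP:
            finished += 1
        else:
            yield result


# Works on an endless iterator: take the first five results and stop.
first_five = list(itertools.islice(
    thread_map_streaming(lambda x: x * x, itertools.count()), 5))
```

The maxqueued value of 10 is an arbitrary default for the sketch; a
larger bound lets workers run further ahead at the cost of memory.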


-- 
- Justin



