Thread processing

Tim Evans t.evans at paradise.net.nz
Thu Mar 6 04:56:44 EST 2003


Paul Moore <gustav at morpheus.demon.co.uk> writes:
[snip]
> Look at this another way, then. Are there any examples of thread pool
> frameworks around which I could use as a starting point? I have
> searched round and found nothing much (Aahz has some specific examples
> on his site, but nothing generic, the cookbook doesn't have anything,
> where else could I look?)

Well, I was bored, so here's one I just threw together.  You can
subclass 'Task' the same as you would threading.Thread to store extra
state, or just pass a function and arguments as in the example.

-----------------------------------------------------------------

#!/usr/bin/python2.2

from __future__ import generators

import threading, Queue
import sys, traceback


class Task(object):
    def __init__(self, func=None, *args, **kw):
        self.__target = (func, args, kw)
        self.result = None
        self.exception = None

    def _run(self, finished):
        try:
            self.result = self.run()
        except:
            traceback.print_exc()
            self.exception = sys.exc_info()[0]
        finished.put(self)

    def run(self):
        func, args, kw = self.__target
        if func is None:
            return None
        else:
            return func(*args, **kw)


class ParallelTasks(object):
    def __init__(self):
        self._count = 0
        self._finished = Queue.Queue()

    def add(self, task):
        t = threading.Thread(target=task._run, args=(self._finished,))
        self._count += 1
        t.start()

    def get_one(self):
        r = self._finished.get()
        self._count -= 1
        return r

    def get_all(self):
        while self._count:
            yield self.get_one()


def _test():
    import time

    def wait_and_return(t, r):
        if t <= 0:
            raise ValueError("'t' must be greater than zero")
        time.sleep(t)
        return r

    pt = ParallelTasks()
    pt.add(Task(wait_and_return, 3, '3 seconds'))
    pt.add(Task(wait_and_return, 2, '2 seconds'))
    pt.add(Task(wait_and_return, -1, '-1 seconds'))

    for task in pt.get_all():
        if task.exception:
            print 'result: task failed'
        else:
            print 'result:', task.result

    print 'all tasks complete'


if __name__ == '__main__':
    _test()

-----------------------------------------------------------------

-- 
Tim Evans




More information about the Python-list mailing list