Idle wait for outstanding threads?

Alex Martelli aleax at aleax.it
Thu Jan 31 08:15:08 EST 2002


"Alan Kennedy" <alanmk at hotmail.com> wrote in message
news:f72dac0d.0201310334.1a442342 at posting.google.com...
    ...
> Really what I need is a threading equivalent of the "select"
> call for file descriptors. "select" allows me to block on multiple
> file descriptors simultaneously, and return when something arrives on
> any one of them. I need something equivalent for threads, that allows
> me to block on multiple 'thread.lock's or multiple
> 'threading.Condition's at the same time. Also, something that had a
> timeout as well, just like the "select" call, would be ideal.

I see several others have already recommended Python standard module Queue,
and I heartily second the recommendation.  Many useful multithreaded
architectures can be put together most simply with Queue as the only
synchronization mechanism.

See http://www.python.org/doc/current/lib/module-Queue.html for all
details.  While Queue doesn't directly support timeouts, should you
need them you could synthesize them, for Queue.get, via an auxiliary
threading.Timer object posting a "timedout" 'event', or via a utility
thread periodically posting "tick" 'events' to all relevant Queues,
with a Queue.Queue subclass whose overridden get method checks for
timeouts on each tick (if you don't need very fine granularity on
timeouts, the resulting "polling" overhead shouldn't be too terrible).

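The Timer variant might look roughly like the sketch below. It is written
for Python 3, where the module is named queue; the names get_with_timeout
and _Timeout are made up for illustration, and it assumes a single consumer
per queue. Later Python releases also accept a timeout argument directly in
Queue.get, so treat this only as an illustration of the technique.

```python
import queue
import threading


class _Timeout:
    """Hypothetical marker for the "timedout" event; one instance per call."""


def get_with_timeout(q, seconds):
    """Return the next real item from q, or raise queue.Empty after `seconds`.

    Assumes this thread is the only consumer of q.
    """
    mine = _Timeout()
    # The auxiliary timer thread posts our marker to the same queue.
    timer = threading.Timer(seconds, q.put, args=(mine,))
    timer.daemon = True
    timer.start()
    try:
        while True:
            item = q.get()  # blocks until a real item or a marker arrives
            if item is mine:
                raise queue.Empty
            if isinstance(item, _Timeout):
                continue  # stale marker left over from an earlier call
            return item
    finally:
        timer.cancel()  # harmless if the timer has already fired
```

Giving each call its own marker lets a later call recognize and skip a
marker that fired just after an earlier call had already returned.
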
But for your problem as you outline it elsewhere, timeouts don't
seem strictly necessary.  You could have 4 dedicated working threads
each waiting on a separate Queue instance; when a work-request
arrives on the working thread's Queue, the thread does the work,
posts the result-description to a single results-Queue (common to
all working threads), and loops back to waiting.  The main thread,
when a request arrives, slices it up into work-requests, posts each
to the appropriate working thread's Queue, then goes on to pluck the N
expected result-descriptions from the single results-Queue, collates
overall results, and responds to the request.
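
A minimal sketch of that dedicated-queue layout, in Python 3 (module queue),
could look like this. The four functions are arbitrary stand-ins for the real
kinds of work, the names worker and handle_request are hypothetical, and the
sketch assumes the main thread handles one request at a time.

```python
import queue
import threading


def worker(work_queue, results_queue, function):
    """Serve one dedicated queue forever, posting to the shared results queue."""
    while True:
        tag, args = work_queue.get()
        results_queue.put((tag, function(*args)))


# Hypothetical stand-ins for the four kinds of work.
functions = [len, str.upper, str.lower, str.title]

results_queue = queue.Queue()
work_queues = []
for function in functions:
    wq = queue.Queue()
    threading.Thread(target=worker, args=(wq, results_queue, function),
                     daemon=True).start()
    work_queues.append(wq)


def handle_request(text):
    """Post one work-request per worker, then collect the N tagged results."""
    for tag, wq in enumerate(work_queues):
        wq.put((tag, (text,)))
    results = [None] * len(work_queues)
    for _ in work_queues:
        tag, result = results_queue.get()  # results arrive in any order
        results[tag] = result
    return results


print(handle_request("hello world"))
# [11, 'HELLO WORLD', 'hello world', 'Hello World']
```

The tag travels with each work-request so the main thread can put results
back in order no matter which worker finishes first.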

You probably don't want to incur the overhead of starting and
terminating N working threads per request; the above-sketched
"thread-pool" approach is generally more effective.  You can
easily have n1 separate working threads waiting on queue 1, each
ready for the same kind of work-request, and so on for each i-th queue,
if several overall-requests may need to undergo processing at
the same time.  Or you could pool more, having undifferentiated
working-threads and using the function to be executed (or a way
to retrieve it) as part of the work-request.

E.g. (untested code, hopefully helpful/suggestive):

import threading, Queue

class RequestOrchestrator:
    # working functions, must be overridden in subclasses
    def fee(self, *args): raise NotImplementedError
    def fie(self, *args): raise NotImplementedError
    def foo(self, *args): raise NotImplementedError
    def fum(self, *args): raise NotImplementedError

    def __init__(self, N=0):
        # default/minimum number of working threads in pool
        if N<4: N=4

        # prepare queues
        self.resultsQueue = Queue.Queue()
        self.workRequestsQueue = Queue.Queue()

        # prepare and start N working threads
        for i in range(N):
            t = threading.Thread(target=self._waitForWork)
            t.setDaemon(1)
            t.start()

    class WorkRequest:
        # group arbitrary function call, plus arbitrary tag
        def __init__(self, tag, function=None, args=()):
            self.tag = tag
            self.function = function
            self.args = args

    def _waitForWork(self):
        while 1:
            # get work request, perform work, post result
            workRequest = self.workRequestsQueue.get()
            result = workRequest.function(*workRequest.args)
            self.resultsQueue.put((workRequest.tag, result))

    def satisfyRequest(self, requestString):
        numSlices = len(requestString)

        # prepare and post the work requests
        for i in range(numSlices):
            req = self.WorkRequest(i)
            c = requestString[i]
            if c=='e': req.function = self.fee
            elif c=='i': req.function = self.fie
            elif c=='o': req.function = self.foo
            else:
                req.function = self.fum
                req.args = (c,)
            self.workRequestsQueue.put(req)

        # collect/collate/return results
        slices = [None] * numSlices
        for i in range(numSlices):
            index, result = self.resultsQueue.get()
            slices[index] = str(result)
        return ''.join(slices)
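
To try the idea out, here is a condensed Python 3 port (module queue, a plain
tuple instead of the WorkRequest class) together with a subclass. The subclass
LeetOrchestrator and its substitutions are invented purely for illustration.
As in the code above, only one satisfyRequest call should be in flight at a
time, since all results share one queue.

```python
import queue
import threading


class RequestOrchestrator:
    # working functions, must be overridden in subclasses
    def fee(self, *args): raise NotImplementedError
    def fie(self, *args): raise NotImplementedError
    def foo(self, *args): raise NotImplementedError
    def fum(self, *args): raise NotImplementedError

    def __init__(self, N=0):
        self.resultsQueue = queue.Queue()
        self.workRequestsQueue = queue.Queue()
        # at least 4 daemon working threads in the pool
        for _ in range(max(N, 4)):
            threading.Thread(target=self._waitForWork, daemon=True).start()

    def _waitForWork(self):
        while True:
            # get work request, perform work, post tagged result
            tag, function, args = self.workRequestsQueue.get()
            self.resultsQueue.put((tag, function(*args)))

    def satisfyRequest(self, requestString):
        dispatch = {'e': self.fee, 'i': self.fie, 'o': self.foo}
        for i, c in enumerate(requestString):
            if c in dispatch:
                self.workRequestsQueue.put((i, dispatch[c], ()))
            else:
                self.workRequestsQueue.put((i, self.fum, (c,)))
        # collect/collate/return results
        slices = [None] * len(requestString)
        for _ in requestString:
            index, result = self.resultsQueue.get()
            slices[index] = str(result)
        return ''.join(slices)


class LeetOrchestrator(RequestOrchestrator):
    # hypothetical subclass: swap some vowels for digits
    def fee(self, *args): return '3'
    def fie(self, *args): return '1'
    def foo(self, *args): return '0'
    def fum(self, c): return c


print(LeetOrchestrator().satisfyRequest("hello pool"))  # h3ll0 p00l
```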


Couldn't some architecture patterned on this example satisfy
your needs?


Alex





