Idle wait for outstanding threads?
Alex Martelli
aleax at aleax.it
Thu Jan 31 08:15:08 EST 2002
"Alan Kennedy" <alanmk at hotmail.com> wrote in message
news:f72dac0d.0201310334.1a442342 at posting.google.com...
...
> Really what I need is a threading equivalent of the "select"
> call for file descriptors. "select" allows me to block on multiple
> file descriptors simultaneously, and return when something arrives on
> any one of them. I need something equivalent for threads, that allows
> me to block on multiple 'thread.lock's or multiple
> 'threading.Condition's at the same time. Also, something that had a
> timeout as well, just like the "select" call, would be ideal.
I see several others have already recommended Python standard module Queue,
and I heartily second the recommendation. Many useful multithreaded
architectures can be put together most simply with Queue as the only
synchronization mechanism.
See http://www.python.org/doc/current/lib/module-Queue.html for all
details. While Queue doesn't directly support timeouts, should you
need them you could synthesize them for Queue.get: either via an
auxiliary threading.Timer object posting a "timedout" 'event', or via
a utility thread periodically posting "tick" 'events' to all relevant
Queues, with Queue.Queue subclassed and its get method overridden to
check for timeouts (if you don't need very fine granularity on
timeouts, the resulting "polling" overhead shouldn't be too terrible).
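For instance, the Timer-based variant could be sketched like this
(using the Python 3 module spelling `queue`; the helper name and the
sentinel object are made up for illustration, not a standard API):

```python
# Illustrative sketch: a Timer posts a sentinel onto the same queue
# after the timeout elapses, so a plain blocking get can't hang forever.
import threading
import queue

TIMEOUT = object()  # unique sentinel marking "the wait timed out"

def get_with_timeout(q, seconds):
    # after `seconds`, the Timer posts the sentinel onto the queue;
    # whoever gets the sentinel knows the wait timed out
    timer = threading.Timer(seconds, q.put, args=(TIMEOUT,))
    timer.start()
    item = q.get()
    # cancel is a no-op if the timer already fired; note a fired timer
    # can leave a stale sentinel behind for a later get to discard
    timer.cancel()
    return item
```

The caller just compares the returned item against the sentinel to
tell a real work item from a timeout.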
But for your problem as you outline it elsewhere, timeouts don't
seem strictly necessary. You could have 4 dedicated working threads
each waiting on a separate Queue instance; when a work-request
arrives on the working thread's Queue, the thread does the work,
posts the result-description to a single results-Queue (common to
all working threads), and loops back to waiting. The main thread,
when a request arrives, slices it up into work-requests, posts each
to the appropriate working thread's Queue, then plucks the N
expected result-descriptions from the single results-Queue, collates
the overall results, and responds to the request.
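A minimal runnable sketch of that dedicated-queue layout (Python 3
module names `queue`/`threading`; the squaring "work" and all names
here are illustrative stand-ins):

```python
# Each worker blocks on its own work queue; all post to one shared
# results queue, tagged so the main thread can collate in order.
import threading
import queue

NUM_WORKERS = 4
work_queues = [queue.Queue() for _ in range(NUM_WORKERS)]
results_queue = queue.Queue()          # single results-Queue, shared

def worker(my_queue):
    while True:
        tag, payload = my_queue.get()  # block until work arrives
        results_queue.put((tag, payload * payload))  # "work", then post

for q in work_queues:
    threading.Thread(target=worker, args=(q,), daemon=True).start()

def handle_request(payloads):
    # slice the request and post each slice to a worker's own queue
    for i, payload in enumerate(payloads):
        work_queues[i % NUM_WORKERS].put((i, payload))
    # pluck the expected results back off the shared queue, any order
    results = [None] * len(payloads)
    for _ in range(len(payloads)):
        tag, result = results_queue.get()
        results[tag] = result
    return results
```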
You probably don't want to incur the overhead of starting and
terminating N working threads per request; the above-sketched
"thread-pool" approach is generally more effective. You can
easily have n1 separate working threads waiting on queue 1, each
ready for the same kind of work-request, and so on for each i-th
queue, if several overall-requests may need to undergo processing at
the same time. Or you could pool further, having undifferentiated
working-threads and passing the function to be executed (or a way
to retrieve it) as part of the work-request.
E.g. (untested code, hopefully helpful/suggestive):
import threading, Queue

class RequestOrchestrator:

    # working functions, must be overridden in subclasses
    def fee(self, *args): raise NotImplementedError
    def fie(self, *args): raise NotImplementedError
    def foo(self, *args): raise NotImplementedError
    def fum(self, *args): raise NotImplementedError

    def __init__(self, N=0):
        # default/minimum number of working threads in pool
        if N < 4: N = 4
        # prepare queues
        self.resultsQueue = Queue.Queue()
        self.workRequestsQueue = Queue.Queue()
        # prepare and start N working threads
        for i in range(N):
            t = threading.Thread(target=self._waitForWork)
            t.setDaemon(1)
            t.start()

    class WorkRequest:
        # group arbitrary function call, plus arbitrary tag
        def __init__(self, tag, function=None, args=()):
            self.tag = tag
            self.function = function
            self.args = args

    def _waitForWork(self):
        while 1:
            # get work request, perform work, post result
            workRequest = self.workRequestsQueue.get()
            result = workRequest.function(*workRequest.args)
            self.resultsQueue.put((workRequest.tag, result))

    def satisfyRequest(self, requestString):
        numSlices = len(requestString)
        # prepare and post the work requests
        for i in range(numSlices):
            req = self.WorkRequest(i)
            c = requestString[i]
            if c == 'e': req.function = self.fee
            elif c == 'i': req.function = self.fie
            elif c == 'o': req.function = self.foo
            else: req.function = self.fum
            req.args = (c,)
            self.workRequestsQueue.put(req)
        # collect/collate/return results
        slices = [None] * numSlices
        for i in range(numSlices):
            index, result = self.resultsQueue.get()
            slices[index] = str(result)
        return ''.join(slices)
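For illustration, here is a condensed, self-contained rendition of the
same pattern, runnable as-is (Python 3 spellings `queue`/`threading`;
the fee/fum bodies are toy stand-ins I've made up):

```python
# Condensed sketch of the orchestrator pattern: a pool of
# undifferentiated workers pulling tagged (function, args) requests
# from one shared queue and posting tagged results to another.
import threading
import queue

work_requests_queue = queue.Queue()
results_queue = queue.Queue()

def fee(c):                 # toy stand-in working function
    return c.upper()

def fum(c):                 # toy stand-in working function
    return c * 2

def wait_for_work():
    while True:
        tag, function, args = work_requests_queue.get()
        results_queue.put((tag, function(*args)))

for _ in range(4):          # pool of 4 undifferentiated workers
    threading.Thread(target=wait_for_work, daemon=True).start()

def satisfy(request_string):
    # slice the request: one tagged work item per character
    for i, c in enumerate(request_string):
        function = fee if c == 'e' else fum
        work_requests_queue.put((i, function, (c,)))
    # collect and collate results by tag
    slices = [None] * len(request_string)
    for _ in range(len(request_string)):
        tag, result = results_queue.get()
        slices[tag] = str(result)
    return ''.join(slices)
```

With these toy functions, satisfy('beef') returns 'bbEEff': the 'e's
go through fee, the rest through fum, and the tags keep the slices in
request order however the workers finish.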
Couldn't some architecture patterned on this example satisfy
your needs?
Alex