Thread processing

Dave Brueck dave at pythonapocrypha.com
Thu Mar 6 11:51:48 EST 2003


On Wed, 5 Mar 2003, Paul Moore wrote:

> In practice, I'd nearly always want to write a log message saying the
> processing had finished, so I'd wait for all the threads to complete.
> For that, simply joining in any order is fine (but I need to keep a
> list of the threads so I can join them all, and hence actually know
> when I finished).

FWIW I very rarely use thread.join because of the problems you've already
discovered. Besides, IMO it emphasizes the thread as the important thing
in your framework rather than the work the thread is doing.

> The basic problem is that I can easily *see* my solution in terms of
> waiting for "the next thread to finish". But I can't *code* it that
> way. So whatever I do is a workaround, and makes it harder for me to
> express what I really want, because the infrastructure is getting in
> the way.

And therein lies the problem, IMO. Again, maybe things would "feel" more
right if you focus more on the work being done than on the threads. I
mean, do you really care about which thread finishes next or is it the
completion of a chunk of work? The rest of the program doesn't really care
how the work is getting processed. For example, it may be processed in any
of these ways:

- a single thread handles all work one chunk at a time in a FIFO manner
- a new thread is started for each piece of work
- you start a fixed number of threads and each takes a piece of work, does
it, gets a new piece of work, etc, until all the work is done
- the work is I/O related so it's all handled asynchronously by
select/poll

If you take the work-centric view, then any of the above implementations
can "feel" fine because your focus is on getting the work done.
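
To make that concrete, here is a rough sketch (mine, untested in any real
project) of the first two options above hidden behind the same calling
convention. It uses Python 3 spelling (the queue module rather than
Queue). The names process_serially, process_thread_per_job and square are
made up for illustration, and it assumes the handler never raises, since a
thread that died early would leave the caller waiting forever.

```python
import queue
import threading

def process_serially(jobs, handle):
    """The caller's own thread handles every job, first in, first out."""
    return [handle(job) for job in jobs]

def process_thread_per_job(jobs, handle):
    """Start a new thread for each job and collect results as they finish."""
    results = queue.Queue()
    jobs = list(jobs)
    for job in jobs:
        # Bind job as a default argument so each thread gets its own value.
        threading.Thread(target=lambda j=job: results.put(handle(j))).start()
    # One get() per job; the order depends on which thread finishes first.
    return [results.get() for _ in jobs]

def square(n):
    """Stand-in for a real chunk of work."""
    return n * n

# The calling code is identical whichever strategy is plugged in.
for process in (process_serially, process_thread_per_job):
    print(process.__name__, sorted(process(range(5), square)))
```

The caller only ever asks for work to be done and gets results back; no
thread objects leak out of either function.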

> > You don't need to join the thread if it is just going to terminate on its
> > own.
> >>
> >>         threads.remove[thread]
> >
> > Do you actually need a list of all the threads? Why bother?
>
> So that I can do the "while threads" above. Otherwise, I have no way
> of knowing when I have no threads left, and hence no way of knowing
> when to *stop* waiting on the queue.

Another approach is to use a semaphore as a counter of outstanding
work:

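import threading

counter = threading.Semaphore(0)

def ThreadFunc(a, b, c):
  try:
    pass  # do some work
  finally:
    counter.release()  # signal completion even if the work raised

WORK_COUNT = 10
for i in range(WORK_COUNT):
  threading.Thread(target=ThreadFunc, args=(1, 2, 3)).start()  # placeholder args

# One acquire per piece of work; returns once all of it is done
for i in range(WORK_COUNT):
  counter.acquire()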

> (OK, a simple counter is probably enough here, but the thread gives me
> better generality for if I have a thread subclass with useful extra data
> in it).

I know it's tempting to come up with a general-purpose framework, but to
be honest, there's so little code involved that it might not be worth the
effort - once you're familiar with the patterns then it takes about the
same amount of effort to write it as to find/import/use a prebuilt
framework. For example, the above 10 lines of code is a pattern I use from
time to time but it's so tiny that I've never bothered to save it to
some library.

> Can you explain how you'd structure the above without needing a "while
> threads" loop? So that the main thread stops waiting on the queue when
> the last worker has finished. If I could see that, I might be closer
> to understanding this.]

Here's another pattern that's sometimes useful (coding from memory, not
tested):

import Queue      # named queue in Python 3
import threading

WORKER_COUNT = 5
WORKER_QUIT = 'quit!'

class GV: pass
GV.inQ = Queue.Queue()
GV.outQ = Queue.Queue()

def StartWorkers():
  for i in range(WORKER_COUNT):
    threading.Thread(target=Worker).start()

def StopWorkers():
  for i in range(WORKER_COUNT):
    GV.inQ.put(WORKER_QUIT)

def Worker():
  while 1:
    work = GV.inQ.get()
    if work == WORKER_QUIT: break

    # do something with work
    # add result to GV.outQ if needed

Using this approach you call StartWorkers to create a fixed-size pool of
workers - useful when you have e.g. 1000s of jobs to do, so
one-thread-per-job isn't feasible. Each worker waits for work to do,
finishes it as quickly as possible, and optionally adds its result to
some output queue. If your work were to fetch 1000 objects off the web,
your main thread might look something like this:

StartWorkers()

# Hand off all the work
count = 0
f = open('urls.list')
for line in f:
  count += 1
  GV.inQ.put(line.strip())  # drop the trailing newline from each URL
f.close()

# Process the results
for i in xrange(count):
  obj = GV.outQ.get()
  # do something with it

StopWorkers()

The above approach seems very natural to me because it is work-centric -
you care about doing work and for all you know the worker pool could be a
single thread, a fixed number of threads, no threads, whatever. Notice
that the main thread doesn't ever explicitly wait for the threads to
finish - they _do_ finish, of course, but the how and when isn't something
the rest of the program really cares about.
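
Since the pieces above were typed from memory, here is one way they might
fit together into something you can actually run. Treat it as a sketch
under assumptions: it uses Python 3 spelling (queue, not Queue), the
"fetch" is faked by measuring the length of each URL string, fetch_all and
the example URLs are invented names, and the quit marker is a unique
object rather than a string so it can never collide with real work.

```python
import queue
import threading

WORKER_COUNT = 3
WORKER_QUIT = object()   # unique sentinel telling a worker to exit

in_q = queue.Queue()
out_q = queue.Queue()

def worker():
    while True:
        work = in_q.get()
        if work is WORKER_QUIT:
            break
        # Stand-in for fetching the URL: report its length instead.
        out_q.put((work, len(work)))

def fetch_all(url_list):
    for _ in range(WORKER_COUNT):
        threading.Thread(target=worker).start()

    # Hand off all the work, counting as we go.
    count = 0
    for url in url_list:
        in_q.put(url)
        count += 1

    # Collect exactly one result per job; arrival order is not guaranteed.
    collected = [out_q.get() for _ in range(count)]

    # One quit marker per worker, so every thread exits on its own.
    for _ in range(WORKER_COUNT):
        in_q.put(WORKER_QUIT)
    return collected

urls = ["http://a.example/", "http://bb.example/", "http://ccc.example/"]
results = fetch_all(urls)
print(sorted(results))
```

Changing WORKER_COUNT changes how the work gets done but not a single line
of the code that hands it off and collects the results.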

> > Is it really _that_ big of a deal to have the worker thread put done work
> > on a queue?
>
> For cases where all the worker is saying is "I've finished", I believe
> so. For a start, I have to pass that queue to the worker in the first
> place, which constrains the form of my worker.

As someone else pointed out - your worker func could simply return a value
(making it easy to test in isolation) and you could just wrap the worker
function in some other function that takes the result and adds it to a
work queue. This also generalizes well to the cases where there are no
results to report - the worker function implicitly returns None anyway.
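
A minimal sketch of that wrapping, in Python 3 spelling and with made-up
names (fetch_length, log_done and run_and_report are only for
illustration): the worker functions know nothing about queues, and the
wrapper reports None if the worker returns nothing or raises.

```python
import queue
import threading

def fetch_length(text):
    """Plain worker: takes input, returns a value, knows nothing of queues."""
    return len(text)

def log_done(text):
    """Worker with nothing to report; it implicitly returns None."""
    pass

def run_and_report(func, out_q, *args):
    """Call func(*args) and put whatever it returned on out_q."""
    result = None
    try:
        result = func(*args)
    finally:
        # Always report, so the collector never waits on a failed worker.
        out_q.put(result)

done = queue.Queue()
for func in (fetch_length, log_done):
    threading.Thread(target=run_and_report, args=(func, done, "hello")).start()

# One get() per thread started; the two results may arrive in either order.
results = [done.get() for _ in range(2)]
print(results)
```

Because fetch_length is an ordinary function, you can call it directly in
a test without starting any threads at all.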

-Dave





More information about the Python-list mailing list