[Tutor] How to use threads ?

Terry Carroll carroll at tjc.com
Wed Mar 2 10:02:22 CET 2005


On Tue, 1 Mar 2005, Mark Kels wrote:

> Can anyone give me a very simple example on thread programming ?

I don't think a simple example is possible, given that threads are 
inherently for slightly more complex processing than you ordinarily do.

That being said, here's an example.

This is a made-up process.  It shows both queuing and an arbitrary number 
of threads.

First, the imports:

###########################
import random, os, time, sys
import threading, Queue
###########################

No, I'll define a class for the "work" to be performed.  Basically, each 
work unit just specifies how much time the system is supposed to wait, in 
second; pretend that it's doing some amount of work that takes some 
number of seconds to be performed:

###########################
class workunit(object):
    """
    Sample object to be put on a queue similating work to be done.
    variables:
      counter: a serial number just for identification purposes.
      waittime: the time in second it uses up.
      Done: a flag used for a special-purpose end-of-queue sentinel,
         in which case waittime is ignored.
    """
    counter = 1
    def __init__(self, waittime=0, Done=False):
        self.counter = workunit.counter
        self.waittime = waittime
        self.Done = Done
        workunit.counter += 1
###########################

This will be called in one of two ways:

 w = workunit(waittime=20) # to indicate wait for 20 seconds; or
 w = workunit(Done=True) # to add a "dummy" work unit to the queue so 
                         # everyone know the queue is empty and finished.


Okay, imagine a queue (or just a list) full of work units like the above.  
Here's a plain old sequential NON-THREAD way of processing this:

###########################
def ProcessQueue(work_queue):
    """
    Method to process an element from the queue 
     (Non-Threaded implementation).
    All it does is loop, doing the following:
      pull an element from the queue (break out if it's a "Done" marker)
      print a starting message;
      wait for the specified amount of time;
      print an ending message
    """
    while True:
        queue_entry = work_queue.get()
        if queue_entry.Done:
            break
        print "%s starting on workunit %d, %d secs" % \
           (time.asctime(), queue_entry.counter, queue_entry.waittime)
        time.sleep(queue_entry.waittime)
        print "%s ending for workunit %d" % \
           (time.asctime(), queue_entry.counter)
############################

Okay, understand that, first  See what it's doing?  It's just 
popping things off the work_queue, printing a message, waiting for 
the indicated amount of time in the work unit, and printing another 
message; then starting over.

Now, let's try the same approach with threads.  Firrst, the class 
declaration:

#############################
class ThreadProcessQueue(threading.Thread):
    """
    This is a Threaded equivalent to ProcessQueue().
    """
#############################

Now, here's the ThreadProcessQueue.__init__:

#############################
    def __init__(self, threadname, work_queue, **kwds):
        self.tname = threadname
        self.work_queue = work_queue
        threading.Thread.__init__(self, **kwds)
        print "%s Thread %s started" % (time.asctime(), self.tname) 
#############################

The parameters here are an arbitrary name for the thread, and the queue it 
will process.  All __init__ does is print a message that the thread 
started.

Here's the guts of it, the ThreadProcessQueue.__run__ method.  NOte how 
similar it is to the non-Threaded version:

#############################
    def run(self):
        while True:
            queue_entry = work_queue.get()
            if queue_entry.Done:
                break
            print "%s Thread %s starting on workunit %d, %d secs" % \
               (time.asctime(), self.tname, queue_entry.counter, 
queue_entry.waittime)
            time.sleep(queue_entry.waittime)
            print "%s Thread %s ending for workunit %d" % \
               (time.asctime(), self.tname, queue_entry.counter)
        
        print "%s %s thead ending." % (time.asctime(), self.tname)
        self.work_queue.put(queue_entry)
#############################

The only real difference is that the messages produced include an 
identifier so you can see which thread is generating which message; and 
also there's that self.work_queue.put(queue_entry) at the end. I'll 
discuss that at the end of this message.

Now, here's the main program that uses these.  First some setup:

############################
print "MAIN: %s starting..." % (time.asctime())
work_queue = Queue.Queue()
NumWorkUnits=8
NumThreads=3
WaitTimes = [3,6,9,12,1,5,5,1]
lenWaitTimes = len(WaitTimes)
# WaitTimes is just a list of some arbitrary times representing work
# A particular WorkUnit will wait for one of these times.
ThreadList=[]
###########################

Queue is s specialized type of FIFO list, which is made to be shared among 
concurrently running threads.  We use that instead of a plain list.

NumWorkUnits and NumThreads are just constants, for the number of work 
units that we'll put in the queue, and the number of threads that will 
read from them.  

WaitTimes is just an arbitrary list of numbers that we'll randomly select 
from later.  Every work unit will get a random one of these, which 
indicates how long it should take to be processed (e.g., 1 for a 1-second 
workunit, 12 for a 12-second workunit, etc.

ThreadList is a list that will contain all the threads.

Let's get those threads going:

#################################
# make up a list of threads (not started yet)
for i in range(1,NumThreads+1):
    ThreadName = "T%03d" % i
    ThreadList.append(ThreadProcessQueue(ThreadName, work_queue))
#################################

Okay, this has created three threads (NumThreads = 3), with names T001, 
T002 and T003.  Each has been passed the work_queue (which is still 
empty), and all three threads have been added to the ThreadList.

The threads all exist at this point, but have not yet been started.  
That's next:

###############################
# start the Threads
for t in ThreadList:
   t.start() 
print "%s MAIN: all threads started" % (time.asctime())
###############################

This just starts the threads.  The __run__ method in each thread starts 
running.  The problem is, with the work queue empty, they'll just sit 
there.

So, let's start putting stuff into the work queue:

###############################
# Start putting things on the queue
for i in range(NumWorkUnits):
    random_wait = WaitTimes[int(random.uniform(0,len(WaitTimes)))]
    w = workunit(random_wait)
    work_queue.put(w)
###############################

This just selects a random amount of time to wait from the WaitTimes list, 
creates a work unit specifying that amount of time, and puts the workunit 
on the queue.

At this point, the threads should start waking up; they were all sitting 
on a queue.get for an empty queue.  NOw that they can start pulling things 
off of it, they will.

I also want to put an end-of-queue element here:

###############################
# Put a shutdown indicator
work_queue.put(workunit(Done=True))
###############################

That's it!  Just for grins, we can add a "final" shutdown message:

############################
print "%s MAIN: all done." % (time.asctime())
############################

But you'll see that doesn't work very well.

Here's what I see as output on one run:

Wed Mar 02 00:40:05 2005 MAIN starting...
Wed Mar 02 00:40:05 2005 Thread T001 started
Wed Mar 02 00:40:05 2005 Thread T002 started
Wed Mar 02 00:40:05 2005 Thread T003 started
Wed Mar 02 00:40:05 2005 MAIN: all threads started
Wed Mar 02 00:40:05 2005 Thread T001 starting on workunit 1, 5 secs
Wed Mar 02 00:40:05 2005 Thread T002 starting on workunit 2, 12 secs
Wed Mar 02 00:40:05 2005 Thread T003 starting on workunit 3, 9 secs
Wed Mar 02 00:40:05 2005 MAIN: all done.
Wed Mar 02 00:40:10 2005 Thread T001 ending for workunit 1
Wed Mar 02 00:40:10 2005 Thread T001 starting on workunit 4, 6 secs
Wed Mar 02 00:40:14 2005 Thread T003 ending for workunit 3
Wed Mar 02 00:40:14 2005 Thread T003 starting on workunit 5, 12 secs
Wed Mar 02 00:40:16 2005 Thread T001 ending for workunit 4
Wed Mar 02 00:40:16 2005 Thread T001 starting on workunit 6, 12 secs
Wed Mar 02 00:40:17 2005 Thread T002 ending for workunit 2
Wed Mar 02 00:40:17 2005 Thread T002 starting on workunit 7, 1 secs
Wed Mar 02 00:40:18 2005 Thread T002 ending for workunit 7
Wed Mar 02 00:40:18 2005 Thread T002 starting on workunit 8, 1 secs
Wed Mar 02 00:40:19 2005 Thread T002 ending for workunit 8
Wed Mar 02 00:40:19 2005 T002 thead ending.
Wed Mar 02 00:40:26 2005 Thread T003 ending for workunit 5
Wed Mar 02 00:40:26 2005 T003 thead ending.
Wed Mar 02 00:40:28 2005 Thread T001 ending for workunit 6
Wed Mar 02 00:40:28 2005 T001 thead ending.


>From the queue's point of view, here is how the work got parcelled out 
among threads:

Work units:
  1: 5 sec (T001)
  2: 12 sec (T002)
  3: 9 sec (T003)
  4: 6 sec (T001)
  5: 12 sec (T003)
  6: 12 sec (T001)
  7: 1 sec (T002)
  8: 1 sec (T002)

Here's a graphical view of what the threads were doing:

T001: 11111444444666666666666
T002: 22222222222278
T003: 333333333555555555555

You can see that T002 ended first, followed by T003 and then T001 (which 
matches the timestamps).

Okay, a couple oddities.  First, that self.work_queue.put(queue_entry)
line.  This is so that when the first thread to see the Done marker sees 
it, it puts it back on the queue before it exits.  That way, each of the 
other threads sees it, puts it back on for the next and exits.

(I could have instead written this just to exeit when the queue was empty,  
but there are some ugly issues with that, too, for example, if the thing 
that loads up the queue pauses, the queue empties and all the threads 
quit, and then the queue loader adds more, with the threads already gone.)

The other oddity: Note that the MAIN "final message" issued long before 
the threads were done.  I wanted this example to nicely wait for all the 
threads to end before putting out that message, but couldn't figure out 
how to do that.  For those familiar with threads: calls to t.isAlive() 
returned True long after the thread referred to had finished up and put 
out its final shitdown message.

Anyway, I hope this helps as a bit of a tutorial.



More information about the Tutor mailing list