Threading Pool Event()

Anand anandpillai6 at yahoo.com
Mon Jul 21 03:01:34 EDT 2003


What if you want to make sure that there are exactly
'n' threads in the queue at a given moment?

I had a similar problem, which I solved by deriving a
"thread monitor" class from Queue. I initialized the Queue
with a fixed size (the number of threads allowed in it at a
given moment, the 'n' above). But I found that this did not
limit the count as accurately as I wanted. I solved that by
making non-blocking calls on the queue and handling the
Full and Empty exceptions myself, using an Event and my own lock.
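
For readers unfamiliar with the non-blocking calls, here is a minimal
sketch of how a bounded Queue behaves when it is used that way. It is
written for Python 3, where the module is named queue (it was Queue in
2003); the variable names are only illustrative.

```python
import queue

q = queue.Queue(maxsize=2)       # bounded queue: holds at most 2 items

q.put("a", block=False)
q.put("b", block=False)

try:
    q.put("c", block=False)      # queue is full, so this raises at once
    overflowed = False
except queue.Full:
    overflowed = True

drained = []
try:
    while True:
        drained.append(q.get(block=False))  # raises Empty once drained
except queue.Empty:
    pass
```

With blocking calls the third put() would simply wait; with non-blocking
calls the caller has to decide what to do on Full and Empty, which is
where the Event comes in.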

The code looks something like this.
It polls the queue, sleeping between polls.

import threading
import time
from queue import Queue, Empty, Full  # the module was named "Queue" in Python 2

class TrackerMonitor(Queue):

    def __init__(self):
        ...
        ...
        self._event = threading.Event()
        self._lock = threading.Lock()
        # Set the event so that threads don't block!
        self._event.set()

    def runThreads(self):

        # Use a non-blocking queue and manage exceptions
        # on my own.

        while threading.active_count() > 1:  # Run till all threads are exhausted
            if self._flag: break             # flag to signal end of polling
            time.sleep(1.0)
            try:
                t = self.get(0)              # non-blocking get
                self._trackers.append(t)     # local list for easier management
                t.start()
            except Empty:
                self._event.set()            # Wake up all threads waiting for this
                                             # event

    def wait(self):
        """ Block on the event """

        self._event.wait()

    def addToQueue(self, tracker):
        """ Add a thread to the thread queue """

        # Acquire lock
        self._lock.acquire()
        cond = 0

        # Use a non-blocking queue and manage exceptions
        # on my own
        while not cond:
            try:
                self.put(tracker, 0)         # non-blocking put
                cond = 1
            except Full:
                self._event.clear()   # Clear event so that all waiting threads
                                      # block, till the queue is empty again

        # Release lock
        self._lock.release()

And in the thread code...

class Tracker(threading.Thread):

    def __init__(self, monitor):
        ...
        ...
        self._monitor = monitor  # Instance of above class

    def doMyJobInAloop(self):

        for i in range(len(jobs)):
            ...
            ...
            # Wait on the event of the monitor
            self._monitor.wait()
            # Do my work
            ...
            # This code spawns more thread objects of this
            # class, all of which call the same loop as this
            # one.


As you can see, I use an event in my queue derived class to signal
threads when they can join the queue. The event is set when the queue
is empty and cleared when the queue is full. All threads block on this
event in their main loop.
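
The gating behaviour described above can be seen in isolation with a
small sketch. This is not the monitor class itself, only an
illustration of threading.Event used as a gate; the names gate, worker
and results are made up for the example.

```python
import threading

gate = threading.Event()
gate.set()                       # open: wait() returns immediately
results = []

def worker(name):
    gate.wait()                  # blocks for as long as the gate is cleared
    results.append(name)

gate.clear()                     # "queue full": close the gate
t = threading.Thread(target=worker, args=("w1",))
t.start()

blocked = not gate.wait(timeout=0.1)  # wait() returns False on timeout
still_empty = (results == [])         # the worker has not got past the gate

gate.set()                       # "queue empty again": reopen the gate
t.join(timeout=5)
```

Note that set() releases every thread waiting on the event at once, so
the event on its own does not limit how many threads proceed; in the
code above that limit comes from the bounded queue.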

With this mechanism, I was able to get more control over the queue's
handling of threads, which is what I wanted.

I am not sure how this affects performance theoretically, since I am not
relying on the Queue's blocking put() and get() calls.

But as far as the program goes, I am happy with the way it performs.

This might be one way the OP can use Events in his implementation.

~Anand

"Cliff Wells" <logiplex at qwest.net> wrote in message news:<mailman.1058567369.23096.python-list at python.org>...
> On Fri, 2003-07-18 at 14:28, Graeme Matthew wrote:
> > Aahz
> > 
> > Thanks, I've actually been using your OSCON slides which have helped a lot.
> > So is it technically correct that all worker threads in a thread pool are
> > either doing some work or polling the queue to find out if there is a job
> > to process?
> 
> They don't poll the queue, they block waiting on it until something is
> available.
> 
> > eg: (pseudocodish :-))
> > 
> > def run(self):
> > 
> >     while 1:
> > 
> >         self.lock.acquire()
> > 
> >         if QueueManager.HasJob:
> >             job = QueueManager.GetFreeJob()
> >             __processJob(job)
> > 
> >         self.lock.release()
> > 
> >  def __processJob(self):
> > 
> >         blah blah blah .....
> 
> More like:
> 
> def run():
>     while 1:
>         job = queue.get() # this blocks until something is queue.put()
>         _processjob(job)
> 
> 
> Acquiring the locks is unnecessary and ill-advised.  The Queue itself
> can be thought of as a type of locking mechanism, so you raise the
> possibility of a deadlock condition by nesting locks (for instance if
> you also put locks around the queue.put()).
> 
> Regards,
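
The blocking pattern quoted above can be fleshed out into a small
runnable pool. This is only a sketch in Python 3: the pool size, the
process_job function and the None sentinel used for shutdown are
assumptions made for the example, not part of the quoted code.

```python
import queue
import threading

NUM_WORKERS = 3                  # hypothetical pool size
jobs = queue.Queue()             # work to do
done = queue.Queue()             # finished results

def process_job(job):
    # Hypothetical job: square a number.
    return job * job

def run():
    while True:
        job = jobs.get()         # blocks until something is put()
        if job is None:          # sentinel: no more work for this worker
            break
        done.put(process_job(job))

workers = [threading.Thread(target=run) for _ in range(NUM_WORKERS)]
for w in workers:
    w.start()
for n in range(1, 6):
    jobs.put(n)
for _ in workers:
    jobs.put(None)               # one sentinel per worker
for w in workers:
    w.join()

results = sorted(done.get() for _ in range(5))
```

No explicit lock appears anywhere: both queues do their own locking, so
there is nothing to nest and no extra deadlock risk.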



