collecting results in threading app

George Sakkis george.sakkis at gmail.com
Fri Apr 4 13:54:51 EDT 2008


On Apr 4, 11:27 am, Gerardo Herzig <gher... at fmed.uba.ar> wrote:

> John Nagle wrote:
> >Gerardo Herzig wrote:
>
> >>Hi all. Newbee at threads over here. Im missing some point here, but cant
> >>figure out which one.
>
> >>This little peace of code executes a 'select count(*)' over every table
> >>in a database, one thread per table:
> >><code>
> >>class TableCounter(threading.Thread):
> >>   def __init__(self, conn, table):
> >>       self.connection = connection.Connection(host=conn.host,
> >>port=conn.port, user=conn.user, password='', base=conn.base)
> >>       threading.Thread.__init__(self)
> >>       self.table = table
>
> >>   def run(self):
> >>       result =  self.connection.doQuery("select count(*) from %s" %
> >>self.table, [])[0][0]
> >>       print result
> >>       return result
>
> >>class DataChecker(metadata.Database):
>
> >>   def countAll(self):
> >>       for table in self.tables:
> >>           t = TableCounter(self.connection, table.name)
> >>           t.start()
> >>       return
> >></code>
>
> >>It works fine, in the sense that every run() method prints the correct
> >>value.
> >>But...I would like to store the result of t.start() in, say, a list. The
> >>thing is, t.start() returns None, so...what im i missing here?
> >>Its the desing wrong?
>
> >     1.  What interface to MySQL are you using?  That's not MySQLdb.
> >     2.  If SELECT COUNT(*) is slow, check your table definitions.
> >         For MyISAM, it's a fixed-time operation, and even for InnoDB,
> >         it shouldn't take that long if you have an INDEX.
> >     3.  Threads don't return "results" as such; they're not functions.
>
> >As for the code, you need something like this:
>
> >class TableCounter(threading.Thread):
> >    def __init__(self, conn, table):
> >      self.result = None
> >      ...
>
> >     def run(self):
> >         self.result =  self.connection.doQuery("select count(*) from %s" %
> >  self.table, [])[0][0]
>
> >     def countAll(self):
> >         mythreads = [] # list of TableCounter objects
> >    # Start all threads
> >         for table in self.tables:
> >             t = TableCounter(self.connection, table.name)
> >             mythreads.append(t) # list of counter threads
> >             t.start()
> >         # Wait for all threads to finish
> >         totalcount = 0
> >         for mythread in mythreads:         # for all threads
> >        mythread.join()                     # wait for thread to finish
> >             totalcount += mythread.result  # add to result
> >    print "Total size of all tables is:", totalcount
>
> >                                    John Nagle
>
> Thanks John, that certanly works. According to George's suggestion, i
> will take a look to the Queue module.
> One question about
>
> for mythread in mythreads:              # for all threads
>             mythread.join()                     # wait for thread to finish
>
> That code will wait for the first count(*) to finish and then continues
> to the next count(*). Because if is that so, it will be some kind of
> 'use threads, but execute one at the time'.
> I mean, if mytreads[0] is a very longer one, all the others will be
> waiting...rigth?

No, all will be executed in parallel; only the main thread will be
waiting for the first thread to finish. So if only the first job is
long, as soon as it finishes and join()s, all the others will already
have finished and their join() will be instantaneous.

> There is an approach in which i can 'sum' after *any* thread finish?
>
> Could a Queue help me there?

Yes, you can push each result to a queue and have the main thread wait
in a loop doing a queue.get() every time. After each get() you can do
whatever with the results so far (partial sum, update a progress bar,
etc.)

<shameless-plug>
You can take a look at papyros [1], a small package I wrote for hiding
the details behind a simple Pythonic API. Using papyros, your example
would look something like this:


import sys
from papyros import Job
from papyros.multithreaded import MultiThreadedMaster

# a papyros.Job subclass for each type of task you want to run
concurrently
class CountJob(Job):
    def __call__(self, connection, table_name):
        return connection.doQuery("select count(*) from %s" %
table_name, [])[0][0]


class DataChecker(metadata.Database):
    def countAll(self):
        sum_count = 0
        # create a pool of 4 threads
        master = MultiThreadedMaster(4)
        # issue all the jobs
        for table in self.tables:
            master.addJob(CountJob(self.connection, table.name))
        # get each processed job as soon as it finishes
        for job in iter(master.popProcessedJob, None):
            # the job arguments are available as job.args
            table_name = job.args[1]
            try: # try to get the result
                count = job.result
            except Exception, ex:
                # some exception was raised when executing this job
                print '* Exception raised for table %s: %s' %
(table_name, ex)
            else:
                # job finished successfully
                sum_count += count
                print 'Table %s: count=%d (running total=%d)' % (
                                    table_name, count, sum_count)
        return sum_count


As you can see, any exception raised in a thread is stored and
reraised on the main thread when you attempt to get the result. You
can also specify a timeout in popProcessedJob() so that the main
thread doesn't wait forever in case a job hangs.

Last but not least, the same API is implemented both for threads and
processes (using Pyro) so it's not restricted by the GIL in case the
jobs are CPU-intensive.
</shameless-plug>

George



More information about the Python-list mailing list