Newbie queue question

Piet van Oostrum piet at cs.uu.nl
Thu Jun 18 11:02:27 EDT 2009


>>>>> Jure Erznožnik <jure.erznoznik at gmail.com> (JE) wrote:

>JE> Hi,
>JE> I'm pretty new to Python (2.6) and I've run into a problem I just
>JE> can't seem to solve.
>JE> I'm using dbfpy to access DBF tables as part of a little test project.
>JE> I've programmed two separate functions, one that reads the DBF in main
>JE> thread and the other which reads the DBF asynchronously in a separate
>JE> thread.
>JE> Here's the code:

>JE> def demo_01():
>JE>     '''DBF read speed only'''

>JE>     dbf1 = Dbf('D:\\python\\testdbf\\promet.dbf', readOnly=1)
>JE>     for i1 in xrange(len(dbf1)):
>JE>         rec = dbf1[i1]
>JE>     dbf1.close()

>JE> def demo_03():
>JE>     '''DBF read speed into a FIFO queue'''

>JE>     class mt(threading.Thread):

>JE>         q = Queue.Queue(64)
>JE>         def run(self):
>JE>             dbf1 = Dbf('D:\\python\\testdbf\\promet.dbf', readOnly=1)
>JE>             for i1 in xrange(len(dbf1)):
>JE>                 self.q.put(dbf1[i1])
>JE>             dbf1.close()
>JE>             del dbf1
>JE>             self.q.join()

>JE>     t = mt()
>JE>     t.start()
>JE>     while t.isAlive():
>JE>         try:
>JE>             rec = t.q.get(False, 0.2)
>JE>             t.q.task_done();
>JE>         except:
>JE>             pass

>JE>     del t


>JE> However I'm having serious issues with the second method. It seems
>JE> that as soon as I start accessing the queue from both threads, the
>JE> reading speed effectively halves.

>JE> I have tried the following:
>JE> 1. using deque instead of queue (same speed)
>JE> 2. reading 10 records at a time and inserting them in a separate loop
>JE> (hoped the congestion would help)
>JE> 3. Increasing queue size to infinite and waiting 10 seconds in main
>JE> thread before I started reading - this one yielded full reading speed,
>JE> but the waiting took away all the threading benefits

>JE> I'm sure I'm doing something very wrong here, I just can't figure out
>JE> what.

For a start, the thread switching and the queue administration just take
time, which you can avoid if you do everything sequentially.
Threading can have an advantage if there is the possibility of overlap,
for example if your processing did something substantial and the reading
of the file were I/O bound. But there is none in your example, so it's
just overhead. You don't say how big the file is, and it may also be in
your O.S. cache, in which case reading it would essentially be CPU bound.
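
The overhead can be made visible with a rough sketch like the following.
It assumes Python 3 (where the module is named queue rather than Queue)
and uses plain integers as a hypothetical stand-in for the DBF records;
the names N, sequential and threaded are made up for illustration, and
the timings will differ from machine to machine.

```python
import queue      # named Queue in Python 2
import threading
import time

N = 20000  # hypothetical number of records


def sequential(n):
    """Process n items directly in the calling thread."""
    total = 0
    for i in range(n):
        total += i
    return total


def threaded(n):
    """Pass the same n items through a bounded queue from a second thread."""
    q = queue.Queue(64)

    def producer():
        for i in range(n):
            q.put(i)

    t = threading.Thread(target=producer)
    t.start()
    total = 0
    for _ in range(n):
        total += q.get()   # blocking get, no busy waiting
    t.join()
    return total


start = time.perf_counter()
seq_total = sequential(N)
seq_time = time.perf_counter() - start

start = time.perf_counter()
thr_total = threaded(N)
thr_time = time.perf_counter() - start

print("sequential: %.4f s, threaded: %.4f s" % (seq_time, thr_time))
```

Since there is no real work to overlap here, any extra time in the
threaded version is pure queue and thread-switching cost.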

And then there is this code:

    while t.isAlive():
        try:
            rec = t.q.get(False, 0.2)
            t.q.task_done();
        except:
            pass

t.q.get(False, 0.2) means do a non-blocking get, so when there is
nothing in the queue it raises Queue.Empty immediately and takes the
exception path, which is also substantial overhead. Whether this happens
depends on the timing, which depends on the scheduling of the O.S. For
example, when the O.S. schedules the main thread first, it will busy-wait
loop quite a lot before the first item arrives in the queue. If the
file is small, the reader thread will then probably put all the items in
the queue and there will be no more busy-wait looping. But busy-wait
looping just consumes CPU time.

By the way, the second parameter (0.2) which is supposed to be the
timeout period is just ignored if the first parameter is false. You
might be better off giving True as the first parameter to get.
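
The difference between the two calls can be checked with a small sketch.
It assumes Python 3, where the module is named queue rather than Queue;
the variable names are only illustrative.

```python
import queue   # named Queue in Python 2
import time

q = queue.Queue()   # empty queue, nothing will ever be put on it

# Non-blocking get: the 0.2 timeout is ignored and Empty is raised at once.
start = time.monotonic()
try:
    q.get(False, 0.2)
    nonblocking_raised = False
except queue.Empty:
    nonblocking_raised = True
nonblocking_elapsed = time.monotonic() - start

# Blocking get with a timeout: waits about 0.2 seconds, then raises Empty.
start = time.monotonic()
try:
    q.get(True, 0.2)
    blocking_raised = False
except queue.Empty:
    blocking_raised = True
blocking_elapsed = time.monotonic() - start

print("non-blocking: %.3f s, blocking: %.3f s"
      % (nonblocking_elapsed, blocking_elapsed))
```

With True as the first parameter the main thread sleeps while it waits
instead of spinning through the exception path.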

I dislike any form of busy wait loop. It would be better to just use a
normal get(), but that conflicts with your end detection. Testing
t.isAlive() is not a particularly good way to detect that the processing
is finished, I think, because of timing issues. After the last
t.q.task_done() [which doesn't need a semicolon, by the way] it takes
some time before the self.q.join() returns and the thread finishes. In
the meantime t.isAlive() is constantly being tested, which also wastes
CPU time.

IMHO a better way is to put a sentinel object in the queue:

        def run(self):
            dbf1 = Dbf('D:\\python\\testdbf\\promet.dbf', readOnly=1)
            for i1 in xrange(len(dbf1)):
                self.q.put(dbf1[i1])
            self.q.put(None)
            dbf1.close()
            del dbf1
            self.q.join()

    while True:
        rec = t.q.get()
        t.q.task_done()
        if rec is None: break

And then you can probably also get rid of the self.q.join() and
t.q.task_done() calls.
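
Put together, the sentinel approach without join() and task_done()
might look like the sketch below. It assumes Python 3 (module queue
instead of Queue), uses a list of integers as a hypothetical stand-in
for the DBF records, and assumes that None never occurs as a real
record. The function names produce and consume are made up for this
example.

```python
import queue      # named Queue in Python 2
import threading


def produce(records, q):
    """Put every record on the queue, then a None sentinel to mark the end."""
    for rec in records:
        q.put(rec)
    q.put(None)


def consume(q):
    """Collect records with blocking get() calls until the sentinel arrives."""
    received = []
    while True:
        rec = q.get()
        if rec is None:
            break
        received.append(rec)
    return received


records = list(range(1000))   # hypothetical stand-in for the DBF records
q = queue.Queue(64)
t = threading.Thread(target=produce, args=(records, q))
t.start()
result = consume(q)
t.join()   # the producer has already put the sentinel, so this returns promptly
print(len(result))
```

The main thread blocks in get() whenever the queue is empty, so there is
no busy waiting and no need to test whether the thread is still alive.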

-- 
Piet van Oostrum <piet at cs.uu.nl>
URL: http://pietvanoostrum.com [PGP 8DAE142BE17999C4]
Private email: piet at vanoostrum.org


