Need advice on multithreading problem

Gerhard Häring gerhard at bigfoot.de
Tue Jun 4 09:07:50 EDT 2002


This morning I tried to seriously use multithreading in Python for the
first time, so I'd like comments on the design of the proof-of-concept
that I'll paste below in full. Hope it's not too long for usenet, but I
didn't want to throw my comments/docstrings away.

The problem is this: I have a C library with a function of the following
interface:

    def sqlite_exec(conn, sql, callback, arg1):

The only important parameter here is 'callback': it is a callback function that
you pass to sqlite_exec, and sqlite_exec calls it once for each result row it
produces. If the callback function returns a value other than zero, sqlite_exec
stops its processing.

On the Python side, I wanted to wrap this C library call in a generator,
which I did by spawning a thread for each new sqlite_exec call and
communicating with it through a Queue.Queue.
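Stripped to its bones, the thread-plus-bounded-Queue trick for turning a
callback-style API into a generator can be sketched like this. This is only a
sketch, not the real cursor below: `fake_api` is a made-up stand-in for
sqlite_exec, and the queue module import is written so the snippet runs on old
and new interpreters alike.

```python
from __future__ import generators   # required on Python 2.2
import threading
try:
    import Queue as queue_mod       # module name on Python 2.x
except ImportError:
    import queue as queue_mod       # the module was renamed later

END = object()   # unique end-of-results sentinel

def generate_items(exec_func, maxsize=10):
    """Run a callback-style exec_func in a worker thread and yield the
    items its callback receives, buffering at most maxsize at a time."""
    q = queue_mod.Queue(maxsize)

    def producer():
        def callback(item):
            q.put(item)
            return 0        # 0 tells exec_func to keep going
        exec_func(callback)
        q.put(END)          # no more results

    threading.Thread(target=producer).start()
    while 1:
        item = q.get()
        if item is END:
            return
        yield item

# Made-up stand-in for sqlite_exec: calls back once per "row".
def fake_api(callback):
    for i in range(5):
        callback(i)
```

With this, `list(generate_items(fake_api))` yields `[0, 1, 2, 3, 4]`, while
never holding more than `maxsize` items in memory at once.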

The proof-of-concept *seems* to work, but there may still be ugly errors
hidden in it. Any comments are welcome.

Gerhard

--- begin code ---

"""
 This is a proof-of-concept for the implementation of a new alternative
 Cursor class that will exploit Python 2.2 generators and the SQLite
 sqlite_exec function with a custom callback.
 
 The point of this exercise is to provide a means to _not_ have to keep the
 entire result set in memory, but to return row by row to the caller on demand,
 using generators.
 
 Of course it will be slower than the current implementation. If nothing else,
 it's a nice programming exercise for me to learn how to handle multithreading
 reliably.
 """
 
from __future__ import generators
from Queue import Queue, Empty
import threading

# Unique sentinel that marks the last element of a queue. If this is put
# into a queue, the consumer will stop reading from the queue. A fresh
# object() can never be confused with an actual result row.
END_OF_QUEUE = object()
 
# SQLite's sqlite_exec function looks similar to this one:
def sqlite_exec(conn, sql, callback, arg1):
    # Let's assume that SQLite would return 15 result rows.
    for i in range(15):
        # According to the docs, if the callback returns something other
        # than 0, the query will be aborted.
        if callback(arg1, ["a", "b", i]) != 0:
            break
 
class Producer(threading.Thread):
    """A separate producer thread that wraps sqlite_exec, provides the
    callback function for sqlite_exec, and fills a queue with the result
    rows for the consumer to read from."""
    def __init__(self, *args, **kwargs):
        self.stopflag = 0
        self.queue = kwargs["queue"]
        self.conn = kwargs["conn"]
        self.sql = kwargs["sql"]
        del kwargs["queue"]
        del kwargs["conn"]
        del kwargs["sql"]
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        def callback(arg1, items):
            """Callback function for sqlite_exec."""
            if self.stopflag:
                # We want to abort the query; END_OF_QUEUE is put into
                # the queue once sqlite_exec has returned, below.
                return 1
            self.queue.put(items, 1)
            return 0

        sqlite_exec(self.conn, self.sql, callback, None)
        self.queue.put(END_OF_QUEUE, 1)
             
class Cursor:
    def __init__(self):
        self.conn = None
        self.queue = None
        self.producer = None
        self.__invalidate()

    def __invalidate(self):
        """Invalidate is called when a new query is processed with execute
        or executemany, when the cursor is closed, or when the cursor
        object goes out of scope."""
        self.current_sql = None
        if self.producer is not None:
            self.producer.stopflag = 1

            # Try to empty the queue, so the producer can insert its
            # END_OF_QUEUE and won't block forever.
            try:
                while 1:
                    self.queue.get(0)
            except Empty:
                pass
 
    def execute(self, sql, *parms):
        # The query isn't actually processed at this point. It will only
        # be processed in the fetchone/fetchmany/fetchall methods, using
        # __execute.
        self.__invalidate()
        self.current_sql = sql

    def __execute(self):
        """Build a queue and start the producer thread."""

        # We keep at most 10 result rows in memory at any given time:
        self.queue = Queue(10)

        self.producer = Producer(conn=self.conn, sql=self.current_sql,
                                 queue=self.queue)
        self.producer.start()
        self.current_sql = None
 
    def fetchall(self):
        if self.current_sql is not None:
            self.__execute()

        while 1:
            item = self.queue.get(1)
            if item is END_OF_QUEUE:
                return
            yield item

    def fetchmany(self, howmany=15):
        if self.current_sql is not None:
            self.__execute()
        for i in range(howmany):
            item = self.queue.get(1)
            if item is END_OF_QUEUE:
                return
            yield item
 
    def fetchone(self):
        if self.current_sql is not None:
            self.__execute()
        item = self.queue.get(1)
        if item is END_OF_QUEUE:
            return None
        else:
            return item

    def close(self):
        self.__invalidate()

    def __del__(self):
        self.__invalidate()

c = Cursor()
c.execute("select * from foo")
print "--- fetchone() ---"
print c.fetchone()
print "--- fetchmany(5) ---"
for res in c.fetchmany(5):
    print res
print "--- fetchall() ---"
for res in c.fetchall():
    print res
c.close()
 
 



