threading - race condition?

Rhamphoryncus rhamph at gmail.com
Mon May 12 16:19:15 EDT 2008


On May 12, 1:31 pm, skunkwerk <skunkw... at gmail.com> wrote:
> On May 12, 1:40 am, Rhamphoryncus <rha... at gmail.com> wrote:
>
>
>
> > On May 11, 10:16 am, skunkwerk <skunkw... at gmail.com> wrote:
>
> > > On May 10, 1:31 pm, Dennis Lee Bieber <wlfr... at ix.netcom.com> wrote:
>
> > > > On Fri, 9 May 2008 08:40:38 -0700 (PDT), skunkwerk <skunkw... at gmail.com>
> > > > declaimed the following in comp.lang.python:
>
> > > >         Coming in late...
>
> > > > > On May 9, 12:12 am, John Nagle <na... at animats.com> wrote:
> > > > > > > skunkwerk wrote:
> > > > > > > i've declared a bunch of workerthreads(100) and a queue into which
> > > > > > > new requests are inserted, like so:
>
> > > >         <snip>
>
> > > > > > > queue = Queue.Queue(0)
> > > > > > >  WORKERS=100
> > > > > > > for i in range(WORKERS):
> > > > > > >    thread = SDBThread(queue)
> > > > > > >    thread.setDaemon(True)
> > > > > > >    thread.start()
>
> > > > > > > the thread:
>
> > > > > > > class SimpleDBThread ( threading.Thread ):
> > > > > > >    def __init__ ( self, queue ):
> > > > > > >            self.__queue = queue
>
> > > >         Note: double-leading __ means "name mangling" -- typically only
> > > > needed when doing multiple layers of inheritance where different parents
> > > > have similar named items that need to be kept independent; a single _ is
> > > > the convention for "don't touch me unless you know what you are doing"
>
> > > > > > >            threading.Thread.__init__ ( self )
> > > > > > >    def run ( self ):
> > > > > > >            while 1:
> > > > > > >                    item = self.__queue.get()
> > > > > > >                    if item!=None:
> > > > > > >                            model = domain.get_item(item[0])
> > > > > > >                            logger.debug('sdbthread item:'+item[0])
> > > > > > >                            title = model['title']
> > > > > > >                            scraped = model['scraped']
> > > > > > >                            logger.debug("sdbthread title:"+title)
>
> > > > > > > any suggestions?
> > > > > > > thanks
>
> > > >         <snip>
>
> > > > > thanks John, Gabriel,
> > > > >   here's the 'put' side of the requests:
>
> > > > > def prepSDBSearch(results):
> > > > >    modelList = [0]
> > > > >    counter=1
> > > > >    for result in results:
> > > > >            data = [result.item, counter, modelList]
> > > > >            queue.put(data)
> > > > >            counter+=1
> > > > >    while modelList[0] < len(results):
> > > > >            print 'waiting...'#wait for them to come home
> > > > >    modelList.pop(0)#now remove '0'
> > > > >    return modelList
>
> > > >         My suggestion, if you really want diagnostic help -- follow the
> > > > common recommendation of posting the minimal /runnable (if erroneous)/
> > > > code... If "domain.get_item()" is some sort of RDBM access, you might
> > > > fake it using a pre-loaded dictionary -- anything that allows it to
> > > > return something when given the key value.
>
> > > > > responses to your follow ups:
> > > > > 1)  'item' in the threads is a list that corresponds to the 'data'
> > > > > list in the above function.  it's not global, and the initial values
> > > > > seem ok, but i'm not sure if every time i pass in data to the queue it
> > > > > passes in the same memory address or declares a new 'data' list (which
> > > > > I guess is what I want)
>
> > > >         Rather confusing usage... In your "put" you have a list whose first
> > > > element is "result.item", but then in the work thread, you refer to the
> > > > entire list as "item"
>
> > > > > 3)  the first item in the modelList is a counter that keeps track of
> > > > > the number of threads for this call that have completed - is there any
> > > > > better way of doing this?
>
> > > >         Where? None of your posted code shows either "counter" or modelList
> > > > being used by the threads.
>
> > > >         And yes, if you have threads trying to update a shared mutable, you
> > > > have a race condition.
>
> > > >         You also have a problem if you are using "counter" to define where
> > > > in modelList a thread is supposed to store its results -- as you can not
> > > > access an element that doesn't already exist...
>
> > > > a = [0]
> > > > a[3] = 1        #failure, need to create elements 1, 2, 3 first
>
> > > >         Now, if position is irrelevant, and a thread just appends its
> > > > results to modelList, then you don't need some counter, all you need is
> > > > to check the length of modelList against the count expected.
>
> > > >         Overall -- even though you are passing things via the queue, the
> > > > contents being passed via the queue are being treated as if they were
> > > > global entities (you could make modelList a global, remove it from the
> > > > queue entries, and have the same net access)...
>
> > > >         IOWs, you have too much coupling between the threads and the feed
> > > > routine...
>
> > > >         As for me... I'd be using a second queue for return values...
>
> > > > WORKERTHREADS = 100
> > > > feed = Queue.Queue()
> > > > results = Queue.Queue()
>
> > > > def worker():
> > > >         while True:
> > > >                 (ID, item) = feed.get()                 #I leave the queues globals
> > > >                                                                                 #since they perform locking
> > > >                                                                                 #internally
> > > >                 model = domain.get_item(item)
> > > >                 results.put( (ID, model["title"], model["scraped"]) )
>
> > > > for i in range(WORKERTHREADS):
> > > >         aThread = threading.Thread(target=worker)
> > > >                 #overkill to subclass as there is now no specialized init
> > > >                 #and if I really wanted to make the queues non-global
> > > >                 #I'd pass them as arguments:
> > > >                 #       threading.Thread(target=worker, args=(feed, results))
> > > >                 #where worker is now
> > > >                 #       def worker(feed, results):
> > > >         aThread.setDaemon(True)
> > > >         aThread.start()
>
> > > > ...
>
> > > > def prepSearch(searches):
> > > >         modelList = []
> > > >         counter = 0
> > > >         for searchItem in searches:
> > > >                 feed.put( (counter, searchItem) )
> > > >                 counter += 1
> > > >                 modelList.append(None)  #extend list one element per search
> > > >         while counter:
> > > >                 (ID, title, scraped) = results.get()
> > > >                 modelList[ID] = (title, scraped)
> > > >                 counter -= 1
> > > >         return modelList
>
> > > >         The only place counter and modelList are modified are within the
> > > > prepSearch. I'm passing counter out and back to use as an ID value if
> > > > the final results are supposed to be in order -- that way if one thread
> > > > finishes before another, the items can be placed into the list where
> > > > they'd have been sequentially.
>
> > > >         I can only hope that "domain.get_item" is an activity that is I/O
> > > > bound AND that it supports parallel accesses... Otherwise the above
> > > > worker threads seem to be adding a lot of overhead for queue I/O and
> > > > threading swaps for what is otherwise a rather linear process.
>
> > > >         Perhaps your posts don't reveal enough... Maybe you have multiple
> > > > main threads that are posting to the worker feed queue (and you were
> > > > using separate mutables for storing the results). In this situation, I'd
> > > > remove the results queue from being a global entity, create one queue
> > > > per main processing thread, and pass the queue as one of the parameters.
> > > > This way, a worker can return data to any source thread by using the
> > > > supplied queue for the return...
>
> > > > Modify prepSearch with:
>
> > > >         myQueue = Queue.Queue()
> > > > ...
> > > >         feed.put( (counter, searchItem, myQueue) )
> > > > ...
> > > >         (ID, title, scraped) = myQueue.get()
>
> > > > Modify worker with:
>
> > > >         (ID, item, retqueue) = feed.get()
> > > > ...
> > > >         retqueue.put( (ID, model["title"], model["scraped"]) )
>
> > > wow.  thanks for the help.
> > >    i seem to have fixed my problem though - it turns out
> > > domain.get_item was not thread safe as it was using the underlying
> > > httplib.  the solution was to create a new connection to the database
> > > for each thread (which is ok as the database is meant to be queried in
> > > a massively parallel fashion).  the missing part of the code included
> > > a part where i inserted the results at the given position into the
> > > list.
>
> > > the only issue i have now is that it takes a long time for 100 threads
> > > to initialize that connection (>5 minutes) - and as i'm doing this on
> > > a webserver any time i update the code i have to restart all those
> > > threads, which i'm doing right now in a for loop.  is there any way I
> > > can keep the thread stuff separate from the rest of the code for this
> > > file, yet allow access?  It wouldn't help having a .pyc or using
> > > psyco, correct, as the time is being spent in the runtime?  something
> > > along the lines of 'start a new thread every minute until you get to a
> > > 100' without blocking the execution of the rest of the code in that
> > > file?  or maybe any time i need to do a search, start a new thread if
> > > the #threads is <100?
>
> > $ python2.5 -m timeit -s 'import threading' 't = threading.Thread();
> > t.start(); t.join()'
> > 10000 loops, best of 3: 131 usec per loop
>
> > Clearly it is not the threads themselves, but something else which is
> > expensive.
>
> > It's not clear why you need threads at all.  Unless you've got a lot
> > of cpus/cores running that DBMS, or it's got fairly high latency (and
> > no way to pipeline), attacking it with more threads isn't gonna give
> > significant speedups.
>
> correct.  the threads themselves are not taking up the time, but the
> initialization of each thread (which includes making a new connection
> to the database) - typically this is ~3 seconds.  The database is
> amazon's simpleDB, which is meant to support massively parallel
> queries.  once the connection has been made, queries are very fast.

So it shouldn't take much more than 3 seconds to create all 100
threads.  It certainly shouldn't take 5 minutes.  However, 3 seconds *
100 = 300 seconds = 5 minutes, so it sounds like the connection
process is getting serialized somehow.

Maybe you're doing the connection in the thread's __init__?  Note that
__init__ is called when the thread *object* is created, by the main
thread, and not when you *start* the thread.
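
Here is a minimal sketch of that distinction, written for current
Python 3 rather than the 2.5 used above.  The Worker class and its
init_thread/run_thread attributes are made up for illustration and are
not taken from your code.  It only records which thread executes each
method:

```python
import threading

class Worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        # Executed by whichever thread constructs the object
        # (normally the main thread), so slow work placed here
        # happens one object at a time.
        self.init_thread = threading.current_thread().name
        self.run_thread = None

    def run(self):
        # Executed by the new child thread after start() is called.
        self.run_thread = threading.current_thread().name

w = Worker()
w.start()
w.join()
print(w.init_thread, w.run_thread)
```

If the database connection is opened in __init__, each of the 100
connections has to finish before the next object is even constructed.
Moving that step to the top of run() should let them proceed in
parallel, assuming the connection library does not serialize them
itself.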

I find the threading.Thread API to be vastly overcomplicated.  It's
much simpler to wrap it like this:

def start_thread(func, *args, **kwargs):
    t = threading.Thread(target=func, args=args, kwargs=kwargs)
    t.start()
    return t

Then you can pass a function to start_thread and it will run in the
new child thread.
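
As a rough usage sketch (again in current Python 3, where the Queue
module is spelled queue), each worker below opens its own connection
inside the child thread.  fake_connect is a hypothetical stand-in for
whatever your real connection call is, and the worker function and
result tuples are likewise invented for the example:

```python
import queue
import threading

def start_thread(func, *args, **kwargs):
    t = threading.Thread(target=func, args=args, kwargs=kwargs)
    t.start()
    return t

def fake_connect(worker_id):
    # Hypothetical placeholder for a slow database connection call.
    return "connection-%d" % worker_id

def worker(worker_id, results):
    # This runs in the child thread, so each connection is set up
    # independently of the others.
    conn = fake_connect(worker_id)
    results.put((worker_id, conn, threading.current_thread().name))

results = queue.Queue()
threads = [start_thread(worker, i, results) for i in range(5)]
for t in threads:
    t.join()

# One result per thread; sort by worker_id for a stable order.
collected = sorted(results.get() for _ in threads)
for worker_id, conn, thread_name in collected:
    print(worker_id, conn, thread_name)
```

Since the main thread only calls start_thread and returns immediately,
the rest of the file can carry on while the connections are being
made.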


