threading - race condition?
skunkwerk
skunkwerk at gmail.com
Sun May 11 12:16:25 EDT 2008
On May 10, 1:31 pm, Dennis Lee Bieber <wlfr... at ix.netcom.com> wrote:
> On Fri, 9 May 2008 08:40:38 -0700 (PDT), skunkwerk <skunkw... at gmail.com>
> declaimed the following in comp.lang.python:
>
> Coming in late...
>
> > On May 9, 12:12 am, John Nagle <na... at animats.com> wrote:
> > > skunkwerk wrote:
> > > > i've declared a bunch of worker threads (100) and a queue into which
> > > > new requests are inserted, like so:
>
> <snip>
>
>
>
> > > > queue = Queue.Queue(0)
> > > > WORKERS=100
> > > > for i in range(WORKERS):
> > > >     thread = SDBThread(queue)
> > > >     thread.setDaemon(True)
> > > >     thread.start()
>
> > > > the thread:
>
> > > > class SimpleDBThread ( threading.Thread ):
> > > >     def __init__ ( self, queue ):
> > > >         self.__queue = queue
>
> Note: double-leading __ means "name mangling" -- typically only
> needed when doing multiple layers of inheritance where different parents
> have similar named items that need to be kept independent; a single _ is
> the convention for "don't touch me unless you know what you are doing"
>
> > > >         threading.Thread.__init__ ( self )
> > > >     def run ( self ):
> > > >         while 1:
> > > >             item = self.__queue.get()
> > > >             if item!=None:
> > > >                 model = domain.get_item(item[0])
> > > >                 logger.debug('sdbthread item:'+item[0])
> > > >                 title = model['title']
> > > >                 scraped = model['scraped']
> > > >                 logger.debug("sdbthread title:"+title)
>
> > > > any suggestions?
> > > > thanks
>
> <snip>
>
> > thanks John, Gabriel,
> > here's the 'put' side of the requests:
>
> > def prepSDBSearch(results):
> >     modelList = [0]
> >     counter=1
> >     for result in results:
> >         data = [result.item, counter, modelList]
> >         queue.put(data)
> >         counter+=1
> >     while modelList[0] < len(results):
> >         print 'waiting...'#wait for them to come home
> >     modelList.pop(0)#now remove '0'
> >     return modelList
>
> My suggestion, if you really want diagnostic help -- follow the
> common recommendation of posting the minimal /runnable (if erroneous)/
> code... If "domain.get_item()" is some sort of RDBMS access, you might
> fake it using a pre-loaded dictionary -- anything that allows it to
> return something when given the key value.
>
> > responses to your follow ups:
> > 1) 'item' in the threads is a list that corresponds to the 'data'
> > list in the above function. it's not global, and the initial values
> > seem ok, but i'm not sure if every time i pass in data to the queue it
> > passes in the same memory address or declares a new 'data' list (which
> > I guess is what I want)
>
> Rather confusing usage... In your "put" you have a list whose first
> element is "result.item", but then in the work thread, you refer to the
> entire list as "item"
>
> > 3) the first item in the modelList is a counter that keeps track of
> > the number of threads for this call that have completed - is there any
> > better way of doing this?
>
> Where? None of your posted code shows either "counter" or modelList
> being used by the threads.
>
> And yes, if you have threads trying to update a shared mutable, you
> have a race condition.
>
> You also have a problem if you are using "counter" to define where
> in modelList a thread is supposed to store its results -- as you can not
> access an element that doesn't already exist...
>
> a = [0]
> a[3] = 1 #failure, need to create elements 1, 2, 3 first
>
> Now, if position is irrelevant, and a thread just appends its
> results to modelList, then you don't need some counter, all you need is
> to check the length of modelList against the count expected.
>
> Overall -- even though you are passing things via the queue, the
> contents being passed via the queue are being treated as if they were
> global entities (you could make modelList a global, remove it from the
> queue entries, and have the same net access)...
>
> IOWs, you have too much coupling between the threads and the feed
> routine...
>
> As for me... I'd be using a second queue for return values...
>
> WORKERTHREADS = 100
> feed = Queue.Queue()
> results = Queue.Queue()
>
> def worker():
>     while True:
>         (ID, item) = feed.get()     #I leave the queues globals
>                                     #since they perform locking
>                                     #internally
>         model = domain.get_item(item)
>         results.put( (ID, model["title"], model["scraped"]) )
>
> for i in range(WORKERTHREADS):
>     aThread = threading.Thread(target=worker)
>     #overkill to subclass as there is now no specialized init
>     #and if I really wanted to make the queues non-global
>     #I'd pass them as arguments:
>     #    threading.Thread(target=worker, args=(feed, results))
>     #where worker is now
>     #    def worker(feed, results):
>     aThread.setDaemon(True)
>     aThread.start()
>
> ...
>
> def prepSearch(searches):
>     modelList = []
>     counter = 0
>     for searchItem in searches:
>         feed.put( (counter, searchItem) )
>         counter += 1
>         modelList.append(None)  #extend list one element per search
>     while counter:
>         (ID, title, scraped) = results.get()
>         modelList[ID] = (title, scraped)
>         counter -= 1
>     return modelList
>
> The only place counter and modelList are modified is within the
> prepSearch. I'm passing counter out and back to use as an ID value if
> the final results are supposed to be in order -- that way if one thread
> finishes before another, the items can be placed into the list where
> they'd have been sequentially.
>
> I can only hope that "domain.get_item" is an activity that is I/O
> bound AND that it supports parallel accesses... Otherwise the above
> worker threads seem to be adding a lot of overhead for queue I/O and
> threading swaps for what is otherwise a rather linear process.
>
> Perhaps your posts don't reveal enough... Maybe you have multiple
> main threads that are posting to the worker feed queue (and you were
> using separate mutables for storing the results). In this situation, I'd
> remove the results queue from being a global entity, create one queue
> per main processing thread, and pass the queue as one of the parameters.
> This way, a worker can return data to any source thread by using the
> supplied queue for the return...
>
> Modify prepSearch with:
>
> myQueue = Queue.Queue()
> ...
> feed.put( (counter, searchItem, myQueue) )
> ...
> (ID, title, scraped) = myQueue.get()
>
> Modify worker with:
>
> (ID, item, retqueue) = feed.get()
> ...
> retqueue.put( (ID, model["title"], model["scraped"]) )
> --
> Wulfraed Dennis Lee Bieber KD6MOG
> wlfr... at ix.netcom.com wulfr... at bestiaria.com
> HTTP://wlfraed.home.netcom.com/
> (Bestiaria Support Staff: web-a... at bestiaria.com)
> HTTP://www.bestiaria.com/
wow. thanks for the help.
i seem to have fixed my problem though - it turns out
domain.get_item was not thread safe as it was using the underlying
httplib. the solution was to create a new connection to the database
for each thread (which is ok as the database is meant to be queried in
a massively parallel fashion). the missing part of the code included
a part where i inserted the results at the given position into the
list.
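
A minimal sketch of the one-connection-per-thread fix described above, written for Python 3 (the code quoted in this thread is Python 2). `FakeConnection` and `get_connection` are hypothetical names standing in for the real database client, which is not shown here; the only library feature relied on is `threading.local`.

```python
import threading

# Each thread sees its own independent set of attributes on this object.
_local = threading.local()


class FakeConnection:
    """Hypothetical stand-in for a real, non-thread-safe DB connection."""

    def get_item(self, key):
        # A real connection would do network I/O here.
        return {"title": "title of %s" % key, "scraped": False}


def get_connection():
    """Return the calling thread's own connection, creating it on first use."""
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = FakeConnection()
        _local.conn = conn
    return conn
```

A worker would then call `get_connection().get_item(key)` instead of sharing a single `domain` object, so no two threads ever touch the same connection.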
the only issue i have now is that it takes a long time for 100 threads
to initialize that connection (>5 minutes) - and as i'm doing this on
a webserver any time i update the code i have to restart all those
threads, which i'm doing right now in a for loop. is there any way I
can keep the thread stuff separate from the rest of the code for this
file, yet allow access? It wouldn't help having a .pyc or using
psyco, correct, as the time is being spent in the runtime? something
along the lines of 'start a new thread every minute until you get to a
100' without blocking the execution of the rest of the code in that
file? or maybe any time i need to do a search, start a new thread if
the #threads is <100?
thanks again