Threading.Condition problem

Gabriel Rossetti gabriel.rossetti at arimaz.com
Mon Jul 13 13:28:27 EDT 2009


Piet van Oostrum wrote:
>>>>>> Gabriel Rossetti <gabriel.rossetti at arimaz.com> (GR) wrote:
>>>>>>             
>
>   
>> GR> Sorry if this appears twice, I sent it once with an attachment and it never
>> GR> arrived so maybe the attachment is posing problems. I inlined the code this
>> GR> time (at the bottom), thank you,
>>     
>
>   
>> GR> Gabriel
>>     
>
>   
>> GR> ########################## Original message ############################
>>     
>
>   
>> GR> Hello everyone,
>>     
>
>   
>> GR> I wrote a small example that listens for xmpp msgs in a thread. The main
>> GR> program calls a function that blocks (using Condition.wait) until a msg
>> GR> has been received and then returns the msg. When a msg arrives, it is
>> GR> put in a variable in the thread's object, it then calls the notify()
>> GR> attr on the Condition object. For some reason, this doesn't work, the
>> GR> thread gets the msg, tries to notify the Condition object, fails because
>> GR> the lock has not been acquired yet and blocks. I tried ignoring the
>> GR> failure, thinking that since it has not been acquired yet then when it
>> GR> is, it will get the msg right away and never call Condition.wait, thus
>> GR> not causing any problems, but this does not work either. Does someone
>> GR> know what I am doing wrong? I attached the code to this msg.
>>     
>
> The code that puts the message in the variable should also acquire the
> lock:
>
>
>      def onMessage(self, conn, msg):
>          with self._cv:
>              self.message = msg
>              self._cv.notify()
>   

Thank you, that was the problem, I eventually found that
> A couple of remarks:
>
> 1. I think the code is neater if all manipulation with the condition is
>    done in the same class (actually in the same instance -- making this
>    instance into a monitor).
>   

The reason I didn't do that is that I don' t want the Listener to sleep, 
I maybe over simplified the example, I actually put them in a dictionary 
as they come in, so in your example, if I have several threads waiting 
on msgs it wouldn't work. I'm trying to make a webservice api thay will 
also be turned into a java .jar for people that need java. Now that I 
think about it, each session will have an instance of the object so msgs 
shouldn' t get mixed up (one connection per user), so I could block in 
the thread. I'll try your suggestion as I think it is cleaner.

> class Listener(Thread):
>     def __init__(self, ws):
>         Thread.__init__(self)
>         self.interrupt = Event()
>         self.message = None
>         self._cv = Condition()
>         self.client = ws._client
>         self.client.RegisterHandler('message', self.onMessage)
>     
>     def onMessage(self, conn, msg):
>         with self._cv:
>             self.message = msg
>             try:
>                 self._cv.notify()
>             except RuntimeError:
>                 print "self._cv has not acquired the lock yet"
>     
>     def getMsg(self):
>         with self._cv:
>             while !self.message
>                 self._cv.wait()
>             return self.message
>
> class WS(object):
>     def __init__(self, username, password, res):
>         self._jid = xmpp.protocol.JID(username)
>         self._client = xmpp.Client(self._jid.getDomain())
> #        self._cv = Condition()
>     
>     def getMsg(self, mid=None):
>         """
>         """
>         return self._listener.getMsg()
>
> Of course I haven't tested this code as I don't have the context
> modules.
>
> 2. I don't know if more than one message can be delivered in the same
>    instance. If yes, than your code will not work, and neither will the
>    code above as, the message instance variable is never cleared. So the
>    next getMsg will be happy to deliver the previous one.
>    You would have to clear it when returning this one.
>     
>   
Like I said above, in reality I have a dict not just a simple variable.
>     def getMsg(self):
>         with self._cv:
>             while !self.message
>                 self._cv.wait()
>             msg = self.message
>             self.message = None
>             return msg
>
> 3. If the messages come in faster than they can be processed some will
>    be lost as they will overwrite the previous one in the self.message
>    variable. The solution is to use a threading.Queue to transfer the
>    messages from one thread to the other. This also saves you the hassle
>    of doing your own synchronisation like above. If you are not familiar
>    with synchronising multithreaded applications it is very easy to make
>    errors and even if you are it is quite easy to do them wrong. I have
>    been involved in distributed programming courses at university level
>    and I have seen many errors in this area.
>   
I used a dict because the API can also be setup to be async and use 
callbacks, so I had to be able to
access the msgs directly and quickly.

Gabriel



More information about the Python-list mailing list