Problem with copy.deepcopy and multiprocessing.Queue

James DeVincentis admin at hexhost.net
Sat Oct 17 16:19:15 EDT 2015


So, whatever is causing this is a bit deeper in the multiprocessing.Queue class. I tried using a non-blocking  multiprocessing.Queue.get() by setting the first parameter to false and then catching the queue.Empty exception. For some reason even though there are objects in the queue (as evidenced by multiprocessing.Queue.qsize(), the queue.Empty exception is being thrown.

I dug into the multiprocessing.Queue class it looks like when calling multiprocessing.Queue.get() multiprocessing.Queue._poll() is coming up false causing it to throw the Empty exception. However, according to multiprocessing.Queue._sem there are still locks acquired on it thus giving me a multiprocessing.Queue.qsize() > 0.

It looks like it is possible for a process to die (which includes that process’s local multiprocessing.Queue._feed() thread) off before multiprocessing.Queue._buffer is fed out using multiprocessing.Queue._feed().

Thoughts from anyone? 


> On Oct 15, 2015, at 7:37 PM, James DeVincentis <admin at hexhost.net> wrote:
> 
> Looking into it, I seem to have found a race condition where a multiprocessing.Queue.get() can get hung waiting for an object even if there are objects in the Queue when being placed into the queue by a forked process and then the process ending quickly. 
> 
> I don’t know how to track this down any farther or fix it.
> 
> 1. Server gets booted. This spawns the following
> - API Server - HTTPServer(socketserver.ForkingMixIn, http.server.HTTPServer) - https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/server.py
> - Workers - Process(multiprocessing.Process) - https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py#L93
> 
> 2. HTTP Request comes in and is handled by Handler(http.server.BaseHTTPRequestHandler) in this case using do_PUT https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/handler.py#L226
> 3. Object is created and placed in the global queue: https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/handler.py#L283
> 4. HTTP Request Process dies since request has been handled.
> 5. Worker Process has task manager thread that takes the object from a global queue and places it into a local queue https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py#L64
> 6. one of many (30+) threads on the worker process picks up the object from the local queue and processes it https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py#L12
> 
> It seems the task management threads are getting hung up waiting for an object from the global queue, even though the global queue has objects in it…
> 
> Ignore the ERRORS they aren’t actually errors. I did it for temporary visibility.
> 
> 2015-10-16 00:20:07 Manager #3   ERROR    Put <cif.types.observables.fqdn.Fqdn object at 0x7f4384d53278> into local queue
> 2015-10-16 00:20:07 Manager #4   ERROR    Got <cif.types.observables.fqdn.Fqdn object at 0x7f43844e40f0> from global queue
> 2015-10-16 00:20:07 Manager #4   ERROR    Put <cif.types.observables.fqdn.Fqdn object at 0x7f43844e40f0> into local queue
> 2015-10-16 00:23:05 Manager #1   ERROR    Waiting for item from global queue
> 2015-10-16 00:23:05 Manager #2   ERROR    Waiting for item from global queue
> 2015-10-16 00:23:05 Manager #3   ERROR    Waiting for item from global queue
> 2015-10-16 00:23:05 Manager #4   ERROR    Waiting for item from global queue
> 2015-10-16 00:23:08 Manager #1   ERROR    Got <cif.types.observables.fqdn.Fqdn object at 0x7f8892fcf080> from global queue
> 2015-10-16 00:23:08 Manager #1   ERROR    Put <cif.types.observables.fqdn.Fqdn object at 0x7f8892fcf080> into local queue
> 2015-10-16 00:23:08 Manager #1   ERROR    Waiting for item from global queue
> 2015-10-16 00:23:08 Manager #2   ERROR    Got <cif.types.observables.fqdn.Fqdn object at 0x7f8892fce4a8> from global queue
> 2015-10-16 00:23:08 Manager #2   ERROR    Put <cif.types.observables.fqdn.Fqdn object at 0x7f8892fce4a8> into local queue
> 2015-10-16 00:23:08 Manager #2   ERROR    Waiting for item from global queue
> — Nothing else gets logged by the manager threads at all, even if I continue adding objects to the global queue. 
> 
> 
> 2015-10-16 00:25:00 worker #1    DEBUG    Local Queue Size: 0
> 2015-10-16 00:25:00 worker #3    DEBUG    Local Queue Size: 0
> 2015-10-16 00:25:00 worker #4    DEBUG    Local Queue Size: 0
> 2015-10-16 00:25:00 worker #2    DEBUG    Local Queue Size: 0
> 2015-10-16 00:25:00 MAIN         DEBUG    Global Queue Size: 572
> 
> 2015-10-16 00:25:05 worker #1    DEBUG    Local Queue Size: 0
> 2015-10-16 00:25:05 worker #3    DEBUG    Local Queue Size: 0
> 2015-10-16 00:25:05 worker #4    DEBUG    Local Queue Size: 0
> 2015-10-16 00:25:05 worker #2    DEBUG    Local Queue Size: 0
> 2015-10-16 00:25:05 MAIN         DEBUG    Global Queue Size: 572
> 
> 
> 
> 
>> On Oct 15, 2015, at 5:02 PM, James DeVincentis <admin at hexhost.net> wrote:
>> 
>> Anyone have any ideas? I feel like this could be a bug with the garbage collector across multiprocessing.
>> 
>> From: James DeVincentis [mailto:admin at hexhost.net] 
>> Sent: Wednesday, October 14, 2015 12:41 PM
>> To: 'python-list at python.org'
>> Subject: Problem with copy.deepcopy and multiprocessing.Queue
>> 
>> I’ve got a bit of a problem with copy.deepcopy and using multiprocessing.Queue. 
>> 
>> I have an HTTPAPI that gets exposed to add objects to a multiprocessing.Qeue. Source code here: https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/handler.py#L283
>> 
>> The trouble is, even using deepcopy, my debugging shows that it keeps re-using the same address space on every iteration of the loop (and/or fork since it uses the Forking TCP server mixin). 
>> 
>> This is reflected when the reference to that address space gets removed from the Queue. Source code: https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py
>> 
>> So I submit a few objects very rapidly to the HTTP server. It places them in the queue. For some reason on the second iteration copy.deepcopy stops using new address spaces for deepcopied objects. As such the queue processes that object only once (even though Queue.put() is called repeatedly for the same address space). 
>> 
>> I’ve tried ‘del’ the object before reusing it but it still reallocates to the same address space. I’m at a complete loss. 
>> 
>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb2e8>
>> 2015-10-14 17:03:30 THREAD #1-1  WARNING  Got Observable with Object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbdb978>
>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>> 2015-10-14 17:03:30 THREAD #2-1  WARNING  Got Observable with Object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbdab70>
>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>> 
>> 
>> -- 
>> https://mail.python.org/mailman/listinfo/python-list
> 
> -- 
> https://mail.python.org/mailman/listinfo/python-list




More information about the Python-list mailing list