Multiprocessing queue sharing and python3.8

Israel Brewster ijbrewster at alaska.edu
Mon Apr 6 17:34:15 EDT 2020


> On Apr 6, 2020, at 12:19 PM, David Raymond <David.Raymond at tomtom.com> wrote:
> 
> Attempting reply as much for my own understanding.
> 
> Are you on Mac? I think this is the pertinent bit for you:
> Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess. See bpo-33725.

Ahhh, yep, that would do it! Using spawn rather than fork completely explains all the issues I was suddenly seeing. It didn't even occur to me that the OS I was running on might make a difference. And yes, forcing it back to fork does indeed “fix” the issue. Of course, as noted there, the fork start method should be considered unsafe on macOS, so I guess I get to re-architect everything I do with multiprocessing that relies on data sharing between processes. The Queue example was just a minimal working example that illustrated the behavioral difference I was seeing :-) Thanks for the pointer!
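
For the archives: the first spawn-safe rework I'm going to try is a Manager-backed queue, since the Manager's queue proxy pickles cleanly and can therefore be handed to the workers as a plain argument. A minimal, untested sketch (placeholder names, not my real code):

import multiprocessing as mp

def some_complex_function(args):
    #The queue proxy arrives as part of the argument tuple
    x, queue = args
    queue.put(x)
    return x * 2

def main():
    #Manager().Queue() lives in a separate server process; the
    #proxy pickles fine, so it survives spawn where passing a
    #bare mp.Queue() raises the "inheritance" RuntimeError
    with mp.Manager() as manager:
        queue = manager.Queue()
        pool = mp.Pool()
        values = [(x, queue) for x in range(20)]

        for val in pool.imap(some_complex_function, values):
            print(f"**{val}**")

        print("final queue size:", queue.qsize())

if __name__ == "__main__":
    main()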

---
Israel Brewster
Software Engineer
Alaska Volcano Observatory 
Geophysical Institute - UAF 
2156 Koyukuk Drive 
Fairbanks AK 99775-7320
Work: 907-474-5172
cell:  907-328-9145

> 
> When you start a new process (with the spawn method) it runs the module just like it's being imported. So your global "mp_comm_queue2 = mp.Queue()" creates a new Queue in each process. Your initialization of mp_comm_queue is also done inside the main() function, which doesn't get run in each process. So each process in the Pool is going to have mp_comm_queue as None, and have its own version of mp_comm_queue2. The ID being the same or different is the result of one or more processes in the Pool being used repeatedly for the multiple steps in imap, probably because the function that the Pool is executing finishes so quickly.
> 
> Add a little extra info to the print calls (and/or set up logging to stdout with the process name/id included) and you can see some of this. Here's the hacked together changes I did for that.
> 
> import multiprocessing as mp
> import os
> 
> mp_comm_queue = None #Will be initialized in the main function
> mp_comm_queue2 = mp.Queue() #Test pre-initialized as well
> 
> def some_complex_function(x):
>    print("proc id", os.getpid())
>    print("mp_comm_queue", mp_comm_queue)
>    print("queue2 id", id(mp_comm_queue2))
>    mp_comm_queue2.put(x)
>    print("queue size", mp_comm_queue2.qsize())
>    print("x", x)
>    return x * 2
> 
> def main():
>    global mp_comm_queue
>    #initalize the Queue
>    mp_comm_queue = mp.Queue()
> 
>    #Set up a pool to process a bunch of stuff in parallel
>    pool = mp.Pool()
>    values = range(20)
>    data = pool.imap(some_complex_function, values)
> 
>    for val in data:
>        print(f"**{val}**")
>    print("final queue2 size", mp_comm_queue2.qsize())
> 
> if __name__ == "__main__":
>    main()
> 
> 
> 
> When making your own Process object and starting it, then yes, the Queue should be passed into the function as an argument. The error text seems to be part of the Pool implementation, which I'm not familiar enough with to know the best way to handle it. (Probably something using the "initializer" and "initargs" arguments for Pool - see the untested sketch below.)
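> 
> Something along these lines, maybe (untested, and init_worker is just a name I made up):
> 
> import multiprocessing as mp
> 
> mp_comm_queue = None #Set in each worker by the initializer
> 
> def init_worker(q):
>    #Runs once in each worker process as the Pool starts it, so
>    #the queue gets shared "through inheritance" as the
>    #RuntimeError demands, then stashed in a module global
>    global mp_comm_queue
>    mp_comm_queue = q
> 
> def some_complex_function(x):
>    mp_comm_queue.put(x)
>    return x * 2
> 
> def main():
>    q = mp.Queue()
>    pool = mp.Pool(initializer=init_worker, initargs=(q,))
>    data = pool.imap(some_complex_function, range(20))
> 
>    for val in data:
>        print(f"**{val}**")
> 
>    #All 20 puts happen before the corresponding results are
>    #returned, so draining exactly 20 items is safe here
>    for _ in range(20):
>        print("from queue:", q.get())
> 
> if __name__ == "__main__":
>    main()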
> 
> 
> 
> -----Original Message-----
> From: Python-list <python-list-bounces+david.raymond=tomtom.com at python.org> On Behalf Of Israel Brewster
> Sent: Monday, April 6, 2020 1:24 PM
> To: Python <python-list at python.org>
> Subject: Multiprocessing queue sharing and python3.8
> 
> Under python 3.7 (and all previous versions I have used), the following code works properly, and produces the expected output:
> 
> import multiprocessing as mp
> 
> mp_comm_queue = None #Will be initialized in the main function
> mp_comm_queue2 = mp.Queue() #Test pre-initialized as well
> 
> def some_complex_function(x):
>    print(id(mp_comm_queue2))
>    assert(mp_comm_queue is not None)
>    print(x)
>    return x * 2
> 
> def main():
>    global mp_comm_queue
>    #initialize the Queue
>    mp_comm_queue = mp.Queue()
> 
>    #Set up a pool to process a bunch of stuff in parallel
>    pool = mp.Pool()
>    values = range(20)
>    data = pool.imap(some_complex_function, values)
> 
>    for val in data:
>        print(f"**{val}**")
> 
> if __name__ == "__main__":
>    main()
> 
> Under 3.7, mp_comm_queue2 has the same ID for all iterations of some_complex_function, and the assert passes (mp_comm_queue is not None). Under Python 3.8, however, it fails: mp_comm_queue2 is a *different* object for each iteration, and the assert fails.
> 
> So what am I doing wrong with the above example block? Assuming that it broke in 3.8 because I wasn’t sharing the Queue properly, what is the proper way to share a Queue object among multiple processes for the purposes of inter-process communication?
> 
> The documentation (https://docs.python.org/3.8/library/multiprocessing.html#exchanging-objects-between-processes) appears to indicate that I should pass the queue as an argument to the function to be executed in parallel; however, that fails as well (on ALL versions of Python I have tried) with the error:
> 
> Traceback (most recent call last):
>  File "test_multi.py", line 32, in <module>
>    main()
>  File "test_multi.py", line 28, in main
>    for val in data:
>  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 748, in next
>    raise value
>  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
>    put(task)
>  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 206, in send
>    self._send_bytes(_ForkingPickler.dumps(obj))
>  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
>    cls(buf, protocol).dump(obj)
>  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 58, in __getstate__
>    context.assert_spawning(self)
>  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 356, in assert_spawning
>    ' through inheritance' % type(obj).__name__
> RuntimeError: Queue objects should only be shared between processes through inheritance
> 
> after I add the following to the code to try passing the queue rather than having it global:
> 
> #Try passing the queue as an argument instead
> values = [(x, mp_comm_queue) for x in range(20)]
> data = pool.imap(some_complex_function, values)
> for val in data:
>    print(f"**{val}**")
> 
> So if I can’t pass it as an argument, and having it global is incorrect (at least starting with 3.8), what is the proper method of getting multiprocessing queues to child processes?
> 
> ---
> Israel Brewster
> Software Engineer
> Alaska Volcano Observatory 
> Geophysical Institute - UAF 
> 2156 Koyukuk Drive 
> Fairbanks AK 99775-7320
> Work: 907-474-5172
> cell:  907-328-9145
> 
> -- 
> https://mail.python.org/mailman/listinfo/python-list


More information about the Python-list mailing list