Multiprocessing queue sharing and Python 3.8

Israel Brewster ijbrewster at alaska.edu
Mon Apr 6 13:23:52 EDT 2020


Under Python 3.7 (and every previous version I have used), the following code works properly and produces the expected output:

import multiprocessing as mp

mp_comm_queue = None  # Will be initialized in the main function
mp_comm_queue2 = mp.Queue()  # Test pre-initialized as well

def some_complex_function(x):
    print(id(mp_comm_queue2))
    assert mp_comm_queue is not None
    print(x)
    return x*2
        
def main():
    global mp_comm_queue
    # Initialize the Queue
    mp_comm_queue = mp.Queue()
    
    # Set up a pool to process a bunch of stuff in parallel
    pool = mp.Pool()
    values = range(20)
    data = pool.imap(some_complex_function, values)
    
    for val in data:
        print(f"**{val}**")
        
if __name__ == "__main__":
    main()

Namely: mp_comm_queue2 has the same ID for every call to some_complex_function, and the assert passes (mp_comm_queue is not None). Under Python 3.8, however, it fails: mp_comm_queue2 is a *different* object for each call, and the assert fails.

So what am I doing wrong with the above example block? Assuming it broke in 3.8 because I wasn’t sharing the Queue properly (perhaps related to 3.8 changing the default start method on macOS from fork to spawn, so module-level globals are no longer inherited by the children?), what is the proper way to share a Queue object among multiple processes for the purposes of inter-process communication?
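For reference, one pattern I have seen suggested (though I don't know whether it is the intended one) is to hand the queue to each worker as the pool starts it, via Pool's initializer/initargs arguments, and have the worker stash it in its own global. A minimal sketch of that idea — the helper name _pool_init is my own invention:

import multiprocessing as mp

mp_comm_queue = None  # set in each worker by _pool_init below

def _pool_init(q):
    # Runs once in every worker process as it starts; the queue is
    # transferred at process-creation time, which pickling permits.
    global mp_comm_queue
    mp_comm_queue = q

def some_complex_function(x):
    assert mp_comm_queue is not None
    return x * 2

def main():
    q = mp.Queue()
    pool = mp.Pool(initializer=_pool_init, initargs=(q,))
    for val in pool.imap(some_complex_function, range(20)):
        print(f"**{val}**")

if __name__ == "__main__":
    main()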

The documentation (https://docs.python.org/3.8/library/multiprocessing.html#exchanging-objects-between-processes) appears to indicate that I should pass the queue as an argument to the function to be executed in parallel; however, that fails as well (on ALL versions of Python I have tried) with the error:

Traceback (most recent call last):
  File "test_multi.py", line 32, in <module>
    main()
  File "test_multi.py", line 28, in main
    for val in data:
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 748, in next
    raise value
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
    put(task)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 356, in assert_spawning
    ' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

after I add the following to the code to pass the queue as an argument rather than relying on the global:

# Try passing the queue as an argument instead
values = [(x, mp_comm_queue) for x in range(20)]
data = pool.imap(some_complex_function, values)
for val in data:
    print(f"**{val}**")
# (some_complex_function would also need to unpack the (x, queue) tuple,
# but the RuntimeError above is raised before it is ever called)

So if I can’t pass it as an argument, and having it global is incorrect (at least starting with 3.8), what is the proper method of getting multiprocessing queues to child processes?
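For completeness: as I understand it, the one variation that would survive pickling here is a manager queue, since multiprocessing.Manager().Queue() returns a proxy object that can be passed as an ordinary task argument. A sketch, with some_complex_function rewritten to unpack the tuple:

import multiprocessing as mp

def some_complex_function(args):
    # imap delivers each (value, queue) tuple as a single argument
    x, q = args
    q.put(x)  # the manager proxy crosses the task pipe without complaint
    return x * 2

def main():
    with mp.Manager() as manager:
        q = manager.Queue()  # a picklable proxy, unlike a raw mp.Queue()
        with mp.Pool() as pool:
            values = [(x, q) for x in range(20)]
            for val in pool.imap(some_complex_function, values):
                print(f"**{val}**")

if __name__ == "__main__":
    main()

A manager queue goes through a separate server process, though, so I assume it is slower than a raw Queue — which is why I would still like to know the intended approach.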

---
Israel Brewster
Software Engineer
Alaska Volcano Observatory 
Geophysical Institute - UAF 
2156 Koyukuk Drive 
Fairbanks AK 99775-7320
Work: 907-474-5172
cell:  907-328-9145


