Multiprocessing queue sharing and python3.8

David Raymond David.Raymond at tomtom.com
Mon Apr 6 16:27:47 EDT 2020


Looks like this will get what you need.


import multiprocessing as mp

def some_complex_function(x):
    global q
    #stuff using q

def pool_init(q2):
    #Runs once in each worker process; stash the queue in a global
    global q
    q = q2

def main():
    #initialize the Queue
    mp_comm_queue = mp.Queue()

    #Set up a pool to process a bunch of stuff in parallel,
    #handing the queue to each worker via the initializer
    pool = mp.Pool(initializer=pool_init, initargs=(mp_comm_queue,))
    ...
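
For reference, here's a minimal runnable sketch of that initializer pattern. The worker body (the q.put / return x * 2 bits) is just illustration, not your real code:

import multiprocessing as mp

def pool_init(q2):
    #Store the queue in a module-level global inside each worker
    global q
    q = q2

def some_complex_function(x):
    #q was set by pool_init when this worker process started
    q.put(x)
    return x * 2

def main():
    mp_comm_queue = mp.Queue()
    pool = mp.Pool(initializer=pool_init, initargs=(mp_comm_queue,))
    results = list(pool.imap(some_complex_function, range(20)))
    print(results)
    #each task put exactly one item on the queue, so 20 gets won't block
    for _ in range(20):
        print("from queue:", mp_comm_queue.get())
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()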



-----Original Message-----
From: David Raymond 
Sent: Monday, April 6, 2020 4:19 PM
To: python-list at python.org
Subject: RE: Multiprocessing queue sharing and python3.8

Attempting a reply as much for my own understanding.

Are you on Mac? I think this is the pertinent bit for you:
Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess. See bpo-33725.

When you start a new process with the spawn method, it runs the module just like it's being imported. So your global "mp_comm_queue2 = mp.Queue()" creates a new Queue in each process. Your initialization of mp_comm_queue is also done inside the main() function, which doesn't get run in each process. So each process in the Pool is going to have mp_comm_queue as None, and its own version of mp_comm_queue2. Whether the id() comes back the same or different depends on whether one worker process in the Pool gets reused for several of the imap tasks, which probably happens here because the function the Pool is executing finishes so quickly.

Add a little extra info to the print calls (and/or set up logging to stdout with the process name/id included) and you can see some of this. Here are the hacked-together changes I made for that.

import multiprocessing as mp
import os

mp_comm_queue = None #Will be initialized in the main function
mp_comm_queue2 = mp.Queue() #Test pre-initialized as well

def some_complex_function(x):
    print("proc id", os.getpid())
    print("mp_comm_queue", mp_comm_queue)
    print("queue2 id", id(mp_comm_queue2))
    mp_comm_queue2.put(x)
    print("queue size", mp_comm_queue2.qsize())
    print("x", x)
    return x * 2

def main():
    global mp_comm_queue
    #initialize the Queue
    mp_comm_queue = mp.Queue()
    
    #Set up a pool to process a bunch of stuff in parallel
    pool = mp.Pool()
    values = range(20)
    data = pool.imap(some_complex_function, values)
    
    for val in data:
        print(f"**{val}**")
    print("final queue2 size", mp_comm_queue2.qsize())
    
if __name__ == "__main__":
    main()
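
If you want to see the re-import behavior directly, here's a quick sketch of my own comparing the two start methods (note: "fork" is not available on Windows, and id() equality across processes is only a rough indicator):

import multiprocessing as mp
import os

module_level_obj = object() #re-created on import, like mp_comm_queue2

def check(_):
    return (os.getpid(), id(module_level_obj))

if __name__ == "__main__":
    parent_obj_id = id(module_level_obj)
    for method in ("fork", "spawn"):
        ctx = mp.get_context(method)
        with ctx.Pool(2) as pool:
            for pid, obj_id in pool.map(check, range(4)):
                #True under fork (workers inherit the parent's memory),
                #almost always False under spawn (module re-imported)
                print(method, pid, obj_id == parent_obj_id)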



When making your own Process object and starting it, then yes, the Queue should be passed into the function as an argument. The error text seems to be part of the Pool implementation, which I'm not familiar enough with to know the best way to handle it. (Probably something using the "initializer" and "initargs" arguments for Pool)(maybe)
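
For the plain Process case, that pattern looks something like this (a minimal sketch; it drains the queue before joining because of the "joining processes that use queues" caveat in the docs):

import multiprocessing as mp

def worker(q, x):
    #The queue arrives as an argument, inherited at process start
    q.put(x * 2)

if __name__ == "__main__":
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(q, x)) for x in range(4)]
    for p in procs:
        p.start()
    #drain before joining: a process that has put items may not exit
    #until they have been flushed from its internal buffer
    for _ in procs:
        print(q.get())
    for p in procs:
        p.join()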



-----Original Message-----
From: Python-list <python-list-bounces+david.raymond=tomtom.com at python.org> On Behalf Of Israel Brewster
Sent: Monday, April 6, 2020 1:24 PM
To: Python <python-list at python.org>
Subject: Multiprocessing queue sharing and python3.8

Under Python 3.7 (and all previous versions I have used), the following code works properly and produces the expected output:

import multiprocessing as mp

mp_comm_queue = None #Will be initialized in the main function
mp_comm_queue2 = mp.Queue() #Test pre-initialized as well

def some_complex_function(x):
    print(id(mp_comm_queue2))
    assert(mp_comm_queue is not None)
    print(x)
    return x * 2

def main():
    global mp_comm_queue
    #initialize the Queue
    mp_comm_queue = mp.Queue()

    #Set up a pool to process a bunch of stuff in parallel
    pool = mp.Pool()
    values = range(20)
    data = pool.imap(some_complex_function, values)

    for val in data:
        print(f"**{val}**")

if __name__ == "__main__":
    main()

Under 3.7, mp_comm_queue2 has the same id() for all iterations of some_complex_function, and the assert passes (mp_comm_queue is not None). Under Python 3.8, however, it fails: mp_comm_queue2 is a *different* object for each iteration, and the assert fails.

So what am I doing wrong with the above example block? Assuming that it broke in 3.8 because I wasn’t sharing the Queue properly, what is the proper way to share a Queue object among multiple processes for the purposes of inter-process communication?

The documentation (https://docs.python.org/3.8/library/multiprocessing.html#exchanging-objects-between-processes) appears to indicate that I should pass the queue as an argument to the function to be executed in parallel. However, that fails as well (on ALL versions of Python I have tried) with the error:

Traceback (most recent call last):
  File "test_multi.py", line 32, in <module>
    main()
  File "test_multi.py", line 28, in main
    for val in data:
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 748, in next
    raise value
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
    put(task)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 356, in assert_spawning
    ' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

after I add the following to the code to try passing the queue rather than having it global:

#Try passing the queue as part of each task instead of using a global
values = [(x, mp_comm_queue) for x in range(20)]
data = pool.imap(some_complex_function, values)
for val in data:
    print(f"**{val}**")

So if I can’t pass it as an argument, and having it global is incorrect (at least starting with 3.8), what is the proper method of getting multiprocessing queues to child processes?

---
Israel Brewster
Software Engineer
Alaska Volcano Observatory 
Geophysical Institute - UAF 
2156 Koyukuk Drive 
Fairbanks AK 99775-7320
Work: 907-474-5172
cell:  907-328-9145
