Bi-directional sub-process communication

Ian Kelly ian.g.kelly at gmail.com
Mon Nov 23 14:55:49 EST 2015


On Mon, Nov 23, 2015 at 10:54 AM, Israel Brewster <israel at ravnalaska.net> wrote:
> Concern: Since the master process is multi-threaded, it seems likely enough that multiple threads on the master side would make requests at the same time. I understand that the Queue class has locks that make this fine (one thread will complete posting the message before the next is allowed to start), and since the child process only has a single thread processing messages from the queue, it should process them in order and post the responses (if any) to the master_queue in order. But now I have multiple master processes all trying to read master_queue at the same time. Again, the locks will take care of this and prevent any overlapping reads, but am I guaranteed that the threads will obtain the lock and therefore read the responses in the right order? Or is there a possibility that, say, thread three will get the response that should have been for thread one? Is this something I need to take into consideration, and if so, how?

Yes, if multiple master threads are waiting on the queue, it's
possible that a master thread could get a response that was not
intended for it. As far as I know there's no guarantee that the
waiting threads will be woken up in the order that they called get(),
but even if there are, consider this case:

Thread A enqueues a request.
Thread B preempts A and enqueues a request.
Thread B calls get on the response queue.
Thread A calls get on the response queue.
The response from A's request arrives and is given to B.

Instead of having the master threads pull objects off the response
queue directly, you might create another thread whose sole purpose is
to handle the response queue. That could look like this:


request_condition = threading.Condition()
response_global = None

def master_thread():
    global response_global
    with request_condition:
        request_queue.put(request)
        request_condition.wait()
        # Note: the Condition should remain acquired until
response_global is reset.
        response = response_global
        response_global = None
    if wrong_response(response):
        raise RuntimeError("got a response for the wrong request")
    handle_response(response)

def response_thread():
    global response_global
    while True:
        response = response_queue.get()
        with request_condition:
            response_global = response
            request_condition.notify()


As another option you could use a multiprocessing.Manager to
coordinate passing the response back more directly, but starting a
third process seems like overkill for this.



More information about the Python-list mailing list