Multiprocessing interactive processes with connections

Charles Hixson charleshixsn at earthlink.net
Tue Sep 6 18:25:25 EDT 2016


I want to run a bunch of processes that all talk to each other.  
I've figured out how to do this using queues with the main process as 
the mail handler, but I'd rather have them talk directly.  If I use 
connections, then I can pass the pipes to the processes, but there 
doesn't seem to be anything analogous to queue, so it looks like I need 
to divide each process into two threads, one of which does nothing but 
mail handling.

Is this correct?


FWIW, a small, unoptimized, test of the working "mailhandler" approach is:

## Multiprocessing:  main thread as post office.
#  This is a bit clumsy, but it works.

from  multiprocessing import Process, Queue
from  queue import   Empty

def f(i, lim, q_rcv, q_snd):
    """Worker body: send one message to every peer, then drain the inbox.

    Parameters:
        i      -- index of this worker; used as the sender id in each message.
        lim    -- number of peer indices; a message is addressed to every
                  j in range(lim) except i itself.
        q_rcv  -- this worker's inbox (needs empty() and get()).
        q_snd  -- outbox shared with the post office (needs put()).

    Messages put on q_snd are lists of the form [sender, text, destination];
    the final [sender, "done"] notice has no destination.
    """
    for j in range(lim):
        if i != j:
            # The literal must stay on one line: splitting it across lines
            # (as mail wrapping did) is a syntax error.
            q_snd.put([i, "message from process {0} to proess {1}".format(i, j), j])
        # Opportunistically print one waiting message per iteration.
        if not q_rcv.empty():
            val = q_rcv.get()
            print(val)
    q_snd.put([i, "done"])
    # Poll this worker's own inbox (q_rcv), not a name from another scope.
    # NOTE: because this polls with empty(), the loop ends as soon as the
    # inbox is momentarily empty, even if "done" has not arrived yet.
    while not q_rcv.empty():
        val = q_rcv.get()
        print(val, " :: ", i)
        if val == "done":
            return

def route_message(msg, outboxes, tag=""):
    """Handle one message taken from the main receive queue.

    msg is a list: [sender, "done"] announces that a worker has finished
    sending, and [sender, text, destination] asks for text to be put on
    outboxes[destination].  Anything else is reported as an error.

    tag is prepended to the diagnostic strings so the two routing phases
    of the script can be told apart in the output.

    Returns True when msg was a "done" notice, otherwise False.
    """
    if msg[1] == "done":
        return True
    if len(msg) == 3:
        # A 3-element message has indices 0..2: the destination is msg[2]
        # and the text to forward is msg[1].
        assert msg[2] < len(outboxes), tag + "unexpected value"
        outboxes[msg[2]].put(msg[1])
    else:
        print(tag + "error:  ", msg, " was an unexpected value.")
    return False


if __name__ == '__main__':
    ps = []
    qs = []
    q_recv = Queue()
    # One private inbox per worker; all workers share q_recv as their outbox.
    for i in range(3):
        q = Queue()
        qs.append(q)
        ps.append(Process(target=f, args=(i, 3, q, q_recv)))
    for p in ps:
        p.start()
    dones = 0
    # Phase 1: forward messages until the receive queue stays empty for 1s.
    try:
        v = q_recv.get(True, 1)
        while v:
            print("v = ", v)
            if route_message(v, qs):
                dones = dones + 1
            v = q_recv.get(True, 1)
    except Empty:
        print("1: Timeout of maine receive queue")
    # Tell every worker that no more mail is coming.
    for q in qs:
        q.put("done")
    # Phase 2: handle anything that arrived late, using the same routing
    # rules as phase 1.
    try:
        v = q_recv.get(True, 1)
        while v:
            print("2: v = ", v)
            if route_message(v, qs, "2: "):
                dones = dones + 1
            v = q_recv.get(True, 1)
    except Empty:
        print("2: Timeout of maine receive queue")
    # Wait for the workers before tearing the queues down.
    for p in ps:
        p.join()
    for i in range(len(qs)):
        qs[i].close()
        qs[i] = None




More information about the Python-list mailing list