Multiprocessing interactive processes with connections

Charles_Hixson at f38.n261.z1
Tue Sep 6 11:25:00 EDT 2016


From: Charles Hixson <charleshixsn at earthlink.net>

I want to run a bunch of processes that all talk to each other. I've
figured out how to do this using queues with the main process as the mail
handler, but I'd rather have them talk directly.  If I use connections, then I
can pass the pipes to the processes, but there doesn't seem to be anything
analogous to a queue, so it looks like I need to divide each process into two
threads, one of which does nothing but mail handling.

Is this correct?
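
For concreteness, here is a minimal sketch of the direct, connection-based
variant, assuming one duplex Pipe per pair of processes. The names make_mesh
and worker are hypothetical, chosen only for illustration. It relies on
multiprocessing.connection.wait (available since Python 3.3) to block on
several connections at once, which might make the separate mail-handling
thread unnecessary. Treat it as a sketch under those assumptions, not as a
definitive answer.

```python
from multiprocessing import Pipe, Process
from multiprocessing.connection import wait


def make_mesh(n):
    """Hypothetical helper: one dict per process, mapping peer index -> Connection.

    A duplex Pipe is created for every unordered pair of processes.
    """
    links = [dict() for _ in range(n)]
    for a in range(n):
        for b in range(a + 1, n):
            end_a, end_b = Pipe()  # duplex by default
            links[a][b] = end_a
            links[b][a] = end_b
    return links


def worker(i, peers):
    # Send one message to every peer, then tell that peer we are finished.
    for j, conn in peers.items():
        conn.send((i, "message from process {0} to process {1}".format(i, j)))
        conn.send((i, "done"))
    # Block on all peer connections at once until each peer has said "done".
    open_conns = list(peers.values())
    while open_conns:
        for conn in wait(open_conns):
            try:
                sender, text = conn.recv()
            except EOFError:
                # Peer closed its end without saying "done".
                open_conns.remove(conn)
                continue
            if text == "done":
                open_conns.remove(conn)
            else:
                print(i, "received:", text)


if __name__ == "__main__":
    links = make_mesh(3)
    procs = [Process(target=worker, args=(i, links[i])) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```

The number of pipes grows as n*(n-1)/2, so this only looks reasonable for a
small, fixed set of processes. Each worker sends before it receives, which is
safe here only because the messages are small enough to fit in the pipe
buffers.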


FWIW, a small, unoptimized test of the working "mailhandler" approach is:

## Multiprocessing:  main thread as post office.
#  This is a bit clumsy, but it works.

from multiprocessing import Process, Queue
from queue import Empty

def f(i, lim, q_rcv, q_snd):
    for j in range(lim):
       if i != j:
          q_snd.put([i, "message from process {0} to process {1}".format(i, j), j])
       if not q_rcv.empty():
          val   =  q_rcv.get()
          print (val)
    q_snd.put([i, "done"])
    while (not q_rcv.empty()):
       val   =  q_rcv.get()
       print (val, " :: ", i)
       if val == "done": return


if __name__ == '__main__':
    ps =  []
    qs =  []
    q_recv   =  Queue()
    for i in range(3):
       q  =  Queue()
       qs.append (q)
       ps.append (Process(target=f, args=(i, 3, q, q_recv)) )
    for p in ps:
       p.start()
    dones =  0
    try:
       v = q_recv.get(True, 1)
       while (v):
          print ("v = ", v)
          if v[1] == "done":
             dones =  dones + 1
          else:
             if len(v) == 3:
                assert v[2] < len(qs), "unexpected value"
                qs[v[2]].put(v[1])
             else:
                print ("error:  ", v, " was an unexpected value.")
          v = q_recv.get(True, 1)
    except   Empty:
       print ("1: Timeout of main receive queue")
    for q in qs:
       q.put("done")
    try:
       v = q_recv.get(True, 1)
       while (v):
          print ("2: v = ", v)
          if v[1] == "done":
             dones =  dones + 1
          else:
             if len(v) == 3:
                assert v[2] < len(qs), "2: unexpected value"
                qs[v[2]].put(v[1])
             else:
                print ("2: error:  ", v, " was an unexpected value.")
          v = q_recv.get(True, 1)
    except   Empty:
       print ("2: Timeout of main receive queue")
    for i in range(len(qs)):
       qs[i].close()
       qs[i] =  None



