Pipe IO Problem?

Chris S. chrisks at NOSPAM.udel.edu
Mon Sep 6 06:37:20 EDT 2004


I wrote a small class to handle IO through pipes, but the connection 
only seems to work in one direction. The following code defines 
connection.py, engine.py, and controller.py. Once connected, the engine 
is only able to send and the controller is only able to receive. Also, 
this only works when using popen2.popen3. Using os.popen3 doesn't work 
at all and seems to behave completely differently.
What could be causing these problems? Any help is greatly appreciated.

##-------------------------------------------
# connection.py
import sys
import threading
import socket
import time

class ConnectionError(Exception):
    '''Exception type for connection-related failures.

    Adds nothing to Exception beyond a distinct class to raise and catch.'''

class Connection:
    '''Base class for an interprocess connection.

    This class is meant to be used in two primary ways:
        First, as a caller, the initiator of the connection.
        Second, as a callee, the recipient of the connection.

    However, once the connection has been made, general use should
    behave symmetrically.

    Subclasses supply the transport by overriding _send_raw(),
    _recv_raw(), open() and close().  Received data is either handed to
    the in_handler callable (each item on its own daemon thread) or, when
    no handler was given, queued for recv() / pending().'''

    def __init__(self, in_handler=None):
        '''Creates an unconnected connection.

        in_handler -- optional callable taking one argument (the received
                      data).  When omitted, received data is buffered.'''
        self.connected = False
        self._recv_handler = in_handler
        self._recv_buffer = []  # stores data if no in_handler given
        self._recv_lock = threading.Lock()  # guards _recv_buffer
        self._send_lock = threading.Lock()  # for subclasses to serialize sends

    def _send_raw(self, string):
        '''Outgoing handler; must be overridden by the transport.'''
        raise NotImplementedError('this method must be overridden')

    def _recv_raw(self):
        '''Incoming handler; must be overridden by the transport.

        It is run on the daemon thread started by _launch_recv_handler().'''
        raise NotImplementedError('this method must be overridden')

    def _launch_recv_handler(self):
        '''Marks the connection open and starts the reception thread.'''
        assert not self.connected, 'connection already open'
        # Diagnostics go to stderr: stdout can be the connection's own data
        # channel (Pipe.open() without a target writes to sys.stdout), so
        # printing there would inject this text into the stream.
        sys.stderr.write('launching recv handler\n')
        self.connected = True
        t = threading.Thread(target=self._recv_raw)
        t.setDaemon(True)
        t.start()

    def _launch_data_handler(self, data):
        '''Dispatches one item of received data.

        With an in_handler, the handler is called with data on a new daemon
        thread; otherwise data is appended to the receive buffer.'''
        if self._recv_handler:
            # launch custom handler (ie in_handler) if present
            t = threading.Thread(target=self._recv_handler, args=(data,))
            t.setDaemon(True)
            t.start()
        else:
            # otherwise append to buffer
            self._recv_lock.acquire()
            try:
                self._recv_buffer.append(data)
            finally:
                self._recv_lock.release()

    def open(self):
        '''Opens the connection; must be overridden by the transport.'''
        raise NotImplementedError('this method must be overridden')

    def close(self, message=None):
        '''Closes the connection; must be overridden by the transport.'''
        raise NotImplementedError('this method must be overridden')

    def send(self, data):
        '''Sends data through the transport's _send_raw().

        Asserts that the connection is open.'''
        assert self.connected, 'not connected'
        self._send_raw(data)

    def recv(self):
        '''Blocks until it has data to return.
        Only use this if you haven't specified a reception handler.

        Returns the oldest buffered item.  Asserts that the connection
        is open.'''
        assert self.connected, 'not connected'

        while True:
            # Test and pop under the same lock so two concurrent callers
            # cannot both see one item and then race to pop it.
            self._recv_lock.acquire()
            try:
                if self._recv_buffer:
                    return self._recv_buffer.pop(0)
            finally:
                self._recv_lock.release()
            # nothing yet: wait for data
            time.sleep(0.1)

    def pending(self):
        '''True if pending data in the buffer. False otherwise.'''
        return bool(self._recv_buffer)

    def name(self):
        '''Placeholder; currently does nothing and returns None.'''
        pass

class Pipe(Connection):
    '''Connection over a pair of line-oriented pipes.

    As a caller, open(target) spawns target with popen2.popen3 and talks
    to the child's stdin/stdout.  As a callee, open() with no target talks
    over this process's own stdin/stdout.  Each message is one text line:
    sending appends a newline, and received lines are passed on exactly as
    readline() returned them (trailing newline included).'''

    def __init__(self, *args, **kargs):
        '''Accepts the same arguments as Connection.__init__.'''
        Connection.__init__(self, *args, **kargs)
        # Streams exist only while the pipe is open.  _errp stays None in
        # callee mode, which has no separate error pipe.
        self._inp = None
        self._outp = None
        self._errp = None

    def _send_raw(self, string):
        '''Writes string plus a newline to the outgoing pipe and flushes.

        On failure the connection is closed (if it still is open) and the
        error is re-raised wrapped in Exception.'''
        assert self.connected, 'not connected'

        try:
            self._send_lock.acquire()
            try:
                self._outp.write(string + '\n')
                self._outp.flush()
            finally:
                self._send_lock.release()
        except Exception:
            e = sys.exc_info()[1]
            # Another thread may have closed the connection already;
            # close() asserts on a second call.
            if self.connected:
                self.close(e)
            raise Exception(e)

    def _recv_raw(self):
        '''Reception loop: reads lines and dispatches each one.

        Ends when the connection is closed, a read fails, or the incoming
        stream reaches end of file.'''
        while self.connected:
            # get data
            try:
                data = self._inp.readline()
            except Exception:
                # Also reached when close() on another thread removed the
                # stream during the read; only close if nobody has yet.
                if self.connected:
                    self.close(sys.exc_info()[1])
                break
            if not data:
                # readline() returns '' only at end of file, so there is
                # nothing more to wait for; polling again would spin
                # forever.  The outgoing side is left usable.
                break

            # launch handler
            self._launch_data_handler(data)

    def open(self, target=None):
        '''Opens the pipe and starts the reception thread.

        target -- command to spawn and connect to (caller mode).  When
                  omitted, this process's stdin/stdout are used (callee
                  mode).'''
        assert not self.connected, 'connection already open'

        if target:
            # popen2.popen3 returns (child_stdout, child_stdin, child_stderr),
            # which is already (read, write, error) order.  os.popen3 is NOT
            # a drop-in replacement: it returns (child_stdin, child_stdout,
            # child_stderr), so its first two streams would need swapping.
            import popen2
            (self._inp, self._outp, self._errp) = popen2.popen3(target)
        else:
            self._inp = sys.stdin
            self._outp = sys.stdout
            self._errp = None

        self._launch_recv_handler()

    def close(self, message=None):
        '''Closes every open stream and marks the connection closed.

        message -- optional reason for closing; written to stderr.'''
        assert self.connected, 'connection already closed'

        self.connected = False
        streams = (self._inp, self._outp, self._errp)
        self._inp = self._outp = self._errp = None
        for stream in streams:
            # _errp is None in callee mode
            if stream is not None:
                stream.close()

        if message:
            # stderr rather than stdout: in callee mode stdout was the pipe
            # itself and has just been closed.
            sys.stderr.write('%s\n' % (message,))


##-------------------------------------------
# engine.py
import time

from connection import *

# Engine side of the test: runs as the callee.  conn.open() is called
# without a target, so the Pipe talks over this process's stdin/stdout.
# Everything received is logged to enginetest.data because stdout is in
# use as the outgoing data channel.
outfile = open('enginetest.data', 'w')

def print_received(data):
     # Reception handler: called with each received line, on its own thread.
     # NOTE(review): data appears to still carry the newline returned by
     # readline(), so print would add a second one -- confirm.
     print >>outfile, data

conn = Pipe(in_handler=print_received)
conn.open()

# Send five messages, one second apart.
for i in range(5):
     time.sleep(1)
     conn.send('engine '+str(i))

# NOTE(review): the script ends here, right after the last send.  The
# reception and handler threads are daemon threads, so a message arriving
# around this point may be lost or may reach print_received after the file
# is closed -- confirm whether a wait is needed before closing.
outfile.close()


##-------------------------------------------
# controller.py
import time

from connection import *

def print_received(data):
     # Reception handler: called with each line received from the engine,
     # on its own thread.
     print 'controller received:',data

# Controller side of the test: runs as the caller and spawns the engine.
# NOTE(review): the engine listing is labelled engine.py but the target
# here is 'pipetest_engine.py' -- confirm the file name.  The string is
# passed to popen2.popen3 as the command to run.
conn = Pipe(in_handler=print_received)
conn.open('pipetest_engine.py')

# Send '0'..'4', one second apart.
# NOTE(review): the script ends right after the last send; the reception
# thread is a daemon thread, so engine output still in flight at that point
# is presumably dropped -- confirm.
for i in range(5):
     print 'controller',i
     time.sleep(1)
     conn.send(str(i))



More information about the Python-list mailing list