Bidirectional communication through pipes: read/write popen()

Donn Cave donn at u.washington.edu
Mon Oct 18 15:00:00 EDT 1999


Quoth Hrvoje Niksic <hniksic at srce.hr>:
| In all kinds of circumstances it would be very useful to call an
| external filter to process some data, and read the results back in.
| What I needed was something like popen(), only working for both
| reading and writing.  However, such a thing is hard to write in a
| simple-minded fashion because of deadlocks that occur when handling
| more than several bytes of data.  Deadlocks can either be caused by
| both programs waiting for not-yet-generated input, or (in my case) by
| both their writes being blocked waiting for the other to read.
|
| The usual choices are to:
|
| a) Write a deadlock-free communication protocol and use it on both
|    ends.  This is rarely a good solution, because the program that
|    needs to be invoked is in most cases an external filter that knows
|    nothing about our deadlock problems.
|
| b) Use PTY's instead of pipes.  Many programmers prefer to avoid this
|    path because of the added system resources that the PTY's require,
|    and because of the increased complexity.
|
| Given these choices, most people opt to use a temporary file and get
| it over with.
|
| However, discussing this problem with a colleague, he thought of a
| third solution: break the circularity by using a process only for
| reading and writing.  This can be done whenever reading and writing
| are independent, i.e. when the data read from the subprocess does not
| influence future writes to it.
|
| The function below implements that idea.  Usage is something like:
|
| rwpopen("""Some long string here...""", "sed", ["s/long/short/"])
|   -> 'Some short string here...'
|
| I've put the function to good use in a program I'm writing.  In
| addition to getting rid of temporary files, the whole operation timed
| faster than using a tmpfile (that was on a single-CPU machine).  The
| function will, of course, work only under Unix and its lookalikes.
| Additional information is embedded in the docstring and the comments.
|
| I'd like to hear feedback.  Do other people find such a thing useful?
| Is there a fundamental flaw or a possibility of a deadlock that I'm
| missing?
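Hrvoje's function isn't reproduced in this reply. The following is a rough illustration of the quoted idea in modern Python 3. It assumes a hypothetical `rwpopen(data, command, args)` signature modeled on the usage line above. One forked process does nothing but write to the command's standard input, and the caller does nothing but read its standard output, so neither end can stall the other on a full pipe. This is a sketch, not the original code.

```python
import os
import signal


def rwpopen(data, command, args=()):
    """Hypothetical sketch: pipe the str `data` through `command args...`.

    A separate writer process feeds the command's stdin while the caller
    only reads its stdout, so the two directions never wait on each other.
    """
    to_cmd_r, to_cmd_w = os.pipe()
    from_cmd_r, from_cmd_w = os.pipe()

    cmd_pid = os.fork()
    if cmd_pid == 0:
        # Command process: stdin <- to_cmd pipe, stdout -> from_cmd pipe.
        try:
            os.dup2(to_cmd_r, 0)
            os.dup2(from_cmd_w, 1)
            for fd in (to_cmd_r, to_cmd_w, from_cmd_r, from_cmd_w):
                os.close(fd)
            # Python ignores SIGPIPE; give the command the default action.
            signal.signal(signal.SIGPIPE, signal.SIG_DFL)
            os.execvp(command, [command] + list(args))
        finally:
            os._exit(127)  # only reached if exec failed

    writer_pid = os.fork()
    if writer_pid == 0:
        # Writer process: only writes, never reads.
        status = 1
        try:
            for fd in (to_cmd_r, from_cmd_r, from_cmd_w):
                os.close(fd)
            payload = data.encode()
            while payload:
                written = os.write(to_cmd_w, payload)
                payload = payload[written:]
            os.close(to_cmd_w)  # the command sees EOF on its stdin
            status = 0
        finally:
            os._exit(status)  # never fall back into the caller's code

    # Caller: close every pipe end it doesn't use, then only read.
    for fd in (to_cmd_r, to_cmd_w, from_cmd_w):
        os.close(fd)
    chunks = []
    while True:
        chunk = os.read(from_cmd_r, 8192)
        if not chunk:
            break
        chunks.append(chunk)
    os.close(from_cmd_r)
    os.waitpid(writer_pid, 0)
    os.waitpid(cmd_pid, 0)
    return b"".join(chunks).decode()
```

Forking a writer rather than a reader is an arbitrary choice here. Either arrangement works as long as each process handles only one direction. As the quoted message says, this only works when the data read back doesn't influence what gets written.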

Interesting idea.  I was inspired to try a slightly different
approach, which I will append here.

It's definitely a solution, possibly the only general one, for
deadlocks caused by the pipe buffer size.  That's an interesting
problem, but I think a relatively unusual one.  To get into it,
your processes have to be ignoring their input so that it stalls
in the pipe ... for example, the parent might wait() for the child
and only then read its output, while the child is stuck trying to
finish writing its large output.  But I am having a hard time
thinking of an example where it isn't easily avoided.  I'm also
surprised that the intermediate process would be more economical
than a temporary file, so I wonder whether all the resources were
accounted for.  Temporary files do have the liability that their
filesystem may run out of space, but on the whole they seem a much
safer way to buffer large transfers.
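For comparison, here is a sketch of the temporary-file approach. It assumes Python 3's `subprocess` and `tempfile` modules and uses a hypothetical helper name, `filter_via_tempfile`. The input is staged on disk, so the only pipe left is the command's output. The caller reads that pipe to EOF before waiting for the child, which avoids the wait-then-read deadlock described above.

```python
import subprocess
import tempfile


def filter_via_tempfile(data, argv):
    """Hypothetical helper: run argv with `data` (str) on stdin via a temp file."""
    with tempfile.TemporaryFile() as tmp:
        tmp.write(data.encode())
        tmp.flush()
        tmp.seek(0)  # the child shares this descriptor, starting at offset 0
        # Only stdout is a pipe; run() reads it to EOF and then waits.
        result = subprocess.run(argv, stdin=tmp, stdout=subprocess.PIPE, check=True)
    return result.stdout.decode()
```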

By far the most common intractable deadlock problem is internal
buffering in a command that uses C stdio and hasn't flushed its
own buffer.  This is where the pty device comes in, and to my
knowledge it's the only general cure.  It works because C stdio
switches stdout to line buffering when it is a tty device.  But
again, this problem is easily avoided when all the input for the
command can be written before you wait for its output: just close
the pipe when you're finished writing to it!  The problem really
arises when you need to conduct an exchange that alternates reads
and writes, for example writing a line to "awk", reading awk's
output, and then writing another line to the same awk process.
To do this, you need a pty device.
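The pty approach can be sketched as follows. This assumes Python 3 on Linux or a similar UNIX with `os.openpty()` and `termios`, and uses GNU sed in place of awk, since sed also writes through C stdio and block-buffers its output on a plain pipe. The helper names `spawn_on_pty` and `read_line` are hypothetical. Echo is turned off on the pty so that reads return only the command's output. Because the command's stdout is now a tty, each reply arrives as soon as its line is processed, so the caller can alternate writes and reads.

```python
import os
import select
import subprocess
import termios


def spawn_on_pty(argv):
    """Start argv with stdin and stdout on a new pty; return (proc, master_fd)."""
    master, slave = os.openpty()
    # Disable echo so the master side sees only the command's output.
    attrs = termios.tcgetattr(slave)
    attrs[3] &= ~termios.ECHO  # index 3 is the local-modes (lflag) word
    termios.tcsetattr(slave, termios.TCSANOW, attrs)
    proc = subprocess.Popen(argv, stdin=slave, stdout=slave)
    os.close(slave)  # the child has its own copies
    return proc, master


def read_line(fd, timeout=5.0):
    """Read from fd until a newline arrives; raise TimeoutError if it stalls."""
    data = b""
    while not data.endswith(b"\n"):
        ready, _, _ = select.select([fd], [], [], timeout)
        if not ready:
            raise TimeoutError("command produced no complete line")
        chunk = os.read(fd, 1024)
        if not chunk:
            break
        data += chunk
    # The tty's output processing (ONLCR) turns "\n" into "\r\n".
    return data.replace(b"\r\n", b"\n").decode()
```

A session then alternates `os.write(master, b"a long line\n")` with `read_line(master)`. To finish, write `b"\x04"` (the tty end-of-file character, Ctrl-D, at the start of a line) and call `proc.wait()`.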

Anyway, here's my attempt at the third-process solution.  Both the
command and the buffering process are children of the calling
process.  The buffering process copies I/O in both directions, so
the caller can read from and write to the command at its
convenience.  BigRWPipe is a subclass of RWPipe, a plain
single-stage read/write command.  The buffering process avoids
blocking on reads or writes by using the select system call, which
is specific to UNIX.
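For the one-shot case, where all the input is known before any output is needed, current Python can do this buffering inside the calling process. On UNIX, subprocess's `communicate()` multiplexes the child's stdin and stdout with select-style polling, much like the `buffer()` loop below, but without a third process. Here is a minimal sketch, assuming Python 3.5+ for `subprocess.run` and using a hypothetical helper name, `filter_text`.

```python
import subprocess


def filter_text(data, argv):
    """Hypothetical helper: feed str `data` to argv and return its stdout."""
    # run() hands `input` to communicate(), which writes stdin and reads
    # stdout concurrently, so neither pipe can fill up and stall.
    result = subprocess.run(
        argv, input=data.encode(), stdout=subprocess.PIPE, check=True
    )
    return result.stdout.decode()
```

Unlike BigRWPipe, this can't interleave reads and writes at the caller's convenience. All the input is sent before `communicate()` returns the output.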

#	Donn Cave, University Computing Services, University of Washington
#	donn at u.washington.edu
#----------------------------
import os
import select
import signal
import sys
import traceback

#  External command, with plain read and write dual pipe.
#
#  ex.  cmd = RWPipe('/bin/sh', ('sh', '-c', 'nslookup'))
#       os.write(cmd.input, b'set q=any\n')
#       os.write(cmd.input, b'nosuchhost\n')
#       os.close(cmd.input)
#       while True:
#           x = os.read(cmd.output, 8192)
#           if not x:
#               break
#           print('output:', x)
#       status = cmd.wait()
#
#  I/O is unbuffered UNIX read/write on bytes; the caller may wrap the
#  descriptors in file objects with os.fdopen().
#
class RWPipe:
	def __init__(self, command, argv, environ=None):
		self.command = command
		self.argv = argv
		if environ is None:
			self.environ = os.environ
		else:
			self.environ = environ
		self.start()
	def pipexec(self, pipes):
		#  Connect pipe ends to the command's standard units.
		for unit, use in pipes:
			os.dup2(use, unit)
		#  Python ignores SIGPIPE, and an ignored signal stays ignored
		#  across exec; restore the default so the command dies of
		#  SIGPIPE on a broken pipe, as it would if run from a shell.
		signal.signal(signal.SIGPIPE, signal.SIG_DFL)
		os.execve(self.command, self.argv, self.environ)
	def setpipes(self, rp, wp, xp):
		#  Close unused pipe ends.
		for p in rp:
			#  Using read end here.
			os.close(p[1])
		for p in wp:
			#  Using write end here.
			os.close(p[0])
		for p in xp:
			#  Not using this pipe here.
			os.close(p[0])
			os.close(p[1])
	def start(self):
		tocmd = os.pipe()
		frcmd = os.pipe()
		pid = os.fork()
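		if not pid:
			#  Child: wire up the pipes and exec the command.  execve()
			#  only returns on failure; report it and exit.
			try:
				self.setpipes([tocmd], [frcmd], [])
				self.pipexec([(0, tocmd[0]), (1, frcmd[1])])
			except:
				traceback.print_exc()
				sys.stderr.flush()
			os._exit(127)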
		self.pid = pid
		self.setpipes([frcmd], [tocmd], [])
		self.input = tocmd[1]
		self.output = frcmd[0]
	def wait(self):
		p, s = os.waitpid(self.pid, 0)
		#  The exit status is bits 8-15 of the wait status.
		return (s >> 8) & 0xff

#  Industrial strength external command, with an intermediate process
#  that copies I/O, buffering as necessary to avoid deadlock due to
#  system pipe buffer size limit.
#
class BigRWPipe(RWPipe):
	def buffer(self, xferunits):
		#  Transfer I/O between pipes:  self.buffer([(from, to), ...])
		#
		xfers = []
		for r, w in xferunits:
			xfers.append((r, w, b''))
		while xfers:
			wsel = []
			rsel = []
			esel = []
			nxf = []
			for r, w, buf in xfers:
				#  Compile select masks for active units.
				if w >= 0:
					if buf:
						#  Only check for write if any
						#  data buffered to write.
						wsel.append(w)
				elif r >= 0:
					#  If dest invalid, close source.  The source
					#  process then gets SIGPIPE on its next write
					#  (EPIPE instead, if it ignores SIGPIPE as
					#  Python does).
					os.close(r)
					r = -1
				if r >= 0:
					rsel.append(r)
					esel.append(r)
				elif w >= 0 and not buf:
					#  If source invalid and no data,
					#  close dest.  Will usually cause
					#  dest to finish normally.
					os.close(w)
					w = -1
				if w >= 0:
					esel.append(w)
				if w >= 0 or r >= 0:
					nxf.append((r, w, buf))
			xfers = nxf
			if not xfers:
				break

			rdset, wdset, edset = select.select(rsel, wsel, esel)

			nxf = []
			for r, w, buf in xfers:
				if r in rdset:
					b = os.read(r, 8192)
					if b:
						buf = buf + b
					else:
						os.close(r)
						r = -1
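				if r in edset:
					os.close(r)
					r = -1
				if w in wdset:
					try:
						n = os.write(w, buf)
						buf = buf[n:]
					except OSError:
						#  Write failed, typically EPIPE because
						#  the reader has gone away; drop the data.
						os.close(w)
						w = -1
						buf = b''
				if w in edset:
					os.close(w)
					w = -1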
				if r >= 0 or w >= 0:
					nxf.append((r, w, buf))
			xfers = nxf
	def start(self):
		frcmd = os.pipe()
		tocmd = os.pipe()
		frmed = os.pipe()
		tomed = os.pipe()

		pid = os.fork()
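		if not pid:
			#  Set up the buffer process.  Leave with os._exit() so the
			#  forked copy doesn't run the parent's exit handlers or
			#  flush its inherited stdio buffers a second time.
			try:
				self.setpipes([frcmd, tomed], [tocmd, frmed], [])
				self.buffer([(frcmd[0], frmed[1]),
					(tomed[0], tocmd[1])])
			except:
				traceback.print_exc()
				sys.stderr.flush()
				os._exit(1)
			os._exit(0)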
		self.med = pid

		pid = os.fork()
		if not pid:
			#  Command process: execve() only returns on failure.
			try:
				self.setpipes([tocmd], [frcmd], [tomed, frmed])
				self.pipexec([(0, tocmd[0]), (1, frcmd[1])])
			except:
				traceback.print_exc()
				sys.stderr.flush()
			os._exit(127)
		self.pid = pid
		self.setpipes([frmed], [tomed], [frcmd, tocmd])
		self.output = frmed[0]
		self.input = tomed[1]
	def wait(self):
		p, s = os.waitpid(self.med, 0)
		p, s = os.waitpid(self.pid, 0)
		#  The exit status is bits 8-15 of the wait status.
		return (s >> 8) & 0xff




More information about the Python-list mailing list