[Python-checkins] CVS: distutils/test test_spawn.py,1.1,1.2

Greg Ward python-dev@python.org
Sun, 4 Jun 2000 06:44:06 -0700


Update of /cvsroot/python/distutils/test
In directory slayer.i.sourceforge.net:/tmp/cvs-serv31650

Modified Files:
	test_spawn.py 
Log Message:
[from 2000/04/23]
Hopeless attempt to make this test script for the 'spawn' module more useful,
reliable, and portable.  I have since abandoned hope of testing 'spawn'
portably -- perhaps I'll write a Unix version, and just hope that someone
out there writes a Windows version.  ;-(

Index: test_spawn.py
===================================================================
RCS file: /cvsroot/python/distutils/test/test_spawn.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -r1.1 -r1.2
*** test_spawn.py	1999/09/29 13:10:23	1.1
--- test_spawn.py	2000/06/04 13:44:03	1.2
***************
*** 2,16 ****
  # test_spawn.py
  #
! # test suite for distutils.util.spawn() (currently Unix-only)
  #
  # GPW 1999/07/20
  #
! # $Id: test_spawn.py,v 1.1 1999/09/29 13:10:23 gward Exp $
  #
  
! import sys
  from distutils.spawn import spawn
  from distutils.errors import *
  from util import try_it
  
  spawn (["/bin/ls"])
--- 2,222 ----
  # test_spawn.py
  #
! # test script for distutils.spawn module
  #
  # GPW 1999/07/20
  #
! # $Id: test_spawn.py,v 1.2 2000/06/04 13:44:03 gward Exp $
  #
  
! import sys, os, string
! from tempfile import mktemp
  from distutils.spawn import spawn
+ from distutils.util import abspath
  from distutils.errors import *
+ from unittest import TestScenario, parse_args, run_scenarios
  from util import try_it
+ 
+ class SpawnTest (TestScenario):
+ 
+     def setup (self):
+         self.python = os.path.normpath (abspath (sys.executable))
+ 
+         self.cmd_trivial = self.gen_cmd ('1')
+         self.cmd_trivial_s = string.join (self.cmd_trivial)
+         self.cmd_print = self.gen_cmd ('print "hello"')
+         self.cmd_print_s = string.join (self.cmd_print)
+         self.cmd_err = self.gen_cmd ('import sys; sys.stderr.write("foo\\n")')
+         self.cmd_err_s = string.join (self.cmd_err)
+         self.cmd_mixed = self.gen_cmd ("import sys; "
+                                        "sys.stdout.write('out'); "
+                                        "sys.stderr.write('err'); "
+                                        "sys.stdout.write('put'); "
+                                        "sys.stderr.write('or')")
+ 
+ 
+     def shutdown (self):
+         pass
+ 
+ 
+     def capture_output (self):
+         """Temporarily redirect stdout and stderr to files, the
+         contents of which will be returned by 'stop_capture()'."""
+         self.out_filename = mktemp()
+         self.err_filename = mktemp()
+         self.out_file = open(self.out_filename, 'w')
+         self.err_file = open(self.err_filename, 'w')
+         self.save_stdout = os.dup(1)
+         self.save_stderr = os.dup(2)
+         os.close(1)
+         if os.dup(self.out_file.fileno()) != 1:
+             raise RuntimeError, "couldn't redirect stdout - dup() error"
+         os.close(2)
+         if os.dup(self.err_file.fileno()) != 2:
+             raise RuntimeError, "couldn't redirect stderr - dup() error"
+ 
+     def stop_capture (self):
+         os.close(1)
+         os.dup(self.save_stdout)
+         os.close(2)
+         os.dup(self.save_stderr)
+ 
+         self.out_file.close()
+         self.err_file.close()
+ 
+         out_file = open(self.out_filename)
+         output = out_file.read()
+         out_file.close()
+         os.unlink (self.out_filename)
+ 
+         err_file = open(self.err_filename)
+         error = err_file.read()
+         err_file.close()
+         os.unlink (self.err_filename)
+ 
+         return (output, error)
+ 
+ 
+     #def capture_test_1 (self, test_method, args):
+     #    """Run a 
+ 
+ 
+     def test_out (self, code, output, error):
+         # XXX I have no idea how to do this in a portable way!
+ 
+         from unittest import get_caller_env
+         (globals, locals) = get_caller_env ()
+ 
+         if os.name == 'posix':
+             p2cread, p2cwrite = os.pipe()
+             c2pread, c2pwrite = os.pipe()
+             err_read, err_write = os.pipe()
+ 
+             pid = os.fork()
+             if pid == 0:                # in the child
+                 os.close(0)             # close stdin
+                 os.close(1)             # and stdout
+                 os.close(2)             # and stderr
+ 
+                 if os.dup(p2cread) != 0:
+                     raise os.error, "dup stdin (read) != 0"
+                 if os.dup(c2pwrite) != 1:
+                     raise os.error, "dup stdout (write) != 1"
+                 if os.dup(err_write) != 2:
+                     raise os.error, "dup stderr (write) != 2"
+ 
+                 eval (code, globals, locals)
+                 os._exit (0)
+ 
+             # in the parent
+             os.close(p2cread)
+             child_stdin = os.fdopen (p2cwrite, 'w')
+             os.close(c2pwrite)
+             child_stdout = os.fdopen (c2pread, 'r')
+             os.close(err_write)
+             child_stderr = os.fdopen (err_read, 'r')
+ 
+             child_out = child_stdout.read()
+             child_err = child_stderr.read()
+ 
+             child_stdin.close()
+             child_stdout.close()
+             child_stderr.close()
+             os.waitpid (pid, 0)
+ 
+             self.tests_run = self.tests_run + 1
+ 
+             if output == child_out and error == child_err:
+                 self.report_pass (
+                     code + "\n" +
+                     "    wrote %s to stdout\n" % `child_out` +
+                     "    and %s to stderr" % `child_err`)
+             elif output != child_out and error == child_err:
+                 self.report_fail (
+                     code + "\n" +
+                     "        did not write %s to stdout (wrote %s)\n" %
+                       (`output`, `child_out`) +
+                     "        but wrote %s to stderr" % child_err)
+             elif output == child_out and error != child_err:
+                 self.report_fail (
+                     code + "\n" +
+                     "        wrote %s to stdout" % `child_out` +
+                     "        but did not write %s to stderr (wrote %s)" %
+                       (`error`, `child_err`))
+             else:
+                 self.report_fail (
+                     code + "\n" +
+                     "        did not write %s to stdout (wrote %s)\n" %
+                       (`output`, `child_out`) +
+                     "        and did not write %s to stderr (wrote %s)" %
+                       (`error`, `child_err`))
+ 
+         else:
+             raise RuntimeError, \
+                   "don't know how to capture stdout/stderr on platform '%s'"%\
+                   os.name
+ 
+     # test_out ()
+         
+ 
+ 
+     def gen_cmd (self, script):
+         """Generate a Python command line to run 'script' as a list
+         for use by 'spawn()'.  Eg. if 'script' is "print 'hello'",
+         returns the list ["python", "-c", "print 'hello'"] -- except
+         the full path to the executable is used."""
+         return [self.python, "-c", str(script)]
+ 
+ 
+     test_cases = ['simple', 'output', 'verbose', 'dryrun']
+ 
+ 
+     def check_simple (self):
+         "Simple spawns (success and failure): 6"
+ 
+         self.test_stmt ("spawn (%s)" % self.cmd_mixed)
+         self.test_stmt ("spawn (%s)" % self.gen_cmd("import sys;sys.exit(0)"))
+         self.test_exc ("spawn (%s)" % self.gen_cmd("import sys;sys.exit(42)"),
+                        DistutilsExecError)
+ 
+         self.capture_output()
+         self.test_exc ("spawn (['vjwe9ghwe09fnkwef098vdsjn3209'])",
+                        DistutilsExecError)
+         (output, error) = self.stop_capture()
+         exp_error = "unable to execute vjwe9ghwe09fnkwef098vdsjn3209:"
+         self.test_val ("output", "")
+         self.test_val ("error[0:%d]" % len(exp_error), exp_error)
+ 
+     def check_output (self):
+         "Spawn commands with output and error output: 3"
+ 
+         self.test_out ("spawn (%s)" % self.cmd_print,
+                        "hello\n", "")
+         self.test_out ("spawn (%s)" % self.cmd_err,
+                        "", "foo\n")
+         self.test_out ("spawn (%s)" % self.cmd_mixed,
+                        "output", "error")
+ 
+     def check_verbose (self):
+         "Verbose reporting of command executed: 2"
+ 
+         self.test_out ("spawn (%s, verbose=1)" % self.cmd_trivial,
+                        self.cmd_trivial_s + "\n", "")
+         self.test_out ("spawn (%s, verbose=1)" % self.cmd_print,
+                        self.cmd_print_s + "\n" + "hello\n",
+                        "")
+ 
+     def check_dryrun (self):
+         "Spawn commands in dry-run mode (ie. don't actually do anything): "
+         self.test_out ("spawn (%s, dry_run=1)" % self.cmd_print,
+                        "", "")
+         self.test_out ("spawn (%s, dry_run=1, verbose=1)" % self.cmd_print,
+                        "%s -c %s\n" % (self.python, self.cmd_print_s),
+                        "")
+         
+         
+ if __name__ == "__main__":
+     (scenarios, options) = parse_args()
+     run_scenarios (scenarios, options)
+     sys.exit()
  
  spawn (["/bin/ls"])