[Python-3000-checkins] r58781 - in python/branches/py3k-pep3137/Lib: subprocess.py test/test_subprocess.py

guido.van.rossum python-3000-checkins at python.org
Fri Nov 2 19:25:48 CET 2007


Author: guido.van.rossum
Date: Fri Nov  2 19:25:47 2007
New Revision: 58781

Modified:
   python/branches/py3k-pep3137/Lib/subprocess.py
   python/branches/py3k-pep3137/Lib/test/test_subprocess.py
Log:
Fix the subprocess tests.
Clean up some odd indentation in test_subprocess.py.


Modified: python/branches/py3k-pep3137/Lib/subprocess.py
==============================================================================
--- python/branches/py3k-pep3137/Lib/subprocess.py	(original)
+++ python/branches/py3k-pep3137/Lib/subprocess.py	Fri Nov  2 19:25:47 2007
@@ -552,10 +552,9 @@
                 self.stderr = io.TextIOWrapper(self.stderr)
 
 
-    def _translate_newlines(self, data):
-        data = data.replace(b"\r\n", b"\n")
-        data = data.replace(b"\r", b"\n")
-        return str(data)
+    def _translate_newlines(self, data, encoding):
+        data = data.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
+        return data.decode(encoding)
 
 
     def __del__(self, sys=sys):
@@ -825,16 +824,6 @@
             if stderr is not None:
                 stderr = stderr[0]
 
-            # Translate newlines, if requested.  We cannot let the file
-            # object do the translation: It is based on stdio, which is
-            # impossible to combine with select (unless forcing no
-            # buffering).
-            if self.universal_newlines:
-                if stdout is not None:
-                    stdout = self._translate_newlines(stdout)
-                if stderr is not None:
-                    stderr = self._translate_newlines(stderr)
-
             self.wait()
             return (stdout, stderr)
 
@@ -960,7 +949,8 @@
                         os.close(p2cread)
                     if c2pwrite is not None and c2pwrite not in (p2cread, 1):
                         os.close(c2pwrite)
-                    if errwrite is not None and errwrite not in (p2cread, c2pwrite, 2):
+                    if (errwrite is not None and
+                        errwrite not in (p2cread, c2pwrite, 2)):
                         os.close(errwrite)
 
                     # Close all other fds, if asked for
@@ -1071,6 +1061,9 @@
             while read_set or write_set:
                 rlist, wlist, xlist = select.select(read_set, write_set, [])
 
+                # XXX Rewrite these to use non-blocking I/O on the
+                # file objects; they are no longer using C stdio!
+
                 if self.stdin in wlist:
                     # When select has indicated that the file is writable,
                     # we can write up to PIPE_BUF bytes without risk
@@ -1098,19 +1091,19 @@
 
             # All data exchanged.  Translate lists into strings.
             if stdout is not None:
-                stdout = b''.join(stdout)
+                stdout = b"".join(stdout)
             if stderr is not None:
-                stderr = b''.join(stderr)
+                stderr = b"".join(stderr)
 
-            # Translate newlines, if requested.  We cannot let the file
-            # object do the translation: It is based on stdio, which is
-            # impossible to combine with select (unless forcing no
-            # buffering).
+            # Translate newlines, if requested.
+            # This also turns bytes into strings.
             if self.universal_newlines:
                 if stdout is not None:
-                    stdout = self._translate_newlines(stdout)
+                    stdout = self._translate_newlines(stdout,
+                                                      self.stdout.encoding)
                 if stderr is not None:
-                    stderr = self._translate_newlines(stderr)
+                    stderr = self._translate_newlines(stderr,
+                                                      self.stderr.encoding)
 
             self.wait()
             return (stdout, stderr)

Modified: python/branches/py3k-pep3137/Lib/test/test_subprocess.py
==============================================================================
--- python/branches/py3k-pep3137/Lib/test/test_subprocess.py	(original)
+++ python/branches/py3k-pep3137/Lib/test/test_subprocess.py	Fri Nov  2 19:25:47 2007
@@ -24,7 +24,7 @@
 # shutdown time.  That frustrates tests trying to check stderr produced
 # from a spawned Python process.
 def remove_stderr_debug_decorations(stderr):
-    return re.sub(r"\[\d+ refs\]\r?\n?$", "", str(stderr))
+    return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
 
 class ProcessTestCase(unittest.TestCase):
     def setUp(self):
@@ -77,9 +77,9 @@
         newenv = os.environ.copy()
         newenv["FRUIT"] = "banana"
         rc = subprocess.call([sys.executable, "-c",
-                          'import sys, os;' \
-                          'sys.exit(os.getenv("FRUIT")=="banana")'],
-                        env=newenv)
+                              'import sys, os;'
+                              'sys.exit(os.getenv("FRUIT")=="banana")'],
+                             env=newenv)
         self.assertEqual(rc, 1)
 
     def test_stdin_none(self):
@@ -180,7 +180,7 @@
                           'import sys; sys.stderr.write("strawberry")'],
                          stderr=subprocess.PIPE)
         self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()),
-                         "strawberry")
+                         b"strawberry")
 
     def test_stderr_filedes(self):
         # stderr is set to open file descriptor
@@ -192,7 +192,7 @@
         p.wait()
         os.lseek(d, 0, 0)
         self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)),
-                         "strawberry")
+                         b"strawberry")
 
     def test_stderr_fileobj(self):
         # stderr is set to open file object
@@ -203,36 +203,36 @@
         p.wait()
         tf.seek(0)
         self.assertEqual(remove_stderr_debug_decorations(tf.read()),
-                         "strawberry")
+                         b"strawberry")
 
     def test_stdout_stderr_pipe(self):
         # capture stdout and stderr to the same pipe
         p = subprocess.Popen([sys.executable, "-c",
-                          'import sys;' \
-                          'sys.stdout.write("apple");' \
-                          'sys.stdout.flush();' \
-                          'sys.stderr.write("orange")'],
-                         stdout=subprocess.PIPE,
-                         stderr=subprocess.STDOUT)
+                              'import sys;'
+                              'sys.stdout.write("apple");'
+                              'sys.stdout.flush();'
+                              'sys.stderr.write("orange")'],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.STDOUT)
         output = p.stdout.read()
         stripped = remove_stderr_debug_decorations(output)
-        self.assertEqual(stripped, "appleorange")
+        self.assertEqual(stripped, b"appleorange")
 
     def test_stdout_stderr_file(self):
         # capture stdout and stderr to the same open file
         tf = tempfile.TemporaryFile()
         p = subprocess.Popen([sys.executable, "-c",
-                          'import sys;' \
-                          'sys.stdout.write("apple");' \
-                          'sys.stdout.flush();' \
-                          'sys.stderr.write("orange")'],
-                         stdout=tf,
-                         stderr=tf)
+                              'import sys;'
+                              'sys.stdout.write("apple");'
+                              'sys.stdout.flush();'
+                              'sys.stderr.write("orange")'],
+                             stdout=tf,
+                             stderr=tf)
         p.wait()
         tf.seek(0)
         output = tf.read()
         stripped = remove_stderr_debug_decorations(output)
-        self.assertEqual(stripped, "appleorange")
+        self.assertEqual(stripped, b"appleorange")
 
     def test_stdout_filedes_of_stdout(self):
         # stdout is set to 1 (#1531862).
@@ -249,10 +249,10 @@
         tmpdir = os.getcwd()
         os.chdir(cwd)
         p = subprocess.Popen([sys.executable, "-c",
-                          'import sys,os;' \
-                          'sys.stdout.write(os.getcwd())'],
-                         stdout=subprocess.PIPE,
-                         cwd=tmpdir)
+                              'import sys,os;'
+                              'sys.stdout.write(os.getcwd())'],
+                             stdout=subprocess.PIPE,
+                             cwd=tmpdir)
         normcase = os.path.normcase
         self.assertEqual(normcase(p.stdout.read().decode("utf-8")),
                          normcase(tmpdir))
@@ -261,15 +261,16 @@
         newenv = os.environ.copy()
         newenv["FRUIT"] = "orange"
         p = subprocess.Popen([sys.executable, "-c",
-                          'import sys,os;' \
-                          'sys.stdout.write(os.getenv("FRUIT"))'],
-                         stdout=subprocess.PIPE,
-                         env=newenv)
+                              'import sys,os;'
+                              'sys.stdout.write(os.getenv("FRUIT"))'],
+                             stdout=subprocess.PIPE,
+                             env=newenv)
         self.assertEqual(p.stdout.read(), b"orange")
 
     def test_communicate_stdin(self):
         p = subprocess.Popen([sys.executable, "-c",
-                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
+                              'import sys;'
+                              'sys.exit(sys.stdin.read() == "pear")'],
                              stdin=subprocess.PIPE)
         p.communicate(b"pear")
         self.assertEqual(p.returncode, 1)
@@ -294,16 +295,16 @@
 
     def test_communicate(self):
         p = subprocess.Popen([sys.executable, "-c",
-                          'import sys,os;' \
-                          'sys.stderr.write("pineapple");' \
-                          'sys.stdout.write(sys.stdin.read())'],
-                         stdin=subprocess.PIPE,
-                         stdout=subprocess.PIPE,
-                         stderr=subprocess.PIPE)
+                              'import sys,os;'
+                              'sys.stderr.write("pineapple");'
+                              'sys.stdout.write(sys.stdin.read())'],
+                             stdin=subprocess.PIPE,
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
         (stdout, stderr) = p.communicate("banana")
         self.assertEqual(stdout, b"banana")
         self.assertEqual(remove_stderr_debug_decorations(stderr),
-                         "pineapple")
+                         b"pineapple")
 
     def test_communicate_returns(self):
         # communicate() should return None if no redirection is active
@@ -325,13 +326,13 @@
         os.close(x)
         os.close(y)
         p = subprocess.Popen([sys.executable, "-c",
-                          'import sys,os;'
-                          'sys.stdout.write(sys.stdin.read(47));' \
-                          'sys.stderr.write("xyz"*%d);' \
-                          'sys.stdout.write(sys.stdin.read())' % pipe_buf],
-                         stdin=subprocess.PIPE,
-                         stdout=subprocess.PIPE,
-                         stderr=subprocess.PIPE)
+                              'import sys,os;'
+                              'sys.stdout.write(sys.stdin.read(47));'
+                              'sys.stderr.write("xyz"*%d);'
+                              'sys.stdout.write(sys.stdin.read())' % pipe_buf],
+                             stdin=subprocess.PIPE,
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
         string_to_write = b"abc"*pipe_buf
         (stdout, stderr) = p.communicate(string_to_write)
         self.assertEqual(stdout, string_to_write)
@@ -339,68 +340,69 @@
     def test_writes_before_communicate(self):
         # stdin.write before communicate()
         p = subprocess.Popen([sys.executable, "-c",
-                          'import sys,os;' \
-                          'sys.stdout.write(sys.stdin.read())'],
-                         stdin=subprocess.PIPE,
-                         stdout=subprocess.PIPE,
-                         stderr=subprocess.PIPE)
+                              'import sys,os;'
+                              'sys.stdout.write(sys.stdin.read())'],
+                             stdin=subprocess.PIPE,
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
         p.stdin.write(b"banana")
         (stdout, stderr) = p.communicate(b"split")
         self.assertEqual(stdout, b"bananasplit")
-        self.assertEqual(remove_stderr_debug_decorations(stderr), "")
+        self.assertEqual(remove_stderr_debug_decorations(stderr), b"")
 
     def test_universal_newlines(self):
         p = subprocess.Popen([sys.executable, "-c",
-                          'import sys,os;' + SETBINARY +
-                          'sys.stdout.write("line1\\n");'
-                          'sys.stdout.flush();'
-                          'sys.stdout.write("line2\\n");'
-                          'sys.stdout.flush();'
-                          'sys.stdout.write("line3\\r\\n");'
-                          'sys.stdout.flush();'
-                          'sys.stdout.write("line4\\r");'
-                          'sys.stdout.flush();'
-                          'sys.stdout.write("\\nline5");'
-                          'sys.stdout.flush();'
-                          'sys.stdout.write("\\nline6");'],
-                         stdout=subprocess.PIPE,
-                         universal_newlines=1)
+                              'import sys,os;' + SETBINARY +
+                              'sys.stdout.write("line1\\n");'
+                              'sys.stdout.flush();'
+                              'sys.stdout.write("line2\\n");'
+                              'sys.stdout.flush();'
+                              'sys.stdout.write("line3\\r\\n");'
+                              'sys.stdout.flush();'
+                              'sys.stdout.write("line4\\r");'
+                              'sys.stdout.flush();'
+                              'sys.stdout.write("\\nline5");'
+                              'sys.stdout.flush();'
+                              'sys.stdout.write("\\nline6");'],
+                             stdout=subprocess.PIPE,
+                             universal_newlines=1)
         stdout = p.stdout.read()
         self.assertEqual(stdout, "line1\nline2\nline3\nline4\nline5\nline6")
 
     def test_universal_newlines_communicate(self):
         # universal newlines through communicate()
         p = subprocess.Popen([sys.executable, "-c",
-                          'import sys,os;' + SETBINARY +
-                          'sys.stdout.write("line1\\n");'
-                          'sys.stdout.flush();'
-                          'sys.stdout.write("line2\\n");'
-                          'sys.stdout.flush();'
-                          'sys.stdout.write("line3\\r\\n");'
-                          'sys.stdout.flush();'
-                          'sys.stdout.write("line4\\r");'
-                          'sys.stdout.flush();'
-                          'sys.stdout.write("\\nline5");'
-                          'sys.stdout.flush();'
-                          'sys.stdout.write("\\nline6");'],
-                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
-                         universal_newlines=1)
+                              'import sys,os;' + SETBINARY +
+                              'sys.stdout.write("line1\\n");'
+                              'sys.stdout.flush();'
+                              'sys.stdout.write("line2\\n");'
+                              'sys.stdout.flush();'
+                              'sys.stdout.write("line3\\r\\n");'
+                              'sys.stdout.flush();'
+                              'sys.stdout.write("line4\\r");'
+                              'sys.stdout.flush();'
+                              'sys.stdout.write("\\nline5");'
+                              'sys.stdout.flush();'
+                              'sys.stdout.write("\\nline6");'],
+                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                             universal_newlines=1)
         (stdout, stderr) = p.communicate()
         self.assertEqual(stdout, "line1\nline2\nline3\nline4\nline5\nline6")
 
     def test_no_leaking(self):
         # Make sure we leak no resources
-        if not hasattr(test_support, "is_resource_enabled") \
-               or test_support.is_resource_enabled("subprocess") and not mswindows:
+        if (not hasattr(test_support, "is_resource_enabled") or
+            test_support.is_resource_enabled("subprocess") and not mswindows):
             max_handles = 1026 # too much for most UNIX systems
         else:
             max_handles = 65
         for i in range(max_handles):
             p = subprocess.Popen([sys.executable, "-c",
-                    "import sys;sys.stdout.write(sys.stdin.read())"],
-                    stdin=subprocess.PIPE,
-                    stdout=subprocess.PIPE,
-                    stderr=subprocess.PIPE)
+                                  "import sys;"
+                                  "sys.stdout.write(sys.stdin.read())"],
+                                 stdin=subprocess.PIPE,
+                                 stdout=subprocess.PIPE,
+                                 stderr=subprocess.PIPE)
             data = p.communicate("lime")[0]
             self.assertEqual(data, b"lime")
 
@@ -516,10 +518,11 @@
         def test_preexec(self):
             # preexec function
             p = subprocess.Popen([sys.executable, "-c",
-                              'import sys,os;' \
-                              'sys.stdout.write(os.getenv("FRUIT"))'],
-                             stdout=subprocess.PIPE,
-                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
+                                  'import sys,os;'
+                                  'sys.stdout.write(os.getenv("FRUIT"))'],
+                                 stdout=subprocess.PIPE,
+                                 preexec_fn=lambda: os.putenv("FRUIT",
+                                                              "apple"))
             self.assertEqual(p.stdout.read(), b"apple")
 
         def test_args_string(self):
@@ -654,4 +657,4 @@
         test_support.reap_children()
 
 if __name__ == "__main__":
-    test_main()
+    unittest.main()  # XXX test_main()


More information about the Python-3000-checkins mailing list