[py-svn] r36379 - in py/dist/py: . compat compat/testing

cfbolz at codespeak.net cfbolz at codespeak.net
Tue Jan 9 17:55:55 CET 2007


Author: cfbolz
Date: Tue Jan  9 17:55:49 2007
New Revision: 36379

Added:
   py/dist/py/compat/testing/test_subprocess.py
Modified:
   py/dist/py/__init__.py
   py/dist/py/compat/doctest.py
   py/dist/py/compat/optparse.py
   py/dist/py/compat/subprocess.py
   py/dist/py/compat/testing/test_doctest.py
   py/dist/py/compat/testing/test_doctest2.py
   py/dist/py/compat/testing/test_optparse.py
   py/dist/py/compat/testing/test_textwrap.py
   py/dist/py/compat/textwrap.py
Log:
import the newest versions of the modules in compat from python 2.4.4


Modified: py/dist/py/__init__.py
==============================================================================
--- py/dist/py/__init__.py	(original)
+++ py/dist/py/__init__.py	Tue Jan  9 17:55:49 2007
@@ -130,7 +130,7 @@
     'log.Syslog'             : ('./log/consumer.py', 'Syslog'),
     'log.get'                : ('./log/logger.py', 'get'), 
 
-    # compatibility modules (taken from 2.4.1) 
+    # compatibility modules (taken from 2.4.4) 
     'compat.doctest'         : ('./compat/doctest.py', '*'),
     'compat.optparse'        : ('./compat/optparse.py', '*'),
     'compat.textwrap'        : ('./compat/textwrap.py', '*'),

Modified: py/dist/py/compat/doctest.py
==============================================================================
--- py/dist/py/compat/doctest.py	(original)
+++ py/dist/py/compat/doctest.py	Tue Jan  9 17:55:49 2007
@@ -128,9 +128,8 @@
 
 OPTIONFLAGS_BY_NAME = {}
 def register_optionflag(name):
-    flag = 1 << len(OPTIONFLAGS_BY_NAME)
-    OPTIONFLAGS_BY_NAME[name] = flag
-    return flag
+    # Create a new flag unless `name` is already known.
+    return OPTIONFLAGS_BY_NAME.setdefault(name, 1 << len(OPTIONFLAGS_BY_NAME))
 
 DONT_ACCEPT_TRUE_FOR_1 = register_optionflag('DONT_ACCEPT_TRUE_FOR_1')
 DONT_ACCEPT_BLANKLINE = register_optionflag('DONT_ACCEPT_BLANKLINE')
@@ -849,6 +848,11 @@
         # Recursively expore `obj`, extracting DocTests.
         tests = []
         self._find(tests, obj, name, module, source_lines, globs, {})
+        # Sort the tests by alpha order of names, for consistency in
+        # verbose-mode output.  This was a feature of doctest in Pythons
+        # <= 2.3 that got lost by accident in 2.4.  It was repaired in
+        # 2.4.4 and 2.5.
+        tests.sort()
         return tests
 
     def _filter(self, obj, prefix, base):
@@ -1045,12 +1049,13 @@
 
         >>> tests = DocTestFinder().find(_TestClass)
         >>> runner = DocTestRunner(verbose=False)
+        >>> tests.sort(key = lambda test: test.name)
         >>> for test in tests:
-        ...     print runner.run(test)
-        (0, 2)
-        (0, 1)
-        (0, 2)
-        (0, 2)
+        ...     print test.name, '->', runner.run(test)
+        _TestClass -> (0, 2)
+        _TestClass.__init__ -> (0, 2)
+        _TestClass.get -> (0, 2)
+        _TestClass.square -> (0, 1)
 
     The `summarize` method prints a summary of all the test cases that
     have been run by the runner, and returns an aggregated `(f, t)`
@@ -2472,6 +2477,7 @@
           blah
        #
        #     Ho hum
+       <BLANKLINE>
        """
     output = []
     for piece in DocTestParser().parse(s):
@@ -2494,7 +2500,8 @@
     while output and output[0] == '#':
         output.pop(0)
     # Combine the output, and return it.
-    return '\n'.join(output)
+    # Add a courtesy newline to prevent exec from choking (see bug #1172785)
+    return '\n'.join(output) + '\n'
 
 def testsource(module, name):
     """Extract the test sources from a doctest docstring as a script.

Modified: py/dist/py/compat/optparse.py
==============================================================================
--- py/dist/py/compat/optparse.py	(original)
+++ py/dist/py/compat/optparse.py	Tue Jan  9 17:55:49 2007
@@ -68,12 +68,9 @@
 
 import sys, os
 import types
+import textwrap
 from gettext import gettext as _
 
-import py
-textwrap = py.compat.textwrap
-
-
 def _repr(self):
     return "<%s at 0x%x: %s>" % (self.__class__.__name__, id(self), self)
 
@@ -550,8 +547,10 @@
                 else:
                     setattr(self, attr, None)
         if attrs:
+            attrs = attrs.keys()
+            attrs.sort()
             raise OptionError(
-                "invalid keyword arguments: %s" % ", ".join(attrs.keys()),
+                "invalid keyword arguments: %s" % ", ".join(attrs),
                 self)
 
 
@@ -1559,6 +1558,7 @@
             raise BadOptionError(_("no such option: %s") % s)
         else:
             # More than one possible completion: ambiguous prefix.
+            possibilities.sort()
             raise BadOptionError(_("ambiguous option: %s (%s?)")
                                  % (s, ", ".join(possibilities)))
 

Modified: py/dist/py/compat/subprocess.py
==============================================================================
--- py/dist/py/compat/subprocess.py	(original)
+++ py/dist/py/compat/subprocess.py	Tue Jan  9 17:55:49 2007
@@ -1,32 +1,13 @@
-# Copied from CPython 2.4 with one modification to dynamically
-# fall back to pywin32 code when _subprocess is not available.
-
 # subprocess - Subprocesses with accessible I/O streams
 #
 # For more information about this module, see PEP 324.
 #
-# Copyright (c) 2003-2004 by Peter Astrand <astrand at lysator.liu.se>
-#
-# By obtaining, using, and/or copying this software and/or its
-# associated documentation, you agree that you have read, understood,
-# and will comply with the following terms and conditions:
+# This module should remain compatible with Python 2.2, see PEP 291.
 #
-# Permission to use, copy, modify, and distribute this software and
-# its associated documentation for any purpose and without fee is
-# hereby granted, provided that the above copyright notice appears in
-# all copies, and that both that copyright notice and this permission
-# notice appear in supporting documentation, and that the name of the
-# author not be used in advertising or publicity pertaining to
-# distribution of the software without specific, written prior
-# permission.
+# Copyright (c) 2003-2005 by Peter Astrand <astrand at lysator.liu.se>
 #
-# THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
-# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS.
-# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, INDIRECT OR
-# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
-# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
-# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
-# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+# Licensed to PSF under a Contributor Agreement.
+# See http://www.python.org/2.4/license for licensing details.
 
 r"""subprocess - Subprocesses with accessible I/O streams
 
@@ -369,16 +350,7 @@
 if mswindows:
     import threading
     import msvcrt
-    try:
-        from _subprocess import *
-        class STARTUPINFO:
-            dwFlags = 0
-            hStdInput = None
-            hStdOutput = None
-            hStdError = None
-        class pywintypes:
-            error = IOError
-    except ImportError:
+    if 0: # <-- change this to use pywin32 instead of the _subprocess driver
         import pywintypes
         from win32api import GetStdHandle, STD_INPUT_HANDLE, \
                              STD_OUTPUT_HANDLE, STD_ERROR_HANDLE
@@ -390,6 +362,16 @@
                                  GetExitCodeProcess, STARTF_USESTDHANDLES, \
                                  STARTF_USESHOWWINDOW, CREATE_NEW_CONSOLE
         from win32event import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
+    else:
+        from _subprocess import *
+        class STARTUPINFO:
+            dwFlags = 0
+            hStdInput = None
+            hStdOutput = None
+            hStdError = None
+            wShowWindow = 0
+        class pywintypes:
+            error = IOError
 else:
     import select
     import errno
@@ -681,18 +663,17 @@
                 args = list2cmdline(args)
 
             # Process startup details
-            default_startupinfo = STARTUPINFO()
             if startupinfo == None:
-                startupinfo = default_startupinfo
-            if not None in (p2cread, c2pwrite, errwrite):
+                startupinfo = STARTUPINFO()
+            if None not in (p2cread, c2pwrite, errwrite):
                 startupinfo.dwFlags |= STARTF_USESTDHANDLES
                 startupinfo.hStdInput = p2cread
                 startupinfo.hStdOutput = c2pwrite
                 startupinfo.hStdError = errwrite
 
             if shell:
-                default_startupinfo.dwFlags |= STARTF_USESHOWWINDOW
-                default_startupinfo.wShowWindow = SW_HIDE
+                startupinfo.dwFlags |= STARTF_USESHOWWINDOW
+                startupinfo.wShowWindow = SW_HIDE
                 comspec = os.environ.get("COMSPEC", "cmd.exe")
                 args = comspec + " /c " + args
                 if (GetVersion() >= 0x80000000L or
@@ -886,7 +867,7 @@
 
 
         def _close_fds(self, but):
-            for i in range(3, MAXFD):
+            for i in xrange(3, MAXFD):
                 if i == but:
                     continue
                 try:

Modified: py/dist/py/compat/testing/test_doctest.py
==============================================================================
--- py/dist/py/compat/testing/test_doctest.py	(original)
+++ py/dist/py/compat/testing/test_doctest.py	Tue Jan  9 17:55:49 2007
@@ -2,14 +2,15 @@
 Test script for doctest.
 """
 
+from test import test_support
+import warnings
+
 import py
 doctest = py.compat.doctest
 
 import sys
 sys.modules['doctest'] = py.compat.doctest
 
-from test import test_support
-import warnings
 
 ######################################################################
 ## Sample Objects (used by test cases)
@@ -424,7 +425,6 @@
 
     >>> finder = doctest.DocTestFinder()
     >>> tests = finder.find(SampleClass)
-    >>> tests.sort()
     >>> for t in tests:
     ...     print '%2s  %s' % (len(t.examples), t.name)
      3  SampleClass
@@ -440,7 +440,6 @@
 New-style classes are also supported:
 
     >>> tests = finder.find(SampleNewStyleClass)
-    >>> tests.sort()
     >>> for t in tests:
     ...     print '%2s  %s' % (len(t.examples), t.name)
      1  SampleNewStyleClass
@@ -480,7 +479,6 @@
     >>> # ignoring the objects since they weren't defined in m.
     >>> import test.test_doctest
     >>> tests = finder.find(m, module=test.test_doctest)
-    >>> tests.sort()
     >>> for t in tests:
     ...     print '%2s  %s' % (len(t.examples), t.name)
      1  some_module
@@ -504,7 +502,6 @@
 
     >>> from test import doctest_aliases
     >>> tests = excl_empty_finder.find(doctest_aliases)
-    >>> tests.sort()
     >>> print len(tests)
     2
     >>> print tests[0].name
@@ -526,7 +523,6 @@
     >>> def namefilter(prefix, base):
     ...     return base.startswith('a_')
     >>> tests = doctest.DocTestFinder(_namefilter=namefilter).find(SampleClass)
-    >>> tests.sort()
     >>> for t in tests:
     ...     print '%2s  %s' % (len(t.examples), t.name)
      3  SampleClass
@@ -543,7 +539,6 @@
 
     >>> tests = doctest.DocTestFinder(_namefilter=namefilter,
     ...                                exclude_empty=False).find(SampleClass)
-    >>> tests.sort()
     >>> for t in tests:
     ...     print '%2s  %s' % (len(t.examples), t.name)
      3  SampleClass
@@ -587,7 +582,6 @@
 using the `recurse` flag:
 
     >>> tests = doctest.DocTestFinder(recurse=False).find(SampleClass)
-    >>> tests.sort()
     >>> for t in tests:
     ...     print '%2s  %s' % (len(t.examples), t.name)
      3  SampleClass
@@ -1286,6 +1280,26 @@
         ValueError: 2
     (3, 5)
 
+New option flags can also be registered, via register_optionflag().  Here
+we reach into doctest's internals a bit.
+
+    >>> unlikely = "UNLIKELY_OPTION_NAME"
+    >>> unlikely in doctest.OPTIONFLAGS_BY_NAME
+    False
+    >>> new_flag_value = doctest.register_optionflag(unlikely)
+    >>> unlikely in doctest.OPTIONFLAGS_BY_NAME
+    True
+
+Before 2.4.4/2.5, registering a name more than once erroneously created
+more than one flag value.  Here we verify that's fixed:
+
+    >>> redundant_flag_value = doctest.register_optionflag(unlikely)
+    >>> redundant_flag_value == new_flag_value
+    True
+
+Clean up.
+    >>> del doctest.OPTIONFLAGS_BY_NAME[unlikely]
+
     """
 
     def option_directives(): r"""
@@ -1522,6 +1536,7 @@
     ## 44
     #
     # Yee ha!
+    <BLANKLINE>
 
     >>> name = 'test.test_doctest.SampleNewStyleClass'
     >>> print doctest.testsource(test.test_doctest, name)
@@ -1530,6 +1545,7 @@
     ## 1
     ## 2
     ## 3
+    <BLANKLINE>
 
     >>> name = 'test.test_doctest.SampleClass.a_classmethod'
     >>> print doctest.testsource(test.test_doctest, name)
@@ -1539,6 +1555,7 @@
     print SampleClass(0).a_classmethod(10)
     # Expected:
     ## 12
+    <BLANKLINE>
 """
 
 def test_debug(): r"""

Modified: py/dist/py/compat/testing/test_doctest2.py
==============================================================================
--- py/dist/py/compat/testing/test_doctest2.py	(original)
+++ py/dist/py/compat/testing/test_doctest2.py	Tue Jan  9 17:55:49 2007
@@ -11,8 +11,6 @@
 ЉЊЈЁЂ
 
 """
-from _findpy import py 
-print py.__file__
 
 from test import test_support
 

Modified: py/dist/py/compat/testing/test_optparse.py
==============================================================================
--- py/dist/py/compat/testing/test_optparse.py	(original)
+++ py/dist/py/compat/testing/test_optparse.py	Tue Jan  9 17:55:49 2007
@@ -5,7 +5,7 @@
 # (taradino at softhome.net) -- translated from the original Optik
 # test suite to this PyUnit-based version.
 #
-# $Id: test_optparse.py,v 1.10 2004/10/27 02:43:25 tim_one Exp $
+# $Id: test_optparse.py 46506 2006-05-28 18:15:43Z armin.rigo $
 #
 
 import sys
@@ -19,21 +19,13 @@
 
 import py
 optparse = py.compat.optparse 
-make_option = optparse.make_option
-Option = optparse.Option
-IndentedHelpFormatter = optparse.IndentedHelpFormatter
-TitledHelpFormatter = optparse.TitledHelpFormatter
-OptionParser = optparse.OptionParser
-OptionContainer = optparse.OptionContainer
-OptionGroup = optparse.OptionGroup
-SUPPRESS_HELP = optparse.SUPPRESS_HELP
-SUPPRESS_USAGE = optparse.SUPPRESS_USAGE
-OptionError = optparse.OptionError
-OptionConflictError = optparse.OptionConflictError
-BadOptionError = optparse.BadOptionError
-OptionValueError = optparse.OptionValueError
-Values = optparse.Values
-_match_abbrev = optparse._match_abbrev
+import sys
+sys.modules['optparse'] = optparse
+
+from optparse import make_option, Option, IndentedHelpFormatter, \
+     TitledHelpFormatter, OptionParser, OptionContainer, OptionGroup, \
+     SUPPRESS_HELP, SUPPRESS_USAGE, OptionError, OptionConflictError, \
+     BadOptionError, OptionValueError, Values, _match_abbrev
 
 # Do the right thing with boolean values for all known Python versions.
 try:
@@ -225,7 +217,7 @@
 
     def test_attr_invalid(self):
         self.assertOptionError(
-            "option -b: invalid keyword arguments: foo, bar",
+            "option -b: invalid keyword arguments: bar, foo",
             ["-b"], {'foo': None, 'bar': None})
 
     def test_action_invalid(self):
@@ -688,9 +680,8 @@
     def test_ambiguous_option(self):
         self.parser.add_option("--foz", action="store",
                                type="string", dest="foo")
-        possibilities = ", ".join({"--foz": None, "--foo": None}.keys())
         self.assertParseFail(["--f=bar"],
-                             "ambiguous option: --f (%s?)" % possibilities)
+                             "ambiguous option: --f (--foo, --foz?)")
 
 
     def test_short_and_long_option_split(self):
@@ -1490,10 +1481,9 @@
     def test_match_abbrev_error(self):
         s = "--f"
         wordmap = {"--foz": None, "--foo": None, "--fie": None}
-        possibilities = ", ".join(wordmap.keys())
         self.assertRaises(
             _match_abbrev, (s, wordmap), None,
-            BadOptionError, "ambiguous option: --f (%s?)" % possibilities)
+            BadOptionError, "ambiguous option: --f (--fie, --foo, --foz?)")
 
 
 def _testclasses():

Added: py/dist/py/compat/testing/test_subprocess.py
==============================================================================
--- (empty file)
+++ py/dist/py/compat/testing/test_subprocess.py	Tue Jan  9 17:55:49 2007
@@ -0,0 +1,563 @@
+import unittest
+from test import test_support
+from py.compat import subprocess
+import sys
+import signal
+import os
+import tempfile
+import time
+import re
+
+mswindows = (sys.platform == "win32")
+
+#
+# Depends on the following external programs: Python
+#
+
+if mswindows:
+    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
+                                                'os.O_BINARY);')
+else:
+    SETBINARY = ''
+
+# In a debug build, stuff like "[6580 refs]" is printed to stderr at
+# shutdown time.  That frustrates tests trying to check stderr produced
+# from a spawned Python process.
+def remove_stderr_debug_decorations(stderr):
+    return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
+
+class ProcessTestCase(unittest.TestCase):
+    def mkstemp(self):
+        """wrapper for mkstemp, calling mktemp if mkstemp is not available"""
+        if hasattr(tempfile, "mkstemp"):
+            return tempfile.mkstemp()
+        else:
+            fname = tempfile.mktemp()
+            return os.open(fname, os.O_RDWR|os.O_CREAT), fname
+
+    #
+    # Generic tests
+    #
+    def test_call_seq(self):
+        # call() function with sequence argument
+        rc = subprocess.call([sys.executable, "-c",
+                              "import sys; sys.exit(47)"])
+        self.assertEqual(rc, 47)
+
+    def test_call_kwargs(self):
+        # call() function with keyword args
+        newenv = os.environ.copy()
+        newenv["FRUIT"] = "banana"
+        rc = subprocess.call([sys.executable, "-c",
+                          'import sys, os;' \
+                          'sys.exit(os.getenv("FRUIT")=="banana")'],
+                        env=newenv)
+        self.assertEqual(rc, 1)
+
+    def test_stdin_none(self):
+        # .stdin is None when not redirected
+        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
+                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        p.wait()
+        self.assertEqual(p.stdin, None)
+
+    def test_stdout_none(self):
+        # .stdout is None when not redirected
+        p = subprocess.Popen([sys.executable, "-c",
+                             'print "    this bit of output is from a '
+                             'test of stdout in a different '
+                             'process ..."'],
+                             stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+        p.wait()
+        self.assertEqual(p.stdout, None)
+
+    def test_stderr_none(self):
+        # .stderr is None when not redirected
+        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
+                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+        p.wait()
+        self.assertEqual(p.stderr, None)
+
+    def test_executable(self):
+        p = subprocess.Popen(["somethingyoudonthave",
+                              "-c", "import sys; sys.exit(47)"],
+                             executable=sys.executable)
+        p.wait()
+        self.assertEqual(p.returncode, 47)
+
+    def test_stdin_pipe(self):
+        # stdin redirection
+        p = subprocess.Popen([sys.executable, "-c",
+                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
+                        stdin=subprocess.PIPE)
+        p.stdin.write("pear")
+        p.stdin.close()
+        p.wait()
+        self.assertEqual(p.returncode, 1)
+
+    def test_stdin_filedes(self):
+        # stdin is set to open file descriptor
+        tf = tempfile.TemporaryFile()
+        d = tf.fileno()
+        os.write(d, "pear")
+        os.lseek(d, 0, 0)
+        p = subprocess.Popen([sys.executable, "-c",
+                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
+                         stdin=d)
+        p.wait()
+        self.assertEqual(p.returncode, 1)
+
+    def test_stdin_fileobj(self):
+        # stdin is set to open file object
+        tf = tempfile.TemporaryFile()
+        tf.write("pear")
+        tf.seek(0)
+        p = subprocess.Popen([sys.executable, "-c",
+                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
+                         stdin=tf)
+        p.wait()
+        self.assertEqual(p.returncode, 1)
+
+    def test_stdout_pipe(self):
+        # stdout redirection
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys; sys.stdout.write("orange")'],
+                         stdout=subprocess.PIPE)
+        self.assertEqual(p.stdout.read(), "orange")
+
+    def test_stdout_filedes(self):
+        # stdout is set to open file descriptor
+        tf = tempfile.TemporaryFile()
+        d = tf.fileno()
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys; sys.stdout.write("orange")'],
+                         stdout=d)
+        p.wait()
+        os.lseek(d, 0, 0)
+        self.assertEqual(os.read(d, 1024), "orange")
+
+    def test_stdout_fileobj(self):
+        # stdout is set to open file object
+        tf = tempfile.TemporaryFile()
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys; sys.stdout.write("orange")'],
+                         stdout=tf)
+        p.wait()
+        tf.seek(0)
+        self.assertEqual(tf.read(), "orange")
+
+    def test_stderr_pipe(self):
+        # stderr redirection
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys; sys.stderr.write("strawberry")'],
+                         stderr=subprocess.PIPE)
+        self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()),
+                         "strawberry")
+
+    def test_stderr_filedes(self):
+        # stderr is set to open file descriptor
+        tf = tempfile.TemporaryFile()
+        d = tf.fileno()
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys; sys.stderr.write("strawberry")'],
+                         stderr=d)
+        p.wait()
+        os.lseek(d, 0, 0)
+        self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)),
+                         "strawberry")
+
+    def test_stderr_fileobj(self):
+        # stderr is set to open file object
+        tf = tempfile.TemporaryFile()
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys; sys.stderr.write("strawberry")'],
+                         stderr=tf)
+        p.wait()
+        tf.seek(0)
+        self.assertEqual(remove_stderr_debug_decorations(tf.read()),
+                         "strawberry")
+
+    def test_stdout_stderr_pipe(self):
+        # capture stdout and stderr to the same pipe
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys;' \
+                          'sys.stdout.write("apple");' \
+                          'sys.stdout.flush();' \
+                          'sys.stderr.write("orange")'],
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.STDOUT)
+        output = p.stdout.read()
+        stripped = remove_stderr_debug_decorations(output)
+        self.assertEqual(stripped, "appleorange")
+
+    def test_stdout_stderr_file(self):
+        # capture stdout and stderr to the same open file
+        tf = tempfile.TemporaryFile()
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys;' \
+                          'sys.stdout.write("apple");' \
+                          'sys.stdout.flush();' \
+                          'sys.stderr.write("orange")'],
+                         stdout=tf,
+                         stderr=tf)
+        p.wait()
+        tf.seek(0)
+        output = tf.read()
+        stripped = remove_stderr_debug_decorations(output)
+        self.assertEqual(stripped, "appleorange")
+
+    def test_cwd(self):
+        tmpdir = os.getenv("TEMP", "/tmp")
+        # We cannot use os.path.realpath to canonicalize the path,
+        # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
+        cwd = os.getcwd()
+        os.chdir(tmpdir)
+        tmpdir = os.getcwd()
+        os.chdir(cwd)
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys,os;' \
+                          'sys.stdout.write(os.getcwd())'],
+                         stdout=subprocess.PIPE,
+                         cwd=tmpdir)
+        normcase = os.path.normcase
+        self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
+
+    def test_env(self):
+        newenv = os.environ.copy()
+        newenv["FRUIT"] = "orange"
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys,os;' \
+                          'sys.stdout.write(os.getenv("FRUIT"))'],
+                         stdout=subprocess.PIPE,
+                         env=newenv)
+        self.assertEqual(p.stdout.read(), "orange")
+
+    def test_communicate(self):
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys,os;' \
+                          'sys.stderr.write("pineapple");' \
+                          'sys.stdout.write(sys.stdin.read())'],
+                         stdin=subprocess.PIPE,
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.PIPE)
+        (stdout, stderr) = p.communicate("banana")
+        self.assertEqual(stdout, "banana")
+        self.assertEqual(remove_stderr_debug_decorations(stderr),
+                         "pineapple")
+
+    def test_communicate_returns(self):
+        # communicate() should return None if no redirection is active
+        p = subprocess.Popen([sys.executable, "-c",
+                              "import sys; sys.exit(47)"])
+        (stdout, stderr) = p.communicate()
+        self.assertEqual(stdout, None)
+        self.assertEqual(stderr, None)
+
+    def test_communicate_pipe_buf(self):
+        # communicate() with writes larger than pipe_buf
+        # This test will probably deadlock rather than fail, if
+        # communicate() does not work properly.
+        x, y = os.pipe()
+        if mswindows:
+            pipe_buf = 512
+        else:
+            pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
+        os.close(x)
+        os.close(y)
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys,os;'
+                          'sys.stdout.write(sys.stdin.read(47));' \
+                          'sys.stderr.write("xyz"*%d);' \
+                          'sys.stdout.write(sys.stdin.read())' % pipe_buf],
+                         stdin=subprocess.PIPE,
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.PIPE)
+        string_to_write = "abc"*pipe_buf
+        (stdout, stderr) = p.communicate(string_to_write)
+        self.assertEqual(stdout, string_to_write)
+
+    def test_writes_before_communicate(self):
+        # stdin.write before communicate()
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys,os;' \
+                          'sys.stdout.write(sys.stdin.read())'],
+                         stdin=subprocess.PIPE,
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.PIPE)
+        p.stdin.write("banana")
+        (stdout, stderr) = p.communicate("split")
+        self.assertEqual(stdout, "bananasplit")
+        self.assertEqual(remove_stderr_debug_decorations(stderr), "")
+
+    def test_universal_newlines(self):
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys,os;' + SETBINARY +
+                          'sys.stdout.write("line1\\n");'
+                          'sys.stdout.flush();'
+                          'sys.stdout.write("line2\\r");'
+                          'sys.stdout.flush();'
+                          'sys.stdout.write("line3\\r\\n");'
+                          'sys.stdout.flush();'
+                          'sys.stdout.write("line4\\r");'
+                          'sys.stdout.flush();'
+                          'sys.stdout.write("\\nline5");'
+                          'sys.stdout.flush();'
+                          'sys.stdout.write("\\nline6");'],
+                         stdout=subprocess.PIPE,
+                         universal_newlines=1)
+        stdout = p.stdout.read()
+        if hasattr(open, 'newlines'):
+            # Interpreter with universal newline support
+            self.assertEqual(stdout,
+                             "line1\nline2\nline3\nline4\nline5\nline6")
+        else:
+            # Interpreter without universal newline support
+            self.assertEqual(stdout,
+                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")
+
+    def test_universal_newlines_communicate(self):
+        # universal newlines through communicate()
+        p = subprocess.Popen([sys.executable, "-c",
+                          'import sys,os;' + SETBINARY +
+                          'sys.stdout.write("line1\\n");'
+                          'sys.stdout.flush();'
+                          'sys.stdout.write("line2\\r");'
+                          'sys.stdout.flush();'
+                          'sys.stdout.write("line3\\r\\n");'
+                          'sys.stdout.flush();'
+                          'sys.stdout.write("line4\\r");'
+                          'sys.stdout.flush();'
+                          'sys.stdout.write("\\nline5");'
+                          'sys.stdout.flush();'
+                          'sys.stdout.write("\\nline6");'],
+                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                         universal_newlines=1)
+        (stdout, stderr) = p.communicate()
+        if hasattr(open, 'newlines'):
+            # Interpreter with universal newline support
+            self.assertEqual(stdout,
+                             "line1\nline2\nline3\nline4\nline5\nline6")
+        else:
+            # Interpreter without universal newline support
+            self.assertEqual(stdout, "line1\nline2\rline3\r\nline4\r\nline5\nline6")
+
+    def test_no_leaking(self):
+        # Make sure we leak no resources
+        if test_support.is_resource_enabled("subprocess") and not mswindows:
+            max_handles = 1026 # too much for most UNIX systems
+        else:
+            max_handles = 65
+        for i in range(max_handles):
+            p = subprocess.Popen([sys.executable, "-c",
+                    "import sys;sys.stdout.write(sys.stdin.read())"],
+                    stdin=subprocess.PIPE,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE)
+            data = p.communicate("lime")[0]
+            self.assertEqual(data, "lime")
+
+
+    def test_list2cmdline(self):
+        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
+                         '"a b c" d e')
+        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
+                         'ab\\"c \\ d')
+        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
+                         'a\\\\\\b "de fg" h')
+        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
+                         'a\\\\\\"b c d')
+        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
+                         '"a\\\\b c" d e')
+        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
+                         '"a\\\\b\\ c" d e')
+
+
+    def test_poll(self):
+        p = subprocess.Popen([sys.executable,
+                          "-c", "import time; time.sleep(1)"])
+        count = 0
+        while p.poll() is None:
+            time.sleep(0.1)
+            count += 1
+        # We expect that the poll loop probably went around about 10 times,
+        # but, based on system scheduling we can't control, it's possible
+        # poll() never returned None.  It "should be" very rare that it
+        # didn't go around at least twice.
+        self.assert_(count >= 2)
+        # Subsequent invocations should just return the returncode
+        self.assertEqual(p.poll(), 0)
+
+
+    def test_wait(self):
+        p = subprocess.Popen([sys.executable,
+                          "-c", "import time; time.sleep(2)"])
+        self.assertEqual(p.wait(), 0)
+        # Subsequent invocations should just return the returncode
+        self.assertEqual(p.wait(), 0)
+
+
+    def test_invalid_bufsize(self):
+        # an invalid type of the bufsize argument should raise
+        # TypeError.
+        try:
+            subprocess.Popen([sys.executable, "-c", "pass"], "orange")
+        except TypeError:
+            pass
+        else:
+            self.fail("Expected TypeError")
+
+    #
+    # POSIX tests
+    #
+    if not mswindows:
+        def test_exceptions(self):
+            # caught & re-raised exceptions
+            try:
+                p = subprocess.Popen([sys.executable, "-c", ""],
+                                 cwd="/this/path/does/not/exist")
+            except OSError, e:
+                # The attribute child_traceback should contain "os.chdir"
+                # somewhere.
+                self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
+            else:
+                self.fail("Expected OSError")
+
+        def test_run_abort(self):
+            # returncode handles signal termination
+            p = subprocess.Popen([sys.executable,
+                                  "-c", "import os; os.abort()"])
+            p.wait()
+            self.assertEqual(-p.returncode, signal.SIGABRT)
+
+        def test_preexec(self):
+            # preexec function
+            p = subprocess.Popen([sys.executable, "-c",
+                              'import sys,os;' \
+                              'sys.stdout.write(os.getenv("FRUIT"))'],
+                             stdout=subprocess.PIPE,
+                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
+            self.assertEqual(p.stdout.read(), "apple")
+
+        def test_args_string(self):
+            # args is a string
+            f, fname = self.mkstemp()
+            os.write(f, "#!/bin/sh\n")
+            os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" %
+                        sys.executable)
+            os.close(f)
+            os.chmod(fname, 0700)
+            p = subprocess.Popen(fname)
+            p.wait()
+            os.remove(fname)
+            self.assertEqual(p.returncode, 47)
+
+        def test_invalid_args(self):
+            # invalid arguments should raise ValueError
+            self.assertRaises(ValueError, subprocess.call,
+                              [sys.executable,
+                               "-c", "import sys; sys.exit(47)"],
+                              startupinfo=47)
+            self.assertRaises(ValueError, subprocess.call,
+                              [sys.executable,
+                               "-c", "import sys; sys.exit(47)"],
+                              creationflags=47)
+
+        def test_shell_sequence(self):
+            # Run command through the shell (sequence)
+            newenv = os.environ.copy()
+            newenv["FRUIT"] = "apple"
+            p = subprocess.Popen(["echo $FRUIT"], shell=1,
+                                 stdout=subprocess.PIPE,
+                                 env=newenv)
+            self.assertEqual(p.stdout.read().strip(), "apple")
+
+        def test_shell_string(self):
+            # Run command through the shell (string)
+            newenv = os.environ.copy()
+            newenv["FRUIT"] = "apple"
+            p = subprocess.Popen("echo $FRUIT", shell=1,
+                                 stdout=subprocess.PIPE,
+                                 env=newenv)
+            self.assertEqual(p.stdout.read().strip(), "apple")
+
+        def test_call_string(self):
+            # call() function with string argument on UNIX
+            f, fname = self.mkstemp()
+            os.write(f, "#!/bin/sh\n")
+            os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" %
+                        sys.executable)
+            os.close(f)
+            os.chmod(fname, 0700)
+            rc = subprocess.call(fname)
+            os.remove(fname)
+            self.assertEqual(rc, 47)
+
+
+    #
+    # Windows tests
+    #
+    if mswindows:
+        def test_startupinfo(self):
+            # startupinfo argument
+            # We use hardcoded constants, because we do not want to
+            # depend on win32all.
+            STARTF_USESHOWWINDOW = 1
+            SW_MAXIMIZE = 3
+            startupinfo = subprocess.STARTUPINFO()
+            startupinfo.dwFlags = STARTF_USESHOWWINDOW
+            startupinfo.wShowWindow = SW_MAXIMIZE
+            # Since Python is a console process, it won't be affected
+            # by wShowWindow, but the argument should be silently
+            # ignored
+            subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
+                        startupinfo=startupinfo)
+
+        def test_creationflags(self):
+            # creationflags argument
+            CREATE_NEW_CONSOLE = 16
+            sys.stderr.write("    a DOS box should flash briefly ...\n")
+            subprocess.call(sys.executable +
+                                ' -c "import time; time.sleep(0.25)"',
+                            creationflags=CREATE_NEW_CONSOLE)
+
+        def test_invalid_args(self):
+            # invalid arguments should raise ValueError
+            self.assertRaises(ValueError, subprocess.call,
+                              [sys.executable,
+                               "-c", "import sys; sys.exit(47)"],
+                              preexec_fn=lambda: 1)
+            self.assertRaises(ValueError, subprocess.call,
+                              [sys.executable,
+                               "-c", "import sys; sys.exit(47)"],
+                              close_fds=True)
+
+        def test_shell_sequence(self):
+            # Run command through the shell (sequence)
+            newenv = os.environ.copy()
+            newenv["FRUIT"] = "physalis"
+            p = subprocess.Popen(["set"], shell=1,
+                                 stdout=subprocess.PIPE,
+                                 env=newenv)
+            self.assertNotEqual(p.stdout.read().find("physalis"), -1)
+
+        def test_shell_string(self):
+            # Run command through the shell (string)
+            newenv = os.environ.copy()
+            newenv["FRUIT"] = "physalis"
+            p = subprocess.Popen("set", shell=1,
+                                 stdout=subprocess.PIPE,
+                                 env=newenv)
+            self.assertNotEqual(p.stdout.read().find("physalis"), -1)
+
+        def test_call_string(self):
+            # call() function with string argument on Windows
+            rc = subprocess.call(sys.executable +
+                                 ' -c "import sys; sys.exit(47)"')
+            self.assertEqual(rc, 47)
+
+
+def test_main():
+    test_support.run_unittest(ProcessTestCase)
+
+if __name__ == "__main__":
+    test_main()

Modified: py/dist/py/compat/testing/test_textwrap.py
==============================================================================
--- py/dist/py/compat/testing/test_textwrap.py	(original)
+++ py/dist/py/compat/testing/test_textwrap.py	Tue Jan  9 17:55:49 2007
@@ -5,7 +5,7 @@
 # Converted to PyUnit by Peter Hansen <peter at engcorp.com>.
 # Currently maintained by Greg Ward.
 #
-# $Id: test_textwrap.py,v 1.27.4.1 2005/03/05 02:38:33 gward Exp $
+# $Id: test_textwrap.py 38573 2005-03-05 02:38:33Z gward $
 #
 
 import unittest
@@ -18,7 +18,6 @@
 fill = textwrap.fill
 dedent = textwrap.dedent
 
-
 class BaseTestCase(unittest.TestCase):
     '''Parent class with utility methods for textwrap tests.'''
 

Modified: py/dist/py/compat/textwrap.py
==============================================================================
--- py/dist/py/compat/textwrap.py	(original)
+++ py/dist/py/compat/textwrap.py	Tue Jan  9 17:55:49 2007
@@ -161,7 +161,7 @@
             else:
                 i += 1
 
-    def _handle_long_word(self, chunks, cur_line, cur_len, width):
+    def _handle_long_word(self, reversed_chunks, cur_line, cur_len, width):
         """_handle_long_word(chunks : [string],
                              cur_line : [string],
                              cur_len : int, width : int)
@@ -174,14 +174,14 @@
         # If we're allowed to break long words, then do so: put as much
         # of the next chunk onto the current line as will fit.
         if self.break_long_words:
-            cur_line.append(chunks[0][0:space_left])
-            chunks[0] = chunks[0][space_left:]
+            cur_line.append(reversed_chunks[-1][:space_left])
+            reversed_chunks[-1] = reversed_chunks[-1][space_left:]
 
         # Otherwise, we have to preserve the long word intact.  Only add
         # it to the current line if there's nothing already there --
         # that minimizes how much we violate the width constraint.
         elif not cur_line:
-            cur_line.append(chunks.pop(0))
+            cur_line.append(reversed_chunks.pop())
 
         # If we're not allowed to break long words, and there's already
         # text on the current line, do nothing.  Next time through the
@@ -206,6 +206,10 @@
         if self.width <= 0:
             raise ValueError("invalid width %r (must be > 0)" % self.width)
 
+        # Arrange in reverse order so items can be efficiently popped
+        # from a stack of chunks.
+        chunks.reverse()
+
         while chunks:
 
             # Start the list of chunks that will make up the current line.
@@ -224,15 +228,15 @@
 
             # First chunk on line is whitespace -- drop it, unless this
             # is the very beginning of the text (ie. no lines started yet).
-            if chunks[0].strip() == '' and lines:
-                del chunks[0]
+            if chunks[-1].strip() == '' and lines:
+                del chunks[-1]
 
             while chunks:
-                l = len(chunks[0])
+                l = len(chunks[-1])
 
                 # Can at least squeeze this chunk onto the current line.
                 if cur_len + l <= width:
-                    cur_line.append(chunks.pop(0))
+                    cur_line.append(chunks.pop())
                     cur_len += l
 
                 # Nope, this line is full.
@@ -241,7 +245,7 @@
 
             # The current line is full, and the next chunk is too big to
             # fit on *any* line (not just this one).
-            if chunks and len(chunks[0]) > width:
+            if chunks and len(chunks[-1]) > width:
                 self._handle_long_word(chunks, cur_line, cur_len, width)
 
             # If the last chunk on this line is all whitespace, drop it.
@@ -268,7 +272,6 @@
         converted to space.
         """
         text = self._munge_whitespace(text)
-        indent = self.initial_indent
         chunks = self._split(text)
         if self.fix_sentence_endings:
             self._fix_sentence_endings(chunks)



More information about the pytest-commit mailing list