[py-svn] r36379 - in py/dist/py: . compat compat/testing
cfbolz at codespeak.net
cfbolz at codespeak.net
Tue Jan 9 17:55:55 CET 2007
Author: cfbolz
Date: Tue Jan 9 17:55:49 2007
New Revision: 36379
Added:
py/dist/py/compat/testing/test_subprocess.py
Modified:
py/dist/py/__init__.py
py/dist/py/compat/doctest.py
py/dist/py/compat/optparse.py
py/dist/py/compat/subprocess.py
py/dist/py/compat/testing/test_doctest.py
py/dist/py/compat/testing/test_doctest2.py
py/dist/py/compat/testing/test_optparse.py
py/dist/py/compat/testing/test_textwrap.py
py/dist/py/compat/textwrap.py
Log:
import the newest versions of the modules in compat from python 2.4.4
Modified: py/dist/py/__init__.py
==============================================================================
--- py/dist/py/__init__.py (original)
+++ py/dist/py/__init__.py Tue Jan 9 17:55:49 2007
@@ -130,7 +130,7 @@
'log.Syslog' : ('./log/consumer.py', 'Syslog'),
'log.get' : ('./log/logger.py', 'get'),
- # compatibility modules (taken from 2.4.1)
+ # compatibility modules (taken from 2.4.4)
'compat.doctest' : ('./compat/doctest.py', '*'),
'compat.optparse' : ('./compat/optparse.py', '*'),
'compat.textwrap' : ('./compat/textwrap.py', '*'),
Modified: py/dist/py/compat/doctest.py
==============================================================================
--- py/dist/py/compat/doctest.py (original)
+++ py/dist/py/compat/doctest.py Tue Jan 9 17:55:49 2007
@@ -128,9 +128,8 @@
OPTIONFLAGS_BY_NAME = {}
def register_optionflag(name):
- flag = 1 << len(OPTIONFLAGS_BY_NAME)
- OPTIONFLAGS_BY_NAME[name] = flag
- return flag
+ # Create a new flag unless `name` is already known.
+ return OPTIONFLAGS_BY_NAME.setdefault(name, 1 << len(OPTIONFLAGS_BY_NAME))
DONT_ACCEPT_TRUE_FOR_1 = register_optionflag('DONT_ACCEPT_TRUE_FOR_1')
DONT_ACCEPT_BLANKLINE = register_optionflag('DONT_ACCEPT_BLANKLINE')
@@ -849,6 +848,11 @@
# Recursively expore `obj`, extracting DocTests.
tests = []
self._find(tests, obj, name, module, source_lines, globs, {})
+ # Sort the tests by alpha order of names, for consistency in
+ # verbose-mode output. This was a feature of doctest in Pythons
+ # <= 2.3 that got lost by accident in 2.4. It was repaired in
+ # 2.4.4 and 2.5.
+ tests.sort()
return tests
def _filter(self, obj, prefix, base):
@@ -1045,12 +1049,13 @@
>>> tests = DocTestFinder().find(_TestClass)
>>> runner = DocTestRunner(verbose=False)
+ >>> tests.sort(key = lambda test: test.name)
>>> for test in tests:
- ... print runner.run(test)
- (0, 2)
- (0, 1)
- (0, 2)
- (0, 2)
+ ... print test.name, '->', runner.run(test)
+ _TestClass -> (0, 2)
+ _TestClass.__init__ -> (0, 2)
+ _TestClass.get -> (0, 2)
+ _TestClass.square -> (0, 1)
The `summarize` method prints a summary of all the test cases that
have been run by the runner, and returns an aggregated `(f, t)`
@@ -2472,6 +2477,7 @@
blah
#
# Ho hum
+ <BLANKLINE>
"""
output = []
for piece in DocTestParser().parse(s):
@@ -2494,7 +2500,8 @@
while output and output[0] == '#':
output.pop(0)
# Combine the output, and return it.
- return '\n'.join(output)
+ # Add a courtesy newline to prevent exec from choking (see bug #1172785)
+ return '\n'.join(output) + '\n'
def testsource(module, name):
"""Extract the test sources from a doctest docstring as a script.
Modified: py/dist/py/compat/optparse.py
==============================================================================
--- py/dist/py/compat/optparse.py (original)
+++ py/dist/py/compat/optparse.py Tue Jan 9 17:55:49 2007
@@ -68,12 +68,9 @@
import sys, os
import types
+import textwrap
from gettext import gettext as _
-import py
-textwrap = py.compat.textwrap
-
-
def _repr(self):
return "<%s at 0x%x: %s>" % (self.__class__.__name__, id(self), self)
@@ -550,8 +547,10 @@
else:
setattr(self, attr, None)
if attrs:
+ attrs = attrs.keys()
+ attrs.sort()
raise OptionError(
- "invalid keyword arguments: %s" % ", ".join(attrs.keys()),
+ "invalid keyword arguments: %s" % ", ".join(attrs),
self)
@@ -1559,6 +1558,7 @@
raise BadOptionError(_("no such option: %s") % s)
else:
# More than one possible completion: ambiguous prefix.
+ possibilities.sort()
raise BadOptionError(_("ambiguous option: %s (%s?)")
% (s, ", ".join(possibilities)))
Modified: py/dist/py/compat/subprocess.py
==============================================================================
--- py/dist/py/compat/subprocess.py (original)
+++ py/dist/py/compat/subprocess.py Tue Jan 9 17:55:49 2007
@@ -1,32 +1,13 @@
-# Copied from CPython 2.4 with one modification to dynamically
-# fall back to pywin32 code when _subprocess is not available.
-
# subprocess - Subprocesses with accessible I/O streams
#
# For more information about this module, see PEP 324.
#
-# Copyright (c) 2003-2004 by Peter Astrand <astrand at lysator.liu.se>
-#
-# By obtaining, using, and/or copying this software and/or its
-# associated documentation, you agree that you have read, understood,
-# and will comply with the following terms and conditions:
+# This module should remain compatible with Python 2.2, see PEP 291.
#
-# Permission to use, copy, modify, and distribute this software and
-# its associated documentation for any purpose and without fee is
-# hereby granted, provided that the above copyright notice appears in
-# all copies, and that both that copyright notice and this permission
-# notice appear in supporting documentation, and that the name of the
-# author not be used in advertising or publicity pertaining to
-# distribution of the software without specific, written prior
-# permission.
+# Copyright (c) 2003-2005 by Peter Astrand <astrand at lysator.liu.se>
#
-# THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
-# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS.
-# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, INDIRECT OR
-# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
-# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
-# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
-# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+# Licensed to PSF under a Contributor Agreement.
+# See http://www.python.org/2.4/license for licensing details.
r"""subprocess - Subprocesses with accessible I/O streams
@@ -369,16 +350,7 @@
if mswindows:
import threading
import msvcrt
- try:
- from _subprocess import *
- class STARTUPINFO:
- dwFlags = 0
- hStdInput = None
- hStdOutput = None
- hStdError = None
- class pywintypes:
- error = IOError
- except ImportError:
+ if 0: # <-- change this to use pywin32 instead of the _subprocess driver
import pywintypes
from win32api import GetStdHandle, STD_INPUT_HANDLE, \
STD_OUTPUT_HANDLE, STD_ERROR_HANDLE
@@ -390,6 +362,16 @@
GetExitCodeProcess, STARTF_USESTDHANDLES, \
STARTF_USESHOWWINDOW, CREATE_NEW_CONSOLE
from win32event import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
+ else:
+ from _subprocess import *
+ class STARTUPINFO:
+ dwFlags = 0
+ hStdInput = None
+ hStdOutput = None
+ hStdError = None
+ wShowWindow = 0
+ class pywintypes:
+ error = IOError
else:
import select
import errno
@@ -681,18 +663,17 @@
args = list2cmdline(args)
# Process startup details
- default_startupinfo = STARTUPINFO()
if startupinfo == None:
- startupinfo = default_startupinfo
- if not None in (p2cread, c2pwrite, errwrite):
+ startupinfo = STARTUPINFO()
+ if None not in (p2cread, c2pwrite, errwrite):
startupinfo.dwFlags |= STARTF_USESTDHANDLES
startupinfo.hStdInput = p2cread
startupinfo.hStdOutput = c2pwrite
startupinfo.hStdError = errwrite
if shell:
- default_startupinfo.dwFlags |= STARTF_USESHOWWINDOW
- default_startupinfo.wShowWindow = SW_HIDE
+ startupinfo.dwFlags |= STARTF_USESHOWWINDOW
+ startupinfo.wShowWindow = SW_HIDE
comspec = os.environ.get("COMSPEC", "cmd.exe")
args = comspec + " /c " + args
if (GetVersion() >= 0x80000000L or
@@ -886,7 +867,7 @@
def _close_fds(self, but):
- for i in range(3, MAXFD):
+ for i in xrange(3, MAXFD):
if i == but:
continue
try:
Modified: py/dist/py/compat/testing/test_doctest.py
==============================================================================
--- py/dist/py/compat/testing/test_doctest.py (original)
+++ py/dist/py/compat/testing/test_doctest.py Tue Jan 9 17:55:49 2007
@@ -2,14 +2,15 @@
Test script for doctest.
"""
+from test import test_support
+import warnings
+
import py
doctest = py.compat.doctest
import sys
sys.modules['doctest'] = py.compat.doctest
-from test import test_support
-import warnings
######################################################################
## Sample Objects (used by test cases)
@@ -424,7 +425,6 @@
>>> finder = doctest.DocTestFinder()
>>> tests = finder.find(SampleClass)
- >>> tests.sort()
>>> for t in tests:
... print '%2s %s' % (len(t.examples), t.name)
3 SampleClass
@@ -440,7 +440,6 @@
New-style classes are also supported:
>>> tests = finder.find(SampleNewStyleClass)
- >>> tests.sort()
>>> for t in tests:
... print '%2s %s' % (len(t.examples), t.name)
1 SampleNewStyleClass
@@ -480,7 +479,6 @@
>>> # ignoring the objects since they weren't defined in m.
>>> import test.test_doctest
>>> tests = finder.find(m, module=test.test_doctest)
- >>> tests.sort()
>>> for t in tests:
... print '%2s %s' % (len(t.examples), t.name)
1 some_module
@@ -504,7 +502,6 @@
>>> from test import doctest_aliases
>>> tests = excl_empty_finder.find(doctest_aliases)
- >>> tests.sort()
>>> print len(tests)
2
>>> print tests[0].name
@@ -526,7 +523,6 @@
>>> def namefilter(prefix, base):
... return base.startswith('a_')
>>> tests = doctest.DocTestFinder(_namefilter=namefilter).find(SampleClass)
- >>> tests.sort()
>>> for t in tests:
... print '%2s %s' % (len(t.examples), t.name)
3 SampleClass
@@ -543,7 +539,6 @@
>>> tests = doctest.DocTestFinder(_namefilter=namefilter,
... exclude_empty=False).find(SampleClass)
- >>> tests.sort()
>>> for t in tests:
... print '%2s %s' % (len(t.examples), t.name)
3 SampleClass
@@ -587,7 +582,6 @@
using the `recurse` flag:
>>> tests = doctest.DocTestFinder(recurse=False).find(SampleClass)
- >>> tests.sort()
>>> for t in tests:
... print '%2s %s' % (len(t.examples), t.name)
3 SampleClass
@@ -1286,6 +1280,26 @@
ValueError: 2
(3, 5)
+New option flags can also be registered, via register_optionflag(). Here
+we reach into doctest's internals a bit.
+
+ >>> unlikely = "UNLIKELY_OPTION_NAME"
+ >>> unlikely in doctest.OPTIONFLAGS_BY_NAME
+ False
+ >>> new_flag_value = doctest.register_optionflag(unlikely)
+ >>> unlikely in doctest.OPTIONFLAGS_BY_NAME
+ True
+
+Before 2.4.4/2.5, registering a name more than once erroneously created
+more than one flag value. Here we verify that's fixed:
+
+ >>> redundant_flag_value = doctest.register_optionflag(unlikely)
+ >>> redundant_flag_value == new_flag_value
+ True
+
+Clean up.
+ >>> del doctest.OPTIONFLAGS_BY_NAME[unlikely]
+
"""
def option_directives(): r"""
@@ -1522,6 +1536,7 @@
## 44
#
# Yee ha!
+ <BLANKLINE>
>>> name = 'test.test_doctest.SampleNewStyleClass'
>>> print doctest.testsource(test.test_doctest, name)
@@ -1530,6 +1545,7 @@
## 1
## 2
## 3
+ <BLANKLINE>
>>> name = 'test.test_doctest.SampleClass.a_classmethod'
>>> print doctest.testsource(test.test_doctest, name)
@@ -1539,6 +1555,7 @@
print SampleClass(0).a_classmethod(10)
# Expected:
## 12
+ <BLANKLINE>
"""
def test_debug(): r"""
Modified: py/dist/py/compat/testing/test_doctest2.py
==============================================================================
--- py/dist/py/compat/testing/test_doctest2.py (original)
+++ py/dist/py/compat/testing/test_doctest2.py Tue Jan 9 17:55:49 2007
@@ -11,8 +11,6 @@
ЉЊЈЁЂ
"""
-from _findpy import py
-print py.__file__
from test import test_support
Modified: py/dist/py/compat/testing/test_optparse.py
==============================================================================
--- py/dist/py/compat/testing/test_optparse.py (original)
+++ py/dist/py/compat/testing/test_optparse.py Tue Jan 9 17:55:49 2007
@@ -5,7 +5,7 @@
# (taradino at softhome.net) -- translated from the original Optik
# test suite to this PyUnit-based version.
#
-# $Id: test_optparse.py,v 1.10 2004/10/27 02:43:25 tim_one Exp $
+# $Id: test_optparse.py 46506 2006-05-28 18:15:43Z armin.rigo $
#
import sys
@@ -19,21 +19,13 @@
import py
optparse = py.compat.optparse
-make_option = optparse.make_option
-Option = optparse.Option
-IndentedHelpFormatter = optparse.IndentedHelpFormatter
-TitledHelpFormatter = optparse.TitledHelpFormatter
-OptionParser = optparse.OptionParser
-OptionContainer = optparse.OptionContainer
-OptionGroup = optparse.OptionGroup
-SUPPRESS_HELP = optparse.SUPPRESS_HELP
-SUPPRESS_USAGE = optparse.SUPPRESS_USAGE
-OptionError = optparse.OptionError
-OptionConflictError = optparse.OptionConflictError
-BadOptionError = optparse.BadOptionError
-OptionValueError = optparse.OptionValueError
-Values = optparse.Values
-_match_abbrev = optparse._match_abbrev
+import sys
+sys.modules['optparse'] = optparse
+
+from optparse import make_option, Option, IndentedHelpFormatter, \
+ TitledHelpFormatter, OptionParser, OptionContainer, OptionGroup, \
+ SUPPRESS_HELP, SUPPRESS_USAGE, OptionError, OptionConflictError, \
+ BadOptionError, OptionValueError, Values, _match_abbrev
# Do the right thing with boolean values for all known Python versions.
try:
@@ -225,7 +217,7 @@
def test_attr_invalid(self):
self.assertOptionError(
- "option -b: invalid keyword arguments: foo, bar",
+ "option -b: invalid keyword arguments: bar, foo",
["-b"], {'foo': None, 'bar': None})
def test_action_invalid(self):
@@ -688,9 +680,8 @@
def test_ambiguous_option(self):
self.parser.add_option("--foz", action="store",
type="string", dest="foo")
- possibilities = ", ".join({"--foz": None, "--foo": None}.keys())
self.assertParseFail(["--f=bar"],
- "ambiguous option: --f (%s?)" % possibilities)
+ "ambiguous option: --f (--foo, --foz?)")
def test_short_and_long_option_split(self):
@@ -1490,10 +1481,9 @@
def test_match_abbrev_error(self):
s = "--f"
wordmap = {"--foz": None, "--foo": None, "--fie": None}
- possibilities = ", ".join(wordmap.keys())
self.assertRaises(
_match_abbrev, (s, wordmap), None,
- BadOptionError, "ambiguous option: --f (%s?)" % possibilities)
+ BadOptionError, "ambiguous option: --f (--fie, --foo, --foz?)")
def _testclasses():
Added: py/dist/py/compat/testing/test_subprocess.py
==============================================================================
--- (empty file)
+++ py/dist/py/compat/testing/test_subprocess.py Tue Jan 9 17:55:49 2007
@@ -0,0 +1,563 @@
+import unittest
+from test import test_support
+from py.compat import subprocess
+import sys
+import signal
+import os
+import tempfile
+import time
+import re
+
+mswindows = (sys.platform == "win32")
+
+#
+# Depends on the following external programs: Python
+#
+
+if mswindows:
+ SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
+ 'os.O_BINARY);')
+else:
+ SETBINARY = ''
+
+# In a debug build, stuff like "[6580 refs]" is printed to stderr at
+# shutdown time. That frustrates tests trying to check stderr produced
+# from a spawned Python process.
+def remove_stderr_debug_decorations(stderr):
+ return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
+
+class ProcessTestCase(unittest.TestCase):
+ def mkstemp(self):
+ """wrapper for mkstemp, calling mktemp if mkstemp is not available"""
+ if hasattr(tempfile, "mkstemp"):
+ return tempfile.mkstemp()
+ else:
+ fname = tempfile.mktemp()
+ return os.open(fname, os.O_RDWR|os.O_CREAT), fname
+
+ #
+ # Generic tests
+ #
+ def test_call_seq(self):
+ # call() function with sequence argument
+ rc = subprocess.call([sys.executable, "-c",
+ "import sys; sys.exit(47)"])
+ self.assertEqual(rc, 47)
+
+ def test_call_kwargs(self):
+ # call() function with keyword args
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "banana"
+ rc = subprocess.call([sys.executable, "-c",
+ 'import sys, os;' \
+ 'sys.exit(os.getenv("FRUIT")=="banana")'],
+ env=newenv)
+ self.assertEqual(rc, 1)
+
+ def test_stdin_none(self):
+ # .stdin is None when not redirected
+ p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ p.wait()
+ self.assertEqual(p.stdin, None)
+
+ def test_stdout_none(self):
+ # .stdout is None when not redirected
+ p = subprocess.Popen([sys.executable, "-c",
+ 'print " this bit of output is from a '
+ 'test of stdout in a different '
+ 'process ..."'],
+ stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ p.wait()
+ self.assertEqual(p.stdout, None)
+
+ def test_stderr_none(self):
+ # .stderr is None when not redirected
+ p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ p.wait()
+ self.assertEqual(p.stderr, None)
+
+ def test_executable(self):
+ p = subprocess.Popen(["somethingyoudonthave",
+ "-c", "import sys; sys.exit(47)"],
+ executable=sys.executable)
+ p.wait()
+ self.assertEqual(p.returncode, 47)
+
+ def test_stdin_pipe(self):
+ # stdin redirection
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.exit(sys.stdin.read() == "pear")'],
+ stdin=subprocess.PIPE)
+ p.stdin.write("pear")
+ p.stdin.close()
+ p.wait()
+ self.assertEqual(p.returncode, 1)
+
+ def test_stdin_filedes(self):
+ # stdin is set to open file descriptor
+ tf = tempfile.TemporaryFile()
+ d = tf.fileno()
+ os.write(d, "pear")
+ os.lseek(d, 0, 0)
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.exit(sys.stdin.read() == "pear")'],
+ stdin=d)
+ p.wait()
+ self.assertEqual(p.returncode, 1)
+
+ def test_stdin_fileobj(self):
+ # stdin is set to open file object
+ tf = tempfile.TemporaryFile()
+ tf.write("pear")
+ tf.seek(0)
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.exit(sys.stdin.read() == "pear")'],
+ stdin=tf)
+ p.wait()
+ self.assertEqual(p.returncode, 1)
+
+ def test_stdout_pipe(self):
+ # stdout redirection
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stdout.write("orange")'],
+ stdout=subprocess.PIPE)
+ self.assertEqual(p.stdout.read(), "orange")
+
+ def test_stdout_filedes(self):
+ # stdout is set to open file descriptor
+ tf = tempfile.TemporaryFile()
+ d = tf.fileno()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stdout.write("orange")'],
+ stdout=d)
+ p.wait()
+ os.lseek(d, 0, 0)
+ self.assertEqual(os.read(d, 1024), "orange")
+
+ def test_stdout_fileobj(self):
+ # stdout is set to open file object
+ tf = tempfile.TemporaryFile()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stdout.write("orange")'],
+ stdout=tf)
+ p.wait()
+ tf.seek(0)
+ self.assertEqual(tf.read(), "orange")
+
+ def test_stderr_pipe(self):
+ # stderr redirection
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stderr.write("strawberry")'],
+ stderr=subprocess.PIPE)
+ self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()),
+ "strawberry")
+
+ def test_stderr_filedes(self):
+ # stderr is set to open file descriptor
+ tf = tempfile.TemporaryFile()
+ d = tf.fileno()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stderr.write("strawberry")'],
+ stderr=d)
+ p.wait()
+ os.lseek(d, 0, 0)
+ self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)),
+ "strawberry")
+
+ def test_stderr_fileobj(self):
+ # stderr is set to open file object
+ tf = tempfile.TemporaryFile()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stderr.write("strawberry")'],
+ stderr=tf)
+ p.wait()
+ tf.seek(0)
+ self.assertEqual(remove_stderr_debug_decorations(tf.read()),
+ "strawberry")
+
+ def test_stdout_stderr_pipe(self):
+ # capture stdout and stderr to the same pipe
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys;' \
+ 'sys.stdout.write("apple");' \
+ 'sys.stdout.flush();' \
+ 'sys.stderr.write("orange")'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ output = p.stdout.read()
+ stripped = remove_stderr_debug_decorations(output)
+ self.assertEqual(stripped, "appleorange")
+
+ def test_stdout_stderr_file(self):
+ # capture stdout and stderr to the same open file
+ tf = tempfile.TemporaryFile()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys;' \
+ 'sys.stdout.write("apple");' \
+ 'sys.stdout.flush();' \
+ 'sys.stderr.write("orange")'],
+ stdout=tf,
+ stderr=tf)
+ p.wait()
+ tf.seek(0)
+ output = tf.read()
+ stripped = remove_stderr_debug_decorations(output)
+ self.assertEqual(stripped, "appleorange")
+
+ def test_cwd(self):
+ tmpdir = os.getenv("TEMP", "/tmp")
+ # We cannot use os.path.realpath to canonicalize the path,
+ # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
+ cwd = os.getcwd()
+ os.chdir(tmpdir)
+ tmpdir = os.getcwd()
+ os.chdir(cwd)
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' \
+ 'sys.stdout.write(os.getcwd())'],
+ stdout=subprocess.PIPE,
+ cwd=tmpdir)
+ normcase = os.path.normcase
+ self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
+
+ def test_env(self):
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "orange"
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' \
+ 'sys.stdout.write(os.getenv("FRUIT"))'],
+ stdout=subprocess.PIPE,
+ env=newenv)
+ self.assertEqual(p.stdout.read(), "orange")
+
+ def test_communicate(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' \
+ 'sys.stderr.write("pineapple");' \
+ 'sys.stdout.write(sys.stdin.read())'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ (stdout, stderr) = p.communicate("banana")
+ self.assertEqual(stdout, "banana")
+ self.assertEqual(remove_stderr_debug_decorations(stderr),
+ "pineapple")
+
+ def test_communicate_returns(self):
+ # communicate() should return None if no redirection is active
+ p = subprocess.Popen([sys.executable, "-c",
+ "import sys; sys.exit(47)"])
+ (stdout, stderr) = p.communicate()
+ self.assertEqual(stdout, None)
+ self.assertEqual(stderr, None)
+
+ def test_communicate_pipe_buf(self):
+ # communicate() with writes larger than pipe_buf
+ # This test will probably deadlock rather than fail, if
+ # communicate() does not work properly.
+ x, y = os.pipe()
+ if mswindows:
+ pipe_buf = 512
+ else:
+ pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
+ os.close(x)
+ os.close(y)
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;'
+ 'sys.stdout.write(sys.stdin.read(47));' \
+ 'sys.stderr.write("xyz"*%d);' \
+ 'sys.stdout.write(sys.stdin.read())' % pipe_buf],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ string_to_write = "abc"*pipe_buf
+ (stdout, stderr) = p.communicate(string_to_write)
+ self.assertEqual(stdout, string_to_write)
+
+ def test_writes_before_communicate(self):
+ # stdin.write before communicate()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' \
+ 'sys.stdout.write(sys.stdin.read())'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ p.stdin.write("banana")
+ (stdout, stderr) = p.communicate("split")
+ self.assertEqual(stdout, "bananasplit")
+ self.assertEqual(remove_stderr_debug_decorations(stderr), "")
+
+ def test_universal_newlines(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' + SETBINARY +
+ 'sys.stdout.write("line1\\n");'
+ 'sys.stdout.flush();'
+ 'sys.stdout.write("line2\\r");'
+ 'sys.stdout.flush();'
+ 'sys.stdout.write("line3\\r\\n");'
+ 'sys.stdout.flush();'
+ 'sys.stdout.write("line4\\r");'
+ 'sys.stdout.flush();'
+ 'sys.stdout.write("\\nline5");'
+ 'sys.stdout.flush();'
+ 'sys.stdout.write("\\nline6");'],
+ stdout=subprocess.PIPE,
+ universal_newlines=1)
+ stdout = p.stdout.read()
+ if hasattr(open, 'newlines'):
+ # Interpreter with universal newline support
+ self.assertEqual(stdout,
+ "line1\nline2\nline3\nline4\nline5\nline6")
+ else:
+ # Interpreter without universal newline support
+ self.assertEqual(stdout,
+ "line1\nline2\rline3\r\nline4\r\nline5\nline6")
+
+ def test_universal_newlines_communicate(self):
+ # universal newlines through communicate()
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' + SETBINARY +
+ 'sys.stdout.write("line1\\n");'
+ 'sys.stdout.flush();'
+ 'sys.stdout.write("line2\\r");'
+ 'sys.stdout.flush();'
+ 'sys.stdout.write("line3\\r\\n");'
+ 'sys.stdout.flush();'
+ 'sys.stdout.write("line4\\r");'
+ 'sys.stdout.flush();'
+ 'sys.stdout.write("\\nline5");'
+ 'sys.stdout.flush();'
+ 'sys.stdout.write("\\nline6");'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=1)
+ (stdout, stderr) = p.communicate()
+ if hasattr(open, 'newlines'):
+ # Interpreter with universal newline support
+ self.assertEqual(stdout,
+ "line1\nline2\nline3\nline4\nline5\nline6")
+ else:
+ # Interpreter without universal newline support
+ self.assertEqual(stdout, "line1\nline2\rline3\r\nline4\r\nline5\nline6")
+
+ def test_no_leaking(self):
+ # Make sure we leak no resources
+ if test_support.is_resource_enabled("subprocess") and not mswindows:
+ max_handles = 1026 # too much for most UNIX systems
+ else:
+ max_handles = 65
+ for i in range(max_handles):
+ p = subprocess.Popen([sys.executable, "-c",
+ "import sys;sys.stdout.write(sys.stdin.read())"],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ data = p.communicate("lime")[0]
+ self.assertEqual(data, "lime")
+
+
+ def test_list2cmdline(self):
+ self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
+ '"a b c" d e')
+ self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
+ 'ab\\"c \\ d')
+ self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
+ 'a\\\\\\b "de fg" h')
+ self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
+ 'a\\\\\\"b c d')
+ self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
+ '"a\\\\b c" d e')
+ self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
+ '"a\\\\b\\ c" d e')
+
+
+ def test_poll(self):
+ p = subprocess.Popen([sys.executable,
+ "-c", "import time; time.sleep(1)"])
+ count = 0
+ while p.poll() is None:
+ time.sleep(0.1)
+ count += 1
+ # We expect that the poll loop probably went around about 10 times,
+ # but, based on system scheduling we can't control, it's possible
+ # poll() never returned None. It "should be" very rare that it
+ # didn't go around at least twice.
+ self.assert_(count >= 2)
+ # Subsequent invocations should just return the returncode
+ self.assertEqual(p.poll(), 0)
+
+
+ def test_wait(self):
+ p = subprocess.Popen([sys.executable,
+ "-c", "import time; time.sleep(2)"])
+ self.assertEqual(p.wait(), 0)
+ # Subsequent invocations should just return the returncode
+ self.assertEqual(p.wait(), 0)
+
+
+ def test_invalid_bufsize(self):
+ # an invalid type of the bufsize argument should raise
+ # TypeError.
+ try:
+ subprocess.Popen([sys.executable, "-c", "pass"], "orange")
+ except TypeError:
+ pass
+ else:
+ self.fail("Expected TypeError")
+
+ #
+ # POSIX tests
+ #
+ if not mswindows:
+ def test_exceptions(self):
+ # caught & re-raised exceptions
+ try:
+ p = subprocess.Popen([sys.executable, "-c", ""],
+ cwd="/this/path/does/not/exist")
+ except OSError, e:
+ # The attribute child_traceback should contain "os.chdir"
+ # somewhere.
+ self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
+ else:
+ self.fail("Expected OSError")
+
+ def test_run_abort(self):
+ # returncode handles signal termination
+ p = subprocess.Popen([sys.executable,
+ "-c", "import os; os.abort()"])
+ p.wait()
+ self.assertEqual(-p.returncode, signal.SIGABRT)
+
+ def test_preexec(self):
+ # preexec function
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os;' \
+ 'sys.stdout.write(os.getenv("FRUIT"))'],
+ stdout=subprocess.PIPE,
+ preexec_fn=lambda: os.putenv("FRUIT", "apple"))
+ self.assertEqual(p.stdout.read(), "apple")
+
+ def test_args_string(self):
+ # args is a string
+ f, fname = self.mkstemp()
+ os.write(f, "#!/bin/sh\n")
+ os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" %
+ sys.executable)
+ os.close(f)
+ os.chmod(fname, 0700)
+ p = subprocess.Popen(fname)
+ p.wait()
+ os.remove(fname)
+ self.assertEqual(p.returncode, 47)
+
+ def test_invalid_args(self):
+ # invalid arguments should raise ValueError
+ self.assertRaises(ValueError, subprocess.call,
+ [sys.executable,
+ "-c", "import sys; sys.exit(47)"],
+ startupinfo=47)
+ self.assertRaises(ValueError, subprocess.call,
+ [sys.executable,
+ "-c", "import sys; sys.exit(47)"],
+ creationflags=47)
+
+ def test_shell_sequence(self):
+ # Run command through the shell (sequence)
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "apple"
+ p = subprocess.Popen(["echo $FRUIT"], shell=1,
+ stdout=subprocess.PIPE,
+ env=newenv)
+ self.assertEqual(p.stdout.read().strip(), "apple")
+
+ def test_shell_string(self):
+ # Run command through the shell (string)
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "apple"
+ p = subprocess.Popen("echo $FRUIT", shell=1,
+ stdout=subprocess.PIPE,
+ env=newenv)
+ self.assertEqual(p.stdout.read().strip(), "apple")
+
+ def test_call_string(self):
+ # call() function with string argument on UNIX
+ f, fname = self.mkstemp()
+ os.write(f, "#!/bin/sh\n")
+ os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" %
+ sys.executable)
+ os.close(f)
+ os.chmod(fname, 0700)
+ rc = subprocess.call(fname)
+ os.remove(fname)
+ self.assertEqual(rc, 47)
+
+
+ #
+ # Windows tests
+ #
+ if mswindows:
+ def test_startupinfo(self):
+ # startupinfo argument
+ # We use hardcoded constants, because we do not want to
+ # depend on win32all.
+ STARTF_USESHOWWINDOW = 1
+ SW_MAXIMIZE = 3
+ startupinfo = subprocess.STARTUPINFO()
+ startupinfo.dwFlags = STARTF_USESHOWWINDOW
+ startupinfo.wShowWindow = SW_MAXIMIZE
+ # Since Python is a console process, it won't be affected
+ # by wShowWindow, but the argument should be silently
+ # ignored
+ subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
+ startupinfo=startupinfo)
+
+ def test_creationflags(self):
+ # creationflags argument
+ CREATE_NEW_CONSOLE = 16
+ sys.stderr.write(" a DOS box should flash briefly ...\n")
+ subprocess.call(sys.executable +
+ ' -c "import time; time.sleep(0.25)"',
+ creationflags=CREATE_NEW_CONSOLE)
+
+ def test_invalid_args(self):
+ # invalid arguments should raise ValueError
+ self.assertRaises(ValueError, subprocess.call,
+ [sys.executable,
+ "-c", "import sys; sys.exit(47)"],
+ preexec_fn=lambda: 1)
+ self.assertRaises(ValueError, subprocess.call,
+ [sys.executable,
+ "-c", "import sys; sys.exit(47)"],
+ close_fds=True)
+
+ def test_shell_sequence(self):
+ # Run command through the shell (sequence)
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "physalis"
+ p = subprocess.Popen(["set"], shell=1,
+ stdout=subprocess.PIPE,
+ env=newenv)
+ self.assertNotEqual(p.stdout.read().find("physalis"), -1)
+
+ def test_shell_string(self):
+ # Run command through the shell (string)
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "physalis"
+ p = subprocess.Popen("set", shell=1,
+ stdout=subprocess.PIPE,
+ env=newenv)
+ self.assertNotEqual(p.stdout.read().find("physalis"), -1)
+
+ def test_call_string(self):
+ # call() function with string argument on Windows
+ rc = subprocess.call(sys.executable +
+ ' -c "import sys; sys.exit(47)"')
+ self.assertEqual(rc, 47)
+
+
+def test_main():
+ test_support.run_unittest(ProcessTestCase)
+
+if __name__ == "__main__":
+ test_main()
Modified: py/dist/py/compat/testing/test_textwrap.py
==============================================================================
--- py/dist/py/compat/testing/test_textwrap.py (original)
+++ py/dist/py/compat/testing/test_textwrap.py Tue Jan 9 17:55:49 2007
@@ -5,7 +5,7 @@
# Converted to PyUnit by Peter Hansen <peter at engcorp.com>.
# Currently maintained by Greg Ward.
#
-# $Id: test_textwrap.py,v 1.27.4.1 2005/03/05 02:38:33 gward Exp $
+# $Id: test_textwrap.py 38573 2005-03-05 02:38:33Z gward $
#
import unittest
@@ -18,7 +18,6 @@
fill = textwrap.fill
dedent = textwrap.dedent
-
class BaseTestCase(unittest.TestCase):
'''Parent class with utility methods for textwrap tests.'''
Modified: py/dist/py/compat/textwrap.py
==============================================================================
--- py/dist/py/compat/textwrap.py (original)
+++ py/dist/py/compat/textwrap.py Tue Jan 9 17:55:49 2007
@@ -161,7 +161,7 @@
else:
i += 1
- def _handle_long_word(self, chunks, cur_line, cur_len, width):
+ def _handle_long_word(self, reversed_chunks, cur_line, cur_len, width):
"""_handle_long_word(chunks : [string],
cur_line : [string],
cur_len : int, width : int)
@@ -174,14 +174,14 @@
# If we're allowed to break long words, then do so: put as much
# of the next chunk onto the current line as will fit.
if self.break_long_words:
- cur_line.append(chunks[0][0:space_left])
- chunks[0] = chunks[0][space_left:]
+ cur_line.append(reversed_chunks[-1][:space_left])
+ reversed_chunks[-1] = reversed_chunks[-1][space_left:]
# Otherwise, we have to preserve the long word intact. Only add
# it to the current line if there's nothing already there --
# that minimizes how much we violate the width constraint.
elif not cur_line:
- cur_line.append(chunks.pop(0))
+ cur_line.append(reversed_chunks.pop())
# If we're not allowed to break long words, and there's already
# text on the current line, do nothing. Next time through the
@@ -206,6 +206,10 @@
if self.width <= 0:
raise ValueError("invalid width %r (must be > 0)" % self.width)
+ # Arrange in reverse order so items can be efficiently popped
+ # from a stack of chunks.
+ chunks.reverse()
+
while chunks:
# Start the list of chunks that will make up the current line.
@@ -224,15 +228,15 @@
# First chunk on line is whitespace -- drop it, unless this
# is the very beginning of the text (ie. no lines started yet).
- if chunks[0].strip() == '' and lines:
- del chunks[0]
+ if chunks[-1].strip() == '' and lines:
+ del chunks[-1]
while chunks:
- l = len(chunks[0])
+ l = len(chunks[-1])
# Can at least squeeze this chunk onto the current line.
if cur_len + l <= width:
- cur_line.append(chunks.pop(0))
+ cur_line.append(chunks.pop())
cur_len += l
# Nope, this line is full.
@@ -241,7 +245,7 @@
# The current line is full, and the next chunk is too big to
# fit on *any* line (not just this one).
- if chunks and len(chunks[0]) > width:
+ if chunks and len(chunks[-1]) > width:
self._handle_long_word(chunks, cur_line, cur_len, width)
# If the last chunk on this line is all whitespace, drop it.
@@ -268,7 +272,6 @@
converted to space.
"""
text = self._munge_whitespace(text)
- indent = self.initial_indent
chunks = self._split(text)
if self.fix_sentence_endings:
self._fix_sentence_endings(chunks)
More information about the pytest-commit
mailing list