[Python-checkins] cpython: Issue #25220: Split the huge main() function of libregrtest.main into a class

victor.stinner python-checkins at python.org
Tue Sep 29 22:57:27 CEST 2015


https://hg.python.org/cpython/rev/817e25bd34d0
changeset:   98415:817e25bd34d0
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Tue Sep 29 22:48:52 2015 +0200
summary:
  Issue #25220: Split the huge main() function of libregrtest.main into a class
with attributes and methods.

The --threshold command line option is now ignored if the gc module is missing.

* Convert main() variables to Regrtest attributes, document some attributes
* Convert accumulate_result() function to a method
* Create setup_python() function and setup_regrtest() method.
* Import gc at top level
* Move resource.setrlimit() and the code to make the module paths absolute into
  the new setup_python() function. This code is no longer executed when the
  module is imported, only when main() is executed, which gives better control
  over when the setup is done.
* Move textwrap import from printlist() to the top level.
* Some other minor cleanup.

files:
  Lib/test/libregrtest/main.py |  689 ++++++++++++----------
  1 files changed, 381 insertions(+), 308 deletions(-)


diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -1,13 +1,14 @@
 import faulthandler
 import json
 import os
+import platform
+import random
 import re
+import signal
 import sys
+import sysconfig
 import tempfile
-import sysconfig
-import signal
-import random
-import platform
+import textwrap
 import traceback
 import unittest
 from test.libregrtest.runtest import (
@@ -19,45 +20,15 @@
 from test.libregrtest.cmdline import _parse_args
 from test import support
 try:
+    import gc
+except ImportError:
+    gc = None
+try:
     import threading
 except ImportError:
     threading = None
 
 
-# Some times __path__ and __file__ are not absolute (e.g. while running from
-# Lib/) and, if we change the CWD to run the tests in a temporary dir, some
-# imports might fail.  This affects only the modules imported before os.chdir().
-# These modules are searched first in sys.path[0] (so '' -- the CWD) and if
-# they are found in the CWD their __file__ and __path__ will be relative (this
-# happens before the chdir).  All the modules imported after the chdir, are
-# not found in the CWD, and since the other paths in sys.path[1:] are absolute
-# (site.py absolutize them), the __file__ and __path__ will be absolute too.
-# Therefore it is necessary to absolutize manually the __file__ and __path__ of
-# the packages to prevent later imports to fail when the CWD is different.
-for module in sys.modules.values():
-    if hasattr(module, '__path__'):
-        module.__path__ = [os.path.abspath(path) for path in module.__path__]
-    if hasattr(module, '__file__'):
-        module.__file__ = os.path.abspath(module.__file__)
-
-
-# MacOSX (a.k.a. Darwin) has a default stack size that is too small
-# for deeply recursive regular expressions.  We see this as crashes in
-# the Python test suite when running test_re.py and test_sre.py.  The
-# fix is to set the stack limit to 2048.
-# This approach may also be useful for other Unixy platforms that
-# suffer from small default stack limits.
-if sys.platform == 'darwin':
-    try:
-        import resource
-    except ImportError:
-        pass
-    else:
-        soft, hard = resource.getrlimit(resource.RLIMIT_STACK)
-        newsoft = min(hard, max(soft, 1024*2048))
-        resource.setrlimit(resource.RLIMIT_STACK, (newsoft, hard))
-
-
 # When tests are run from the Python build directory, it is best practice
 # to keep the test files in a subfolder.  This eases the cleanup of leftover
 # files using the "make distclean" command.
@@ -68,7 +39,73 @@
 TEMPDIR = os.path.abspath(TEMPDIR)
 
 
-def main(tests=None, **kwargs):
+def slave_runner(slaveargs):
+    args, kwargs = json.loads(slaveargs)
+    if kwargs.get('huntrleaks'):
+        unittest.BaseTestSuite._cleanup = False
+    try:
+        result = runtest(*args, **kwargs)
+    except KeyboardInterrupt:
+        result = INTERRUPTED, ''
+    except BaseException as e:
+        traceback.print_exc()
+        result = CHILD_ERROR, str(e)
+    sys.stdout.flush()
+    print()   # Force a newline (just in case)
+    print(json.dumps(result))
+    sys.exit(0)
+
+
+def setup_python():
+    # Display the Python traceback on fatal errors (e.g. segfault)
+    faulthandler.enable(all_threads=True)
+
+    # Display the Python traceback on SIGALRM or SIGUSR1 signal
+    signals = []
+    if hasattr(signal, 'SIGALRM'):
+        signals.append(signal.SIGALRM)
+    if hasattr(signal, 'SIGUSR1'):
+        signals.append(signal.SIGUSR1)
+    for signum in signals:
+        faulthandler.register(signum, chain=True)
+
+    replace_stdout()
+    support.record_original_stdout(sys.stdout)
+
+    # Some times __path__ and __file__ are not absolute (e.g. while running from
+    # Lib/) and, if we change the CWD to run the tests in a temporary dir, some
+    # imports might fail.  This affects only the modules imported before os.chdir().
+    # These modules are searched first in sys.path[0] (so '' -- the CWD) and if
+    # they are found in the CWD their __file__ and __path__ will be relative (this
+    # happens before the chdir).  All the modules imported after the chdir, are
+    # not found in the CWD, and since the other paths in sys.path[1:] are absolute
+    # (site.py absolutize them), the __file__ and __path__ will be absolute too.
+    # Therefore it is necessary to absolutize manually the __file__ and __path__ of
+    # the packages to prevent later imports to fail when the CWD is different.
+    for module in sys.modules.values():
+        if hasattr(module, '__path__'):
+            module.__path__ = [os.path.abspath(path) for path in module.__path__]
+        if hasattr(module, '__file__'):
+            module.__file__ = os.path.abspath(module.__file__)
+
+    # MacOSX (a.k.a. Darwin) has a default stack size that is too small
+    # for deeply recursive regular expressions.  We see this as crashes in
+    # the Python test suite when running test_re.py and test_sre.py.  The
+    # fix is to set the stack limit to 2048.
+    # This approach may also be useful for other Unixy platforms that
+    # suffer from small default stack limits.
+    if sys.platform == 'darwin':
+        try:
+            import resource
+        except ImportError:
+            pass
+        else:
+            soft, hard = resource.getrlimit(resource.RLIMIT_STACK)
+            newsoft = min(hard, max(soft, 1024*2048))
+            resource.setrlimit(resource.RLIMIT_STACK, (newsoft, hard))
+
+
+class Regrtest:
     """Execute a test suite.
 
     This also parses command-line options and modifies its behavior
@@ -91,210 +128,257 @@
     directly to set the values that would normally be set by flags
     on the command line.
     """
-    # Display the Python traceback on fatal errors (e.g. segfault)
-    faulthandler.enable(all_threads=True)
+    def __init__(self):
+        # Namespace of command line options
+        self.ns = None
 
-    # Display the Python traceback on SIGALRM or SIGUSR1 signal
-    signals = []
-    if hasattr(signal, 'SIGALRM'):
-        signals.append(signal.SIGALRM)
-    if hasattr(signal, 'SIGUSR1'):
-        signals.append(signal.SIGUSR1)
-    for signum in signals:
-        faulthandler.register(signum, chain=True)
+        # tests
+        self.tests = []
+        self.selected = []
 
-    replace_stdout()
+        # test results
+        self.good = []
+        self.bad = []
+        self.skipped = []
+        self.resource_denieds = []
+        self.environment_changed = []
+        self.interrupted = False
 
-    support.record_original_stdout(sys.stdout)
+        # used by --slow
+        self.test_times = []
 
-    ns = _parse_args(sys.argv[1:], **kwargs)
+        # used by --coverage, trace.Trace instance
+        self.tracer = None
 
-    if ns.huntrleaks:
-        # Avoid false positives due to various caches
-        # filling slowly with random data:
-        warm_caches()
-    if ns.memlimit is not None:
-        support.set_memlimit(ns.memlimit)
-    if ns.threshold is not None:
-        import gc
-        gc.set_threshold(ns.threshold)
-    if ns.nowindows:
-        import msvcrt
-        msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS|
-                            msvcrt.SEM_NOALIGNMENTFAULTEXCEPT|
-                            msvcrt.SEM_NOGPFAULTERRORBOX|
-                            msvcrt.SEM_NOOPENFILEERRORBOX)
-        try:
-            msvcrt.CrtSetReportMode
-        except AttributeError:
-            # release build
-            pass
+        # used by --findleaks, store for gc.garbage
+        self.found_garbage = []
+
+        # used to display the progress bar "[ 3/100]"
+        self.test_count = ''
+        self.test_count_width = 1
+
+        # used by --single
+        self.next_single_test = None
+        self.next_single_filename = None
+
+    def accumulate_result(self, test, result):
+        ok, test_time = result
+        self.test_times.append((test_time, test))
+        if ok == PASSED:
+            self.good.append(test)
+        elif ok == FAILED:
+            self.bad.append(test)
+        elif ok == ENV_CHANGED:
+            self.environment_changed.append(test)
+        elif ok == SKIPPED:
+            self.skipped.append(test)
+        elif ok == RESOURCE_DENIED:
+            self.skipped.append(test)
+            self.resource_denieds.append(test)
+
+    def display_progress(self, test_index, test):
+        if self.ns.quiet:
+            return
+        fmt = "[{1:{0}}{2}/{3}] {4}" if self.bad else "[{1:{0}}{2}] {4}"
+        print(fmt.format(
+            self.test_count_width, test_index, self.test_count, len(self.bad), test))
+        sys.stdout.flush()
+
+    def setup_regrtest(self):
+        if self.ns.huntrleaks:
+            # Avoid false positives due to various caches
+            # filling slowly with random data:
+            warm_caches()
+
+        if self.ns.memlimit is not None:
+            support.set_memlimit(self.ns.memlimit)
+
+        if self.ns.threshold is not None:
+            if gc is not None:
+                gc.set_threshold(self.ns.threshold)
+            else:
+                print('No GC available, ignore --threshold.')
+
+        if self.ns.nowindows:
+            import msvcrt
+            msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS|
+                                msvcrt.SEM_NOALIGNMENTFAULTEXCEPT|
+                                msvcrt.SEM_NOGPFAULTERRORBOX|
+                                msvcrt.SEM_NOOPENFILEERRORBOX)
+            try:
+                msvcrt.CrtSetReportMode
+            except AttributeError:
+                # release build
+                pass
+            else:
+                for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
+                    msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
+                    msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
+
+        if self.ns.findleaks:
+            if gc is not None:
+                # Uncomment the line below to report garbage that is not
+                # freeable by reference counting alone.  By default only
+                # garbage that is not collectable by the GC is reported.
+                pass
+                #gc.set_debug(gc.DEBUG_SAVEALL)
+            else:
+                print('No GC available, disabling --findleaks')
+                self.ns.findleaks = False
+
+        if self.ns.huntrleaks:
+            unittest.BaseTestSuite._cleanup = False
+
+        # Strip .py extensions.
+        removepy(self.ns.args)
+
+        if self.ns.trace:
+            import trace
+            self.tracer = trace.Trace(ignoredirs=[sys.base_prefix,
+                                                  sys.base_exec_prefix,
+                                                  tempfile.gettempdir()],
+                                      trace=False, count=True)
+
+    def find_tests(self, tests):
+        self.tests = tests
+
+        if self.ns.single:
+            self.next_single_filename = os.path.join(TEMPDIR, 'pynexttest')
+            try:
+                with open(self.next_single_filename, 'r') as fp:
+                    next_test = fp.read().strip()
+                    self.tests = [next_test]
+            except OSError:
+                pass
+
+        if self.ns.fromfile:
+            self.tests = []
+            with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
+                count_pat = re.compile(r'\[\s*\d+/\s*\d+\]')
+                for line in fp:
+                    line = count_pat.sub('', line)
+                    guts = line.split() # assuming no test has whitespace in its name
+                    if guts and not guts[0].startswith('#'):
+                        self.tests.extend(guts)
+
+        removepy(self.tests)
+
+        stdtests = STDTESTS[:]
+        nottests = NOTTESTS.copy()
+        if self.ns.exclude:
+            for arg in self.ns.args:
+                if arg in stdtests:
+                    stdtests.remove(arg)
+                nottests.add(arg)
+            self.ns.args = []
+
+        # For a partial run, we do not need to clutter the output.
+        if self.ns.verbose or self.ns.header or not (self.ns.quiet or self.ns.single or self.tests or self.ns.args):
+            # Print basic platform information
+            print("==", platform.python_implementation(), *sys.version.split())
+            print("==  ", platform.platform(aliased=True),
+                          "%s-endian" % sys.byteorder)
+            print("==  ", "hash algorithm:", sys.hash_info.algorithm,
+                  "64bit" if sys.maxsize > 2**32 else "32bit")
+            print("==  ", os.getcwd())
+            print("Testing with flags:", sys.flags)
+
+        # if testdir is set, then we are not running the python tests suite, so
+        # don't add default tests to be executed or skipped (pass empty values)
+        if self.ns.testdir:
+            alltests = findtests(self.ns.testdir, list(), set())
         else:
-            for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
-                msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
-                msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
-    if ns.wait:
-        input("Press any key to continue...")
+            alltests = findtests(self.ns.testdir, stdtests, nottests)
 
-    if ns.slaveargs is not None:
-        args, kwargs = json.loads(ns.slaveargs)
-        if kwargs.get('huntrleaks'):
-            unittest.BaseTestSuite._cleanup = False
-        try:
-            result = runtest(*args, **kwargs)
-        except KeyboardInterrupt:
-            result = INTERRUPTED, ''
-        except BaseException as e:
-            traceback.print_exc()
-            result = CHILD_ERROR, str(e)
-        sys.stdout.flush()
-        print()   # Force a newline (just in case)
-        print(json.dumps(result))
-        sys.exit(0)
+        self.selected = self.tests or self.ns.args or alltests
+        if self.ns.single:
+            self.selected = self.selected[:1]
+            try:
+                pos = alltests.index(self.selected[0])
+                self.next_single_test = alltests[pos + 1]
+            except IndexError:
+                pass
 
-    good = []
-    bad = []
-    skipped = []
-    resource_denieds = []
-    environment_changed = []
-    interrupted = False
+        # Remove all the self.selected tests that precede start if it's set.
+        if self.ns.start:
+            try:
+                del self.selected[:self.selected.index(self.ns.start)]
+            except ValueError:
+                print("Couldn't find starting test (%s), using all tests" % self.ns.start)
 
-    if ns.findleaks:
-        try:
-            import gc
-        except ImportError:
-            print('No GC available, disabling findleaks.')
-            ns.findleaks = False
-        else:
-            # Uncomment the line below to report garbage that is not
-            # freeable by reference counting alone.  By default only
-            # garbage that is not collectable by the GC is reported.
-            #gc.set_debug(gc.DEBUG_SAVEALL)
-            found_garbage = []
+        if self.ns.randomize:
+            if self.ns.random_seed is None:
+                self.ns.random_seed = random.randrange(10000000)
+            random.seed(self.ns.random_seed)
+            print("Using random seed", self.ns.random_seed)
+            random.shuffle(self.selected)
 
-    if ns.huntrleaks:
-        unittest.BaseTestSuite._cleanup = False
+    def display_result(self):
+        if self.interrupted:
+            # print a newline after ^C
+            print()
+            print("Test suite interrupted by signal SIGINT.")
+            omitted = set(self.selected) - set(self.good) - set(self.bad) - set(self.skipped)
+            print(count(len(omitted), "test"), "omitted:")
+            printlist(omitted)
 
-    if ns.single:
-        filename = os.path.join(TEMPDIR, 'pynexttest')
-        try:
-            with open(filename, 'r') as fp:
-                next_test = fp.read().strip()
-                tests = [next_test]
-        except OSError:
-            pass
+        if self.good and not self.ns.quiet:
+            if not self.bad and not self.skipped and not self.interrupted and len(self.good) > 1:
+                print("All", end=' ')
+            print(count(len(self.good), "test"), "OK.")
 
-    if ns.fromfile:
-        tests = []
-        with open(os.path.join(support.SAVEDCWD, ns.fromfile)) as fp:
-            count_pat = re.compile(r'\[\s*\d+/\s*\d+\]')
-            for line in fp:
-                line = count_pat.sub('', line)
-                guts = line.split() # assuming no test has whitespace in its name
-                if guts and not guts[0].startswith('#'):
-                    tests.extend(guts)
+        if self.ns.print_slow:
+            self.test_times.sort(reverse=True)
+            print("10 slowest tests:")
+            for time, test in self.test_times[:10]:
+                print("%s: %.1fs" % (test, time))
 
-    # Strip .py extensions.
-    removepy(ns.args)
-    removepy(tests)
+        if self.bad:
+            print(count(len(self.bad), "test"), "failed:")
+            printlist(self.bad)
 
-    stdtests = STDTESTS[:]
-    nottests = NOTTESTS.copy()
-    if ns.exclude:
-        for arg in ns.args:
-            if arg in stdtests:
-                stdtests.remove(arg)
-            nottests.add(arg)
-        ns.args = []
+        if self.environment_changed:
+            print("{} altered the execution environment:".format(
+                     count(len(self.environment_changed), "test")))
+            printlist(self.environment_changed)
 
-    # For a partial run, we do not need to clutter the output.
-    if ns.verbose or ns.header or not (ns.quiet or ns.single or tests or ns.args):
-        # Print basic platform information
-        print("==", platform.python_implementation(), *sys.version.split())
-        print("==  ", platform.platform(aliased=True),
-                      "%s-endian" % sys.byteorder)
-        print("==  ", "hash algorithm:", sys.hash_info.algorithm,
-              "64bit" if sys.maxsize > 2**32 else "32bit")
-        print("==  ", os.getcwd())
-        print("Testing with flags:", sys.flags)
+        if self.skipped and not self.ns.quiet:
+            print(count(len(self.skipped), "test"), "skipped:")
+            printlist(self.skipped)
 
-    # if testdir is set, then we are not running the python tests suite, so
-    # don't add default tests to be executed or skipped (pass empty values)
-    if ns.testdir:
-        alltests = findtests(ns.testdir, list(), set())
-    else:
-        alltests = findtests(ns.testdir, stdtests, nottests)
+        if self.ns.verbose2 and self.bad:
+            print("Re-running failed tests in verbose mode")
+            for test in self.bad[:]:
+                print("Re-running test %r in verbose mode" % test)
+                sys.stdout.flush()
+                try:
+                    self.ns.verbose = True
+                    ok = runtest(test, True, self.ns.quiet, self.ns.huntrleaks,
+                                 timeout=self.ns.timeout)
+                except KeyboardInterrupt:
+                    # print a newline separate from the ^C
+                    print()
+                    break
+                else:
+                    if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
+                        self.bad.remove(test)
+            else:
+                if self.bad:
+                    print(count(len(self.bad), 'test'), "failed again:")
+                    printlist(self.bad)
 
-    selected = tests or ns.args or alltests
-    if ns.single:
-        selected = selected[:1]
-        try:
-            next_single_test = alltests[alltests.index(selected[0])+1]
-        except IndexError:
-            next_single_test = None
-    # Remove all the selected tests that precede start if it's set.
-    if ns.start:
-        try:
-            del selected[:selected.index(ns.start)]
-        except ValueError:
-            print("Couldn't find starting test (%s), using all tests" % ns.start)
-    if ns.randomize:
-        if ns.random_seed is None:
-            ns.random_seed = random.randrange(10000000)
-        random.seed(ns.random_seed)
-        print("Using random seed", ns.random_seed)
-        random.shuffle(selected)
-    if ns.trace:
-        import trace, tempfile
-        tracer = trace.Trace(ignoredirs=[sys.base_prefix, sys.base_exec_prefix,
-                                         tempfile.gettempdir()],
-                             trace=False, count=True)
-
-    test_times = []
-    support.verbose = ns.verbose      # Tell tests to be moderately quiet
-    support.use_resources = ns.use_resources
-    save_modules = sys.modules.keys()
-
-    def accumulate_result(test, result):
-        ok, test_time = result
-        test_times.append((test_time, test))
-        if ok == PASSED:
-            good.append(test)
-        elif ok == FAILED:
-            bad.append(test)
-        elif ok == ENV_CHANGED:
-            environment_changed.append(test)
-        elif ok == SKIPPED:
-            skipped.append(test)
-        elif ok == RESOURCE_DENIED:
-            skipped.append(test)
-            resource_denieds.append(test)
-
-    if ns.forever:
-        def test_forever(tests=list(selected)):
-            while True:
-                for test in tests:
-                    yield test
-                    if bad:
-                        return
-        tests = test_forever()
-        test_count = ''
-        test_count_width = 3
-    else:
-        tests = iter(selected)
-        test_count = '/{}'.format(len(selected))
-        test_count_width = len(test_count) - 1
-
-    if ns.use_mp:
+    def _run_tests_mp(self):
         try:
             from threading import Thread
         except ImportError:
             print("Multiprocess option requires thread support")
             sys.exit(2)
         from queue import Queue
+
         debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$")
         output = Queue()
-        pending = MultiprocessTests(tests)
+        pending = MultiprocessTests(self.tests)
+
         def work():
             # A worker thread.
             try:
@@ -304,7 +388,7 @@
                     except StopIteration:
                         output.put((None, None, None, None))
                         return
-                    retcode, stdout, stderr = run_test_in_subprocess(test, ns)
+                    retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
                     # Strip last refcount output line if it exists, since it
                     # comes from the shutdown of the interpreter in the subcommand.
                     stderr = debug_output_pat.sub("", stderr)
@@ -321,23 +405,20 @@
             except BaseException:
                 output.put((None, None, None, None))
                 raise
-        workers = [Thread(target=work) for i in range(ns.use_mp)]
+
+        workers = [Thread(target=work) for i in range(self.ns.use_mp)]
         for worker in workers:
             worker.start()
         finished = 0
         test_index = 1
         try:
-            while finished < ns.use_mp:
+            while finished < self.ns.use_mp:
                 test, stdout, stderr, result = output.get()
                 if test is None:
                     finished += 1
                     continue
-                accumulate_result(test, result)
-                if not ns.quiet:
-                    fmt = "[{1:{0}}{2}/{3}] {4}" if bad else "[{1:{0}}{2}] {4}"
-                    print(fmt.format(
-                        test_count_width, test_index, test_count,
-                        len(bad), test))
+                self.accumulate_result(test, result)
+                self.display_progress(test_index, test)
                 if stdout:
                     print(stdout)
                 if stderr:
@@ -350,110 +431,99 @@
                     raise Exception("Child error on {}: {}".format(test, result[1]))
                 test_index += 1
         except KeyboardInterrupt:
-            interrupted = True
+            self.interrupted = True
             pending.interrupted = True
         for worker in workers:
             worker.join()
-    else:
-        for test_index, test in enumerate(tests, 1):
-            if not ns.quiet:
-                fmt = "[{1:{0}}{2}/{3}] {4}" if bad else "[{1:{0}}{2}] {4}"
-                print(fmt.format(
-                    test_count_width, test_index, test_count, len(bad), test))
-                sys.stdout.flush()
-            if ns.trace:
+
+    def _run_tests_sequential(self):
+        save_modules = sys.modules.keys()
+
+        for test_index, test in enumerate(self.tests, 1):
+            self.display_progress(test_index, test)
+            if self.ns.trace:
                 # If we're tracing code coverage, then we don't exit with status
                 # if on a false return value from main.
-                tracer.runctx('runtest(test, ns.verbose, ns.quiet, timeout=ns.timeout)',
-                              globals=globals(), locals=vars())
+                cmd = 'runtest(test, self.ns.verbose, self.ns.quiet, timeout=self.ns.timeout)'
+                self.tracer.runctx(cmd, globals=globals(), locals=vars())
             else:
                 try:
-                    result = runtest(test, ns.verbose, ns.quiet,
-                                     ns.huntrleaks,
-                                     output_on_failure=ns.verbose3,
-                                     timeout=ns.timeout, failfast=ns.failfast,
-                                     match_tests=ns.match_tests)
-                    accumulate_result(test, result)
+                    result = runtest(test, self.ns.verbose, self.ns.quiet,
+                                     self.ns.huntrleaks,
+                                     output_on_failure=self.ns.verbose3,
+                                     timeout=self.ns.timeout, failfast=self.ns.failfast,
+                                     match_tests=self.ns.match_tests)
+                    self.accumulate_result(test, result)
                 except KeyboardInterrupt:
-                    interrupted = True
+                    self.interrupted = True
                     break
-            if ns.findleaks:
+            if self.ns.findleaks:
                 gc.collect()
                 if gc.garbage:
                     print("Warning: test created", len(gc.garbage), end=' ')
                     print("uncollectable object(s).")
                     # move the uncollectable objects somewhere so we don't see
                     # them again
-                    found_garbage.extend(gc.garbage)
+                    self.found_garbage.extend(gc.garbage)
                     del gc.garbage[:]
             # Unload the newly imported modules (best effort finalization)
             for module in sys.modules.keys():
                 if module not in save_modules and module.startswith("test."):
                     support.unload(module)
 
-    if interrupted:
-        # print a newline after ^C
-        print()
-        print("Test suite interrupted by signal SIGINT.")
-        omitted = set(selected) - set(good) - set(bad) - set(skipped)
-        print(count(len(omitted), "test"), "omitted:")
-        printlist(omitted)
-    if good and not ns.quiet:
-        if not bad and not skipped and not interrupted and len(good) > 1:
-            print("All", end=' ')
-        print(count(len(good), "test"), "OK.")
-    if ns.print_slow:
-        test_times.sort(reverse=True)
-        print("10 slowest tests:")
-        for time, test in test_times[:10]:
-            print("%s: %.1fs" % (test, time))
-    if bad:
-        print(count(len(bad), "test"), "failed:")
-        printlist(bad)
-    if environment_changed:
-        print("{} altered the execution environment:".format(
-                 count(len(environment_changed), "test")))
-        printlist(environment_changed)
-    if skipped and not ns.quiet:
-        print(count(len(skipped), "test"), "skipped:")
-        printlist(skipped)
+    def run_tests(self):
+        support.verbose = self.ns.verbose      # Tell tests to be moderately quiet
+        support.use_resources = self.ns.use_resources
 
-    if ns.verbose2 and bad:
-        print("Re-running failed tests in verbose mode")
-        for test in bad[:]:
-            print("Re-running test %r in verbose mode" % test)
-            sys.stdout.flush()
-            try:
-                ns.verbose = True
-                ok = runtest(test, True, ns.quiet, ns.huntrleaks,
-                             timeout=ns.timeout)
-            except KeyboardInterrupt:
-                # print a newline separate from the ^C
-                print()
-                break
+        if self.ns.forever:
+            def test_forever(tests):
+                while True:
+                    for test in tests:
+                        yield test
+                        if self.bad:
+                            return
+            self.tests = test_forever(list(self.selected))
+            self.test_count = ''
+            self.test_count_width = 3
+        else:
+            self.tests = iter(self.selected)
+            self.test_count = '/{}'.format(len(self.selected))
+            self.test_count_width = len(self.test_count) - 1
+
+        if self.ns.use_mp:
+            self._run_tests_mp()
+        else:
+            self._run_tests_sequential()
+
+    def finalize(self):
+        if self.next_single_filename:
+            if self.next_single_test:
+                with open(self.next_single_filename, 'w') as fp:
+                    fp.write(self.next_single_test + '\n')
             else:
-                if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
-                    bad.remove(test)
-        else:
-            if bad:
-                print(count(len(bad), 'test'), "failed again:")
-                printlist(bad)
+                os.unlink(self.next_single_filename)
 
-    if ns.single:
-        if next_single_test:
-            with open(filename, 'w') as fp:
-                fp.write(next_single_test + '\n')
-        else:
-            os.unlink(filename)
+        if self.ns.trace:
+            r = self.tracer.results()
+            r.write_results(show_missing=True, summary=True,
+                            coverdir=self.ns.coverdir)
 
-    if ns.trace:
-        r = tracer.results()
-        r.write_results(show_missing=True, summary=True, coverdir=ns.coverdir)
+        if self.ns.runleaks:
+            os.system("leaks %d" % os.getpid())
 
-    if ns.runleaks:
-        os.system("leaks %d" % os.getpid())
-
-    sys.exit(len(bad) > 0 or interrupted)
+    def main(self, tests=None, **kwargs):
+        setup_python()
+        self.ns = _parse_args(sys.argv[1:], **kwargs)
+        self.setup_regrtest()
+        if self.ns.wait:
+            input("Press any key to continue...")
+        if self.ns.slaveargs is not None:
+            slave_runner(self.ns.slaveargs)
+        self.find_tests(tests)
+        self.run_tests()
+        self.display_result()
+        self.finalize()
+        sys.exit(len(self.bad) > 0 or self.interrupted)
 
 
 # We do not use a generator so multiple threads can call next().
@@ -518,11 +588,14 @@
     begin each line.
     """
 
-    from textwrap import fill
     blanks = ' ' * indent
     # Print the sorted list: 'x' may be a '--random' list or a set()
-    print(fill(' '.join(str(elt) for elt in sorted(x)), width,
-               initial_indent=blanks, subsequent_indent=blanks))
+    print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
+                        initial_indent=blanks, subsequent_indent=blanks))
+
+
+def main(tests=None, **kwargs):
+    Regrtest().main(tests=tests, **kwargs)
 
 
 def main_in_temp_cwd():

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list