[Python-checkins] cpython: Issue #25220: Create libregrtest/runtest_mp.py
victor.stinner
python-checkins at python.org
Tue Sep 29 23:25:04 CEST 2015
https://hg.python.org/cpython/rev/12c666eea556
changeset: 98416:12c666eea556
user: Victor Stinner <victor.stinner at gmail.com>
date: Tue Sep 29 23:15:38 2015 +0200
summary:
Issue #25220: Create libregrtest/runtest_mp.py
Move the code to run tests in multiple processes using threading and subprocess
to a new submodule.
Move also slave_runner() (renamed to run_tests_slave()) and
run_test_in_subprocess() (renamed to run_tests_in_subprocess()) there.
files:
Lib/test/libregrtest/main.py | 120 +------------
Lib/test/libregrtest/runtest.py | 33 ---
Lib/test/libregrtest/runtest_mp.py | 158 +++++++++++++++++
3 files changed, 164 insertions(+), 147 deletions(-)
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -1,5 +1,4 @@
import faulthandler
-import json
import os
import platform
import random
@@ -9,13 +8,10 @@
import sysconfig
import tempfile
import textwrap
-import traceback
import unittest
from test.libregrtest.runtest import (
- findtests, runtest, run_test_in_subprocess,
- STDTESTS, NOTTESTS,
- PASSED, FAILED, ENV_CHANGED, SKIPPED,
- RESOURCE_DENIED, INTERRUPTED, CHILD_ERROR)
+ findtests, runtest,
+ STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED)
from test.libregrtest.refleak import warm_caches
from test.libregrtest.cmdline import _parse_args
from test import support
@@ -39,23 +35,6 @@
TEMPDIR = os.path.abspath(TEMPDIR)
-def slave_runner(slaveargs):
- args, kwargs = json.loads(slaveargs)
- if kwargs.get('huntrleaks'):
- unittest.BaseTestSuite._cleanup = False
- try:
- result = runtest(*args, **kwargs)
- except KeyboardInterrupt:
- result = INTERRUPTED, ''
- except BaseException as e:
- traceback.print_exc()
- result = CHILD_ERROR, str(e)
- sys.stdout.flush()
- print() # Force a newline (just in case)
- print(json.dumps(result))
- sys.exit(0)
-
-
def setup_python():
# Display the Python traceback on fatal errors (e.g. segfault)
faulthandler.enable(all_threads=True)
@@ -367,75 +346,6 @@
print(count(len(self.bad), 'test'), "failed again:")
printlist(self.bad)
- def _run_tests_mp(self):
- try:
- from threading import Thread
- except ImportError:
- print("Multiprocess option requires thread support")
- sys.exit(2)
- from queue import Queue
-
- debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$")
- output = Queue()
- pending = MultiprocessTests(self.tests)
-
- def work():
- # A worker thread.
- try:
- while True:
- try:
- test = next(pending)
- except StopIteration:
- output.put((None, None, None, None))
- return
- retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
- # Strip last refcount output line if it exists, since it
- # comes from the shutdown of the interpreter in the subcommand.
- stderr = debug_output_pat.sub("", stderr)
- stdout, _, result = stdout.strip().rpartition("\n")
- if retcode != 0:
- result = (CHILD_ERROR, "Exit code %s" % retcode)
- output.put((test, stdout.rstrip(), stderr.rstrip(), result))
- return
- if not result:
- output.put((None, None, None, None))
- return
- result = json.loads(result)
- output.put((test, stdout.rstrip(), stderr.rstrip(), result))
- except BaseException:
- output.put((None, None, None, None))
- raise
-
- workers = [Thread(target=work) for i in range(self.ns.use_mp)]
- for worker in workers:
- worker.start()
- finished = 0
- test_index = 1
- try:
- while finished < self.ns.use_mp:
- test, stdout, stderr, result = output.get()
- if test is None:
- finished += 1
- continue
- self.accumulate_result(test, result)
- self.display_progress(test_index, test)
- if stdout:
- print(stdout)
- if stderr:
- print(stderr, file=sys.stderr)
- sys.stdout.flush()
- sys.stderr.flush()
- if result[0] == INTERRUPTED:
- raise KeyboardInterrupt
- if result[0] == CHILD_ERROR:
- raise Exception("Child error on {}: {}".format(test, result[1]))
- test_index += 1
- except KeyboardInterrupt:
- self.interrupted = True
- pending.interrupted = True
- for worker in workers:
- worker.join()
-
def _run_tests_sequential(self):
save_modules = sys.modules.keys()
@@ -491,7 +401,8 @@
self.test_count_width = len(self.test_count) - 1
if self.ns.use_mp:
- self._run_tests_mp()
+ from test.libregrtest.runtest_mp import run_tests_multiprocess
+ run_tests_multiprocess(self)
else:
self._run_tests_sequential()
@@ -518,7 +429,8 @@
if self.ns.wait:
input("Press any key to continue...")
if self.ns.slaveargs is not None:
- slave_runner(self.ns.slaveargs)
+ from test.libregrtest.runtest_mp import run_tests_slave
+ run_tests_slave(self.ns.slaveargs)
self.find_tests(tests)
self.run_tests()
self.display_result()
@@ -526,26 +438,6 @@
sys.exit(len(self.bad) > 0 or self.interrupted)
-# We do not use a generator so multiple threads can call next().
-class MultiprocessTests(object):
-
- """A thread-safe iterator over tests for multiprocess mode."""
-
- def __init__(self, tests):
- self.interrupted = False
- self.lock = threading.Lock()
- self.tests = tests
-
- def __iter__(self):
- return self
-
- def __next__(self):
- with self.lock:
- if self.interrupted:
- raise StopIteration('tests interrupted')
- return next(self.tests)
-
-
def replace_stdout():
"""Set stdout encoder error handler to backslashreplace (as stderr error
handler) to avoid UnicodeEncodeError when printing a traceback"""
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
--- a/Lib/test/libregrtest/runtest.py
+++ b/Lib/test/libregrtest/runtest.py
@@ -1,7 +1,6 @@
import faulthandler
import importlib
import io
-import json
import os
import sys
import time
@@ -22,38 +21,6 @@
CHILD_ERROR = -5 # error in a child process
-def run_test_in_subprocess(testname, ns):
- """Run the given test in a subprocess with --slaveargs.
-
- ns is the option Namespace parsed from command-line arguments. regrtest
- is invoked in a subprocess with the --slaveargs argument; when the
- subprocess exits, its return code, stdout and stderr are returned as a
- 3-tuple.
- """
- from subprocess import Popen, PIPE
- base_cmd = ([sys.executable] + support.args_from_interpreter_flags() +
- ['-X', 'faulthandler', '-m', 'test.regrtest'])
-
- slaveargs = (
- (testname, ns.verbose, ns.quiet),
- dict(huntrleaks=ns.huntrleaks,
- use_resources=ns.use_resources,
- output_on_failure=ns.verbose3,
- timeout=ns.timeout, failfast=ns.failfast,
- match_tests=ns.match_tests))
- # Running the child from the same working directory as regrtest's original
- # invocation ensures that TEMPDIR for the child is the same when
- # sysconfig.is_python_build() is true. See issue 15300.
- popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)],
- stdout=PIPE, stderr=PIPE,
- universal_newlines=True,
- close_fds=(os.name != 'nt'),
- cwd=support.SAVEDCWD)
- stdout, stderr = popen.communicate()
- retcode = popen.wait()
- return retcode, stdout, stderr
-
-
# small set of tests to determine if we have a basically functioning interpreter
# (i.e. if any of these fail, then anything else is likely to follow)
STDTESTS = [
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -0,0 +1,158 @@
+import json
+import os
+import re
+import sys
+import traceback
+import unittest
+from queue import Queue
+from test import support
+try:
+ import threading
+except ImportError:
+ print("Multiprocess option requires thread support")
+ sys.exit(2)
+
+from test.libregrtest.runtest import runtest, INTERRUPTED, CHILD_ERROR
+
+
+debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$")
+
+
+def run_tests_in_subprocess(testname, ns):
+ """Run the given test in a subprocess with --slaveargs.
+
+ ns is the option Namespace parsed from command-line arguments. regrtest
+ is invoked in a subprocess with the --slaveargs argument; when the
+ subprocess exits, its return code, stdout and stderr are returned as a
+ 3-tuple.
+ """
+ from subprocess import Popen, PIPE
+ base_cmd = ([sys.executable] + support.args_from_interpreter_flags() +
+ ['-X', 'faulthandler', '-m', 'test.regrtest'])
+
+ slaveargs = (
+ (testname, ns.verbose, ns.quiet),
+ dict(huntrleaks=ns.huntrleaks,
+ use_resources=ns.use_resources,
+ output_on_failure=ns.verbose3,
+ timeout=ns.timeout, failfast=ns.failfast,
+ match_tests=ns.match_tests))
+ # Running the child from the same working directory as regrtest's original
+ # invocation ensures that TEMPDIR for the child is the same when
+ # sysconfig.is_python_build() is true. See issue 15300.
+ popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)],
+ stdout=PIPE, stderr=PIPE,
+ universal_newlines=True,
+ close_fds=(os.name != 'nt'),
+ cwd=support.SAVEDCWD)
+ stdout, stderr = popen.communicate()
+ retcode = popen.wait()
+ return retcode, stdout, stderr
+
+
+def run_tests_slave(slaveargs):
+ args, kwargs = json.loads(slaveargs)
+ if kwargs.get('huntrleaks'):
+ unittest.BaseTestSuite._cleanup = False
+ try:
+ result = runtest(*args, **kwargs)
+ except KeyboardInterrupt:
+ result = INTERRUPTED, ''
+ except BaseException as e:
+ traceback.print_exc()
+ result = CHILD_ERROR, str(e)
+ sys.stdout.flush()
+ print() # Force a newline (just in case)
+ print(json.dumps(result))
+ sys.exit(0)
+
+
+# We do not use a generator so multiple threads can call next().
+class MultiprocessIterator:
+
+ """A thread-safe iterator over tests for multiprocess mode."""
+
+ def __init__(self, tests):
+ self.interrupted = False
+ self.lock = threading.Lock()
+ self.tests = tests
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ with self.lock:
+ if self.interrupted:
+ raise StopIteration('tests interrupted')
+ return next(self.tests)
+
+
+class MultiprocessThread(threading.Thread):
+ def __init__(self, pending, output, ns):
+ super().__init__()
+ self.pending = pending
+ self.output = output
+ self.ns = ns
+
+ def run(self):
+ # A worker thread.
+ try:
+ while True:
+ try:
+ test = next(self.pending)
+ except StopIteration:
+ self.output.put((None, None, None, None))
+ return
+ retcode, stdout, stderr = run_tests_in_subprocess(test, self.ns)
+ # Strip last refcount output line if it exists, since it
+ # comes from the shutdown of the interpreter in the subcommand.
+ stderr = debug_output_pat.sub("", stderr)
+ stdout, _, result = stdout.strip().rpartition("\n")
+ if retcode != 0:
+ result = (CHILD_ERROR, "Exit code %s" % retcode)
+ self.output.put((test, stdout.rstrip(), stderr.rstrip(), result))
+ return
+ if not result:
+ self.output.put((None, None, None, None))
+ return
+ result = json.loads(result)
+ self.output.put((test, stdout.rstrip(), stderr.rstrip(), result))
+ except BaseException:
+ self.output.put((None, None, None, None))
+ raise
+
+
+def run_tests_multiprocess(regrtest):
+ output = Queue()
+ pending = MultiprocessIterator(regrtest.tests)
+
+ workers = [MultiprocessThread(pending, output, regrtest.ns)
+ for i in range(regrtest.ns.use_mp)]
+ for worker in workers:
+ worker.start()
+ finished = 0
+ test_index = 1
+ try:
+ while finished < regrtest.ns.use_mp:
+ test, stdout, stderr, result = output.get()
+ if test is None:
+ finished += 1
+ continue
+ regrtest.accumulate_result(test, result)
+ regrtest.display_progress(test_index, test)
+ if stdout:
+ print(stdout)
+ if stderr:
+ print(stderr, file=sys.stderr)
+ sys.stdout.flush()
+ sys.stderr.flush()
+ if result[0] == INTERRUPTED:
+ raise KeyboardInterrupt
+ if result[0] == CHILD_ERROR:
+ raise Exception("Child error on {}: {}".format(test, result[1]))
+ test_index += 1
+ except KeyboardInterrupt:
+ regrtest.interrupted = True
+ pending.interrupted = True
+ for worker in workers:
+ worker.join()
--
Repository URL: https://hg.python.org/cpython
More information about the Python-checkins
mailing list