[Python-checkins] [2.7] bpo-31234: Join threads explicitly in tests (#7406)
Victor Stinner
webhook-mailer at python.org
Mon Jun 4 17:53:55 EDT 2018
https://github.com/python/cpython/commit/146351860a34b3cde387930a360e57391e7b99f3
commit: 146351860a34b3cde387930a360e57391e7b99f3
branch: 2.7
author: Victor Stinner <vstinner at redhat.com>
committer: GitHub <noreply at github.com>
date: 2018-06-04T23:53:52+02:00
summary:
[2.7] bpo-31234: Join threads explicitly in tests (#7406)
* Add support.wait_threads_exit(): context manager looping at exit
until the number of threads decreases to its original number.
* Add some missing thread.join()
* test_asyncore.test_send(): call explicitly t.join() because the cleanup
function is only called outside the test method, whereas the method
has a @test_support.reap_threads decorator
* test_hashlib: replace threading.Event with thread.join()
* test_thread:
* Use wait_threads_exit() context manager
* Replace test_support with support
* test_forkinthread(): check child process exit status in the
main thread to better handle error.
files:
M Lib/test/support/__init__.py
M Lib/test/test_asyncore.py
M Lib/test/test_hashlib.py
M Lib/test/test_httpservers.py
M Lib/test/test_smtplib.py
M Lib/test/test_thread.py
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 3e44c5a35d92..47af6b39465f 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -1722,6 +1722,43 @@ def decorator(*args):
threading_cleanup(*key)
return decorator
+
+ at contextlib.contextmanager
+def wait_threads_exit(timeout=60.0):
+ """
+ bpo-31234: Context manager to wait until all threads created in the with
+ statement exit.
+
+ Use thread.count() to check if threads exited. Indirectly, wait until
+ threads exit the internal t_bootstrap() C function of the thread module.
+
+ threading_setup() and threading_cleanup() are designed to emit a warning
+ if a test leaves running threads in the background. This context manager
+ is designed to cleanup threads started by the thread.start_new_thread()
+ which doesn't allow to wait for thread exit, whereas thread.Thread has a
+ join() method.
+ """
+ old_count = thread._count()
+ try:
+ yield
+ finally:
+ start_time = time.time()
+ deadline = start_time + timeout
+ while True:
+ count = thread._count()
+ if count <= old_count:
+ break
+ if time.time() > deadline:
+ dt = time.time() - start_time
+ msg = ("wait_threads() failed to cleanup %s "
+ "threads after %.1f seconds "
+ "(count: %s, old count: %s)"
+ % (count - old_count, dt, count, old_count))
+ raise AssertionError(msg)
+ time.sleep(0.010)
+ gc_collect()
+
+
def reap_children():
"""Use this function at the end of test_main() whenever sub-processes
are started. This will help ensure that no extra children (zombies)
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
index 693d67cd8a3d..4b347a3a6dd6 100644
--- a/Lib/test/test_asyncore.py
+++ b/Lib/test/test_asyncore.py
@@ -727,19 +727,20 @@ def test_quick_connect(self):
server = TCPServer()
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500))
t.start()
- self.addCleanup(t.join)
-
- for x in xrange(20):
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.settimeout(.2)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
- struct.pack('ii', 1, 0))
- try:
- s.connect(server.address)
- except socket.error:
- pass
- finally:
- s.close()
+ try:
+ for x in xrange(20):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.settimeout(.2)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+ struct.pack('ii', 1, 0))
+ try:
+ s.connect(server.address)
+ except socket.error:
+ pass
+ finally:
+ s.close()
+ finally:
+ t.join()
class TestAPI_UseSelect(BaseTestAPI):
diff --git a/Lib/test/test_hashlib.py b/Lib/test/test_hashlib.py
index 471ebb4dd17b..b8d6388feaf9 100644
--- a/Lib/test/test_hashlib.py
+++ b/Lib/test/test_hashlib.py
@@ -371,25 +371,25 @@ def test_threaded_hashing(self):
data = smallest_data*200000
expected_hash = hashlib.sha1(data*num_threads).hexdigest()
- def hash_in_chunks(chunk_size, event):
+ def hash_in_chunks(chunk_size):
index = 0
while index < len(data):
hasher.update(data[index:index+chunk_size])
index += chunk_size
- event.set()
- events = []
+ threads = []
for threadnum in xrange(num_threads):
chunk_size = len(data) // (10**threadnum)
assert chunk_size > 0
assert chunk_size % len(smallest_data) == 0
- event = threading.Event()
- events.append(event)
- threading.Thread(target=hash_in_chunks,
- args=(chunk_size, event)).start()
-
- for event in events:
- event.wait()
+ thread = threading.Thread(target=hash_in_chunks,
+ args=(chunk_size,))
+ threads.append(thread)
+
+ for thread in threads:
+ thread.start()
+ for thread in threads:
+ thread.join()
self.assertEqual(expected_hash, hasher.hexdigest())
diff --git a/Lib/test/test_httpservers.py b/Lib/test/test_httpservers.py
index 11f0d5d61439..93807c1959bb 100644
--- a/Lib/test/test_httpservers.py
+++ b/Lib/test/test_httpservers.py
@@ -66,6 +66,7 @@ def run(self):
def stop(self):
self.server.shutdown()
+ self.join()
class BaseTestCase(unittest.TestCase):
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
index 1bb669018807..703b631c175b 100644
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -306,12 +306,14 @@ def setUp(self):
self.sock.settimeout(15)
self.port = test_support.bind_port(self.sock)
servargs = (self.evt, self.respdata, self.sock)
- threading.Thread(target=server, args=servargs).start()
+ self.thread = threading.Thread(target=server, args=servargs)
+ self.thread.start()
self.evt.wait()
self.evt.clear()
def tearDown(self):
self.evt.wait()
+ self.thread.join()
sys.stdout = self.old_stdout
def testLineTooLong(self):
diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py
index c8caa5ddff03..93690a60b2ff 100644
--- a/Lib/test/test_thread.py
+++ b/Lib/test/test_thread.py
@@ -1,8 +1,8 @@
import os
import unittest
import random
-from test import test_support
-thread = test_support.import_module('thread')
+from test import support
+thread = support.import_module('thread')
import time
import sys
import weakref
@@ -17,7 +17,7 @@
def verbose_print(arg):
"""Helper function for printing out debugging output."""
- if test_support.verbose:
+ if support.verbose:
with _print_mutex:
print arg
@@ -34,8 +34,8 @@ def setUp(self):
self.running = 0
self.next_ident = 0
- key = test_support.threading_setup()
- self.addCleanup(test_support.threading_cleanup, *key)
+ key = support.threading_setup()
+ self.addCleanup(support.threading_cleanup, *key)
class ThreadRunningTests(BasicThreadTest):
@@ -60,12 +60,13 @@ def task(self, ident):
self.done_mutex.release()
def test_starting_threads(self):
- # Basic test for thread creation.
- for i in range(NUMTASKS):
- self.newtask()
- verbose_print("waiting for tasks to complete...")
- self.done_mutex.acquire()
- verbose_print("all tasks done")
+ with support.wait_threads_exit():
+ # Basic test for thread creation.
+ for i in range(NUMTASKS):
+ self.newtask()
+ verbose_print("waiting for tasks to complete...")
+ self.done_mutex.acquire()
+ verbose_print("all tasks done")
def test_stack_size(self):
# Various stack size tests.
@@ -95,12 +96,13 @@ def test_nt_and_posix_stack_size(self):
verbose_print("trying stack_size = (%d)" % tss)
self.next_ident = 0
self.created = 0
- for i in range(NUMTASKS):
- self.newtask()
+ with support.wait_threads_exit():
+ for i in range(NUMTASKS):
+ self.newtask()
- verbose_print("waiting for all tasks to complete")
- self.done_mutex.acquire()
- verbose_print("all tasks done")
+ verbose_print("waiting for all tasks to complete")
+ self.done_mutex.acquire()
+ verbose_print("all tasks done")
thread.stack_size(0)
@@ -110,25 +112,28 @@ def test__count(self):
mut = thread.allocate_lock()
mut.acquire()
started = []
+
def task():
started.append(None)
mut.acquire()
mut.release()
- thread.start_new_thread(task, ())
- while not started:
- time.sleep(0.01)
- self.assertEqual(thread._count(), orig + 1)
- # Allow the task to finish.
- mut.release()
- # The only reliable way to be sure that the thread ended from the
- # interpreter's point of view is to wait for the function object to be
- # destroyed.
- done = []
- wr = weakref.ref(task, lambda _: done.append(None))
- del task
- while not done:
- time.sleep(0.01)
- self.assertEqual(thread._count(), orig)
+
+ with support.wait_threads_exit():
+ thread.start_new_thread(task, ())
+ while not started:
+ time.sleep(0.01)
+ self.assertEqual(thread._count(), orig + 1)
+ # Allow the task to finish.
+ mut.release()
+ # The only reliable way to be sure that the thread ended from the
+ # interpreter's point of view is to wait for the function object to be
+ # destroyed.
+ done = []
+ wr = weakref.ref(task, lambda _: done.append(None))
+ del task
+ while not done:
+ time.sleep(0.01)
+ self.assertEqual(thread._count(), orig)
def test_save_exception_state_on_error(self):
# See issue #14474
@@ -143,14 +148,13 @@ def mywrite(self, *args):
real_write(self, *args)
c = thread._count()
started = thread.allocate_lock()
- with test_support.captured_output("stderr") as stderr:
+ with support.captured_output("stderr") as stderr:
real_write = stderr.write
stderr.write = mywrite
started.acquire()
- thread.start_new_thread(task, ())
- started.acquire()
- while thread._count() > c:
- time.sleep(0.01)
+ with support.wait_threads_exit():
+ thread.start_new_thread(task, ())
+ started.acquire()
self.assertIn("Traceback", stderr.getvalue())
@@ -182,13 +186,14 @@ def enter(self):
class BarrierTest(BasicThreadTest):
def test_barrier(self):
- self.bar = Barrier(NUMTASKS)
- self.running = NUMTASKS
- for i in range(NUMTASKS):
- thread.start_new_thread(self.task2, (i,))
- verbose_print("waiting for tasks to end")
- self.done_mutex.acquire()
- verbose_print("tasks done")
+ with support.wait_threads_exit():
+ self.bar = Barrier(NUMTASKS)
+ self.running = NUMTASKS
+ for i in range(NUMTASKS):
+ thread.start_new_thread(self.task2, (i,))
+ verbose_print("waiting for tasks to end")
+ self.done_mutex.acquire()
+ verbose_print("tasks done")
def task2(self, ident):
for i in range(NUMTRIPS):
@@ -226,8 +231,9 @@ def setUp(self):
@unittest.skipIf(sys.platform.startswith('win'),
"This test is only appropriate for POSIX-like systems.")
- @test_support.reap_threads
+ @support.reap_threads
def test_forkinthread(self):
+ non_local = {'status': None}
def thread1():
try:
pid = os.fork() # fork in a thread
@@ -246,11 +252,13 @@ def thread1():
else: # parent
os.close(self.write_fd)
pid, status = os.waitpid(pid, 0)
- self.assertEqual(status, 0)
+ non_local['status'] = status
- thread.start_new_thread(thread1, ())
- self.assertEqual(os.read(self.read_fd, 2), "OK",
- "Unable to fork() in thread")
+ with support.wait_threads_exit():
+ thread.start_new_thread(thread1, ())
+ self.assertEqual(os.read(self.read_fd, 2), "OK",
+ "Unable to fork() in thread")
+ self.assertEqual(non_local['status'], 0)
def tearDown(self):
try:
@@ -265,7 +273,7 @@ def tearDown(self):
def test_main():
- test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,
+ support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,
TestForkInThread)
if __name__ == "__main__":
More information about the Python-checkins
mailing list