[Python-checkins] cpython (2.7): Issue #23799: Added test.test_support.start_threads() for running and cleaning up multiple threads.

serhiy.storchaka python-checkins at python.org
Wed Apr 1 12:07:23 CEST 2015


https://hg.python.org/cpython/rev/f8358f6e50e7
changeset:   95340:f8358f6e50e7
branch:      2.7
parent:      95311:051cf1aa6288
user:        Serhiy Storchaka <storchaka at gmail.com>
date:        Wed Apr 01 12:56:39 2015 +0300
summary:
  Issue #23799: Added test.test_support.start_threads() for running and
cleaning up multiple threads.

files:
  Lib/test/test_bz2.py              |  12 ++---
  Lib/test/test_capi.py             |  32 +++++++---------
  Lib/test/test_gc.py               |  12 +----
  Lib/test/test_io.py               |  32 ++++++----------
  Lib/test/test_support.py          |  35 ++++++++++++++++++-
  Lib/test/test_threadedtempfile.py |  27 +++----------
  Lib/test/test_threading_local.py  |  19 +++------
  Misc/NEWS                         |   3 +
  8 files changed, 85 insertions(+), 87 deletions(-)


diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py
--- a/Lib/test/test_bz2.py
+++ b/Lib/test/test_bz2.py
@@ -1,4 +1,4 @@
-from test import test_support
+from test import test_support as support
 from test.test_support import TESTFN, _4G, bigmemtest, import_module, findfile
 
 import unittest
@@ -306,10 +306,8 @@
                 for i in range(5):
                     f.write(data)
             threads = [threading.Thread(target=comp) for i in range(nthreads)]
-            for t in threads:
-                t.start()
-            for t in threads:
-                t.join()
+            with support.start_threads(threads):
+                pass
 
     def testMixedIterationReads(self):
         # Issue #8397: mixed iteration and reads should be forbidden.
@@ -482,13 +480,13 @@
         self.assertEqual(text.strip("a"), "")
 
 def test_main():
-    test_support.run_unittest(
+    support.run_unittest(
         BZ2FileTest,
         BZ2CompressorTest,
         BZ2DecompressorTest,
         FuncTest
     )
-    test_support.reap_children()
+    support.reap_children()
 
 if __name__ == '__main__':
     test_main()
diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py
--- a/Lib/test/test_capi.py
+++ b/Lib/test/test_capi.py
@@ -6,7 +6,7 @@
 import time
 import random
 import unittest
-from test import test_support
+from test import test_support as support
 try:
     import thread
     import threading
@@ -14,7 +14,7 @@
     thread = None
     threading = None
 # Skip this test if the _testcapi module isn't available.
-_testcapi = test_support.import_module('_testcapi')
+_testcapi = support.import_module('_testcapi')
 
 
 @unittest.skipUnless(threading, 'Threading required for this test.')
@@ -42,7 +42,7 @@
             #this busy loop is where we expect to be interrupted to
             #run our callbacks.  Note that callbacks are only run on the
             #main thread
-            if False and test_support.verbose:
+            if False and support.verbose:
                 print "(%i)"%(len(l),),
             for i in xrange(1000):
                 a = i*i
@@ -51,7 +51,7 @@
             count += 1
             self.assertTrue(count < 10000,
                 "timeout waiting for %i callbacks, got %i"%(n, len(l)))
-        if False and test_support.verbose:
+        if False and support.verbose:
             print "(%i)"%(len(l),)
 
     def test_pendingcalls_threaded(self):
@@ -67,15 +67,11 @@
         context.lock = threading.Lock()
         context.event = threading.Event()
 
-        for i in range(context.nThreads):
-            t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
-            t.start()
-            threads.append(t)
-
-        self.pendingcalls_wait(context.l, n, context)
-
-        for t in threads:
-            t.join()
+        threads = [threading.Thread(target=self.pendingcalls_thread,
+                                    args=(context,))
+                   for i in range(context.nThreads)]
+        with support.start_threads(threads):
+            self.pendingcalls_wait(context.l, n, context)
 
     def pendingcalls_thread(self, context):
         try:
@@ -84,7 +80,7 @@
             with context.lock:
                 context.nFinished += 1
                 nFinished = context.nFinished
-                if False and test_support.verbose:
+                if False and support.verbose:
                     print "finished threads: ", nFinished
             if nFinished == context.nThreads:
                 context.event.set()
@@ -103,7 +99,7 @@
 @unittest.skipUnless(threading and thread, 'Threading required for this test.')
 class TestThreadState(unittest.TestCase):
 
-    @test_support.reap_threads
+    @support.reap_threads
     def test_thread_state(self):
         # some extra thread-state tests driven via _testcapi
         def target():
@@ -129,14 +125,14 @@
     for name in dir(_testcapi):
         if name.startswith('test_'):
             test = getattr(_testcapi, name)
-            if test_support.verbose:
+            if support.verbose:
                 print "internal", name
             try:
                 test()
             except _testcapi.error:
-                raise test_support.TestFailed, sys.exc_info()[1]
+                raise support.TestFailed, sys.exc_info()[1]
 
-    test_support.run_unittest(TestPendingCalls, TestThreadState)
+    support.run_unittest(TestPendingCalls, TestThreadState)
 
 if __name__ == "__main__":
     test_main()
diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py
--- a/Lib/test/test_gc.py
+++ b/Lib/test/test_gc.py
@@ -1,5 +1,5 @@
 import unittest
-from test.test_support import verbose, run_unittest
+from test.test_support import verbose, run_unittest, start_threads
 import sys
 import time
 import gc
@@ -352,19 +352,13 @@
         old_checkinterval = sys.getcheckinterval()
         sys.setcheckinterval(3)
         try:
-            exit = False
+            exit = []
             threads = []
             for i in range(N_THREADS):
                 t = threading.Thread(target=run_thread)
                 threads.append(t)
-            try:
-                for t in threads:
-                    t.start()
-            finally:
+            with start_threads(threads, lambda: exit.append(1)):
                 time.sleep(1.0)
-                exit = True
-            for t in threads:
-                t.join()
         finally:
             sys.setcheckinterval(old_checkinterval)
         gc.collect()
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -985,11 +985,8 @@
                         errors.append(e)
                         raise
                 threads = [threading.Thread(target=f) for x in range(20)]
-                for t in threads:
-                    t.start()
-                time.sleep(0.02) # yield
-                for t in threads:
-                    t.join()
+                with support.start_threads(threads):
+                    time.sleep(0.02) # yield
                 self.assertFalse(errors,
                     "the following exceptions were caught: %r" % errors)
                 s = b''.join(results)
@@ -1299,11 +1296,8 @@
                         errors.append(e)
                         raise
                 threads = [threading.Thread(target=f) for x in range(20)]
-                for t in threads:
-                    t.start()
-                time.sleep(0.02) # yield
-                for t in threads:
-                    t.join()
+                with support.start_threads(threads):
+                    time.sleep(0.02) # yield
                 self.assertFalse(errors,
                     "the following exceptions were caught: %r" % errors)
                 bufio.close()
@@ -2555,14 +2549,10 @@
                 text = "Thread%03d\n" % n
                 event.wait()
                 f.write(text)
-            threads = [threading.Thread(target=lambda n=x: run(n))
+            threads = [threading.Thread(target=run, args=(x,))
                        for x in range(20)]
-            for t in threads:
-                t.start()
-            time.sleep(0.02)
-            event.set()
-            for t in threads:
-                t.join()
+            with support.start_threads(threads, event.set):
+                time.sleep(0.02)
         with self.open(support.TESTFN) as f:
             content = f.read()
             for n in range(20):
@@ -3042,9 +3032,11 @@
             # return with a successful (partial) result rather than an EINTR.
             # The buffered IO layer must check for pending signal
             # handlers, which in this case will invoke alarm_interrupt().
-            self.assertRaises(ZeroDivisionError,
-                        wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
-            t.join()
+            try:
+                with self.assertRaises(ZeroDivisionError):
+                    wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
+            finally:
+                t.join()
             # We got one byte, get another one and check that it isn't a
             # repeat of the first one.
             read_results.append(os.read(r, 1))
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -37,7 +37,7 @@
            "captured_stdout", "TransientResource", "transient_internet",
            "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
            "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
-           "threading_cleanup", "reap_children", "cpython_only",
+           "threading_cleanup", "reap_threads", "start_threads", "cpython_only",
            "check_impl_detail", "get_attribute", "py3k_bytes",
            "import_fresh_module", "threading_cleanup", "reap_children",
            "strip_python_stderr", "IPV6_ENABLED"]
@@ -1509,6 +1509,39 @@
                 break
 
 @contextlib.contextmanager
+def start_threads(threads, unlock=None):
+    threads = list(threads)
+    started = []
+    try:
+        try:
+            for t in threads:
+                t.start()
+                started.append(t)
+        except:
+            if verbose:
+                print("Can't start %d threads, only %d threads started" %
+                      (len(threads), len(started)))
+            raise
+        yield
+    finally:
+        if unlock:
+            unlock()
+        endtime = starttime = time.time()
+        for timeout in range(1, 16):
+            endtime += 60
+            for t in started:
+                t.join(max(endtime - time.time(), 0.01))
+            started = [t for t in started if t.isAlive()]
+            if not started:
+                break
+            if verbose:
+                print('Unable to join %d threads during a period of '
+                      '%d minutes' % (len(started), timeout))
+    started = [t for t in started if t.isAlive()]
+    if started:
+        raise AssertionError('Unable to join %d threads' % len(started))
+
 @contextlib.contextmanager
 def swap_attr(obj, attr, new_val):
     """Temporary swap out an attribute with a new object.
 
diff --git a/Lib/test/test_threadedtempfile.py b/Lib/test/test_threadedtempfile.py
--- a/Lib/test/test_threadedtempfile.py
+++ b/Lib/test/test_threadedtempfile.py
@@ -18,7 +18,7 @@
 
 import tempfile
 
-from test.test_support import threading_setup, threading_cleanup, run_unittest, import_module
+from test.test_support import start_threads, run_unittest, import_module
 threading = import_module('threading')
 import unittest
 import StringIO
@@ -46,25 +46,12 @@
 
 class ThreadedTempFileTest(unittest.TestCase):
     def test_main(self):
-        threads = []
-        thread_info = threading_setup()
-
-        for i in range(NUM_THREADS):
-            t = TempFileGreedy()
-            threads.append(t)
-            t.start()
-
-        startEvent.set()
-
-        ok = 0
-        errors = []
-        for t in threads:
-            t.join()
-            ok += t.ok_count
-            if t.error_count:
-                errors.append(str(t.getName()) + str(t.errors.getvalue()))
-
-        threading_cleanup(*thread_info)
+        threads = [TempFileGreedy() for i in range(NUM_THREADS)]
+        with start_threads(threads, startEvent.set):
+            pass
+        ok = sum(t.ok_count for t in threads)
+        errors = [str(t.getName()) + str(t.errors.getvalue())
+                  for t in threads if t.error_count]
 
         msg = "Errors: errors %d ok %d\n%s" % (len(errors), ok,
             '\n'.join(errors))
diff --git a/Lib/test/test_threading_local.py b/Lib/test/test_threading_local.py
--- a/Lib/test/test_threading_local.py
+++ b/Lib/test/test_threading_local.py
@@ -1,12 +1,12 @@
 import unittest
 from doctest import DocTestSuite
-from test import test_support
+from test import test_support as support
 import weakref
 import gc
 
 # Modules under test
-_thread = test_support.import_module('thread')
-threading = test_support.import_module('threading')
+_thread = support.import_module('thread')
+threading = support.import_module('threading')
 import _threading_local
 
 
@@ -63,14 +63,9 @@
             # Simply check that the variable is correctly set
             self.assertEqual(local.x, i)
 
-        threads= []
-        for i in range(10):
-            t = threading.Thread(target=f, args=(i,))
-            t.start()
-            threads.append(t)
-
-        for t in threads:
-            t.join()
+        with support.start_threads(threading.Thread(target=f, args=(i,))
+                                   for i in range(10)):
+            pass
 
     def test_derived_cycle_dealloc(self):
         # http://bugs.python.org/issue6990
@@ -228,7 +223,7 @@
                                    setUp=setUp, tearDown=tearDown)
                       )
 
-    test_support.run_unittest(suite)
+    support.run_unittest(suite)
 
 if __name__ == '__main__':
     test_main()
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -182,6 +182,9 @@
 Tests
 -----
 
+- Issue #23799: Added test.test_support.start_threads() for running and
+  cleaning up multiple threads.
+
 - Issue #22390: test.regrtest now emits a warning if temporary files or
   directories are left after running a test.
 

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list