[Python-checkins] cpython (3.5): Issue #20556: Used specific assert methods in threading tests.

serhiy.storchaka python-checkins at python.org
Mon Mar 14 04:40:43 EDT 2016


https://hg.python.org/cpython/rev/791d7ef006d3
changeset:   100521:791d7ef006d3
branch:      3.5
parent:      100518:838f68a76ea1
user:        Serhiy Storchaka <storchaka at gmail.com>
date:        Mon Mar 14 10:28:59 2016 +0200
summary:
  Issue #20556: Used specific assert methods in threading tests.

files:
  Lib/test/test_dummy_thread.py    |  16 +++++-----
  Lib/test/test_threading.py       |  27 +++++++++----------
  Lib/test/test_threading_local.py |   2 +-
  3 files changed, 22 insertions(+), 23 deletions(-)


diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py
--- a/Lib/test/test_dummy_thread.py
+++ b/Lib/test/test_dummy_thread.py
@@ -24,14 +24,14 @@
 
     def test_initlock(self):
         #Make sure locks start locked
-        self.assertTrue(not self.lock.locked(),
+        self.assertFalse(self.lock.locked(),
                         "Lock object is not initialized unlocked.")
 
     def test_release(self):
         # Test self.lock.release()
         self.lock.acquire()
         self.lock.release()
-        self.assertTrue(not self.lock.locked(),
+        self.assertFalse(self.lock.locked(),
                         "Lock object did not release properly.")
 
     def test_improper_release(self):
@@ -46,7 +46,7 @@
     def test_cond_acquire_fail(self):
         #Test acquiring locked lock returns False
         self.lock.acquire(0)
-        self.assertTrue(not self.lock.acquire(0),
+        self.assertFalse(self.lock.acquire(0),
                         "Conditional acquiring of a locked lock incorrectly "
                          "succeeded.")
 
@@ -58,9 +58,9 @@
 
     def test_uncond_acquire_return_val(self):
         #Make sure that an unconditional locking returns True.
-        self.assertTrue(self.lock.acquire(1) is True,
+        self.assertIs(self.lock.acquire(1), True,
                         "Unconditional locking did not return True.")
-        self.assertTrue(self.lock.acquire() is True)
+        self.assertIs(self.lock.acquire(), True)
 
     def test_uncond_acquire_blocking(self):
         #Make sure that unconditional acquiring of a locked lock blocks.
@@ -80,7 +80,7 @@
         end_time = int(time.time())
         if support.verbose:
             print("done")
-        self.assertTrue((end_time - start_time) >= DELAY,
+        self.assertGreaterEqual(end_time - start_time, DELAY,
                         "Blocking by unconditional acquiring failed.")
 
 class MiscTests(unittest.TestCase):
@@ -94,7 +94,7 @@
         #Test sanity of _thread.get_ident()
         self.assertIsInstance(_thread.get_ident(), int,
                               "_thread.get_ident() returned a non-integer")
-        self.assertTrue(_thread.get_ident() != 0,
+        self.assertNotEqual(_thread.get_ident(), 0,
                         "_thread.get_ident() returned 0")
 
     def test_LockType(self):
@@ -164,7 +164,7 @@
         time.sleep(DELAY)
         if support.verbose:
             print('done')
-        self.assertTrue(testing_queue.qsize() == thread_count,
+        self.assertEqual(testing_queue.qsize(), thread_count,
                         "Not all %s threads executed properly after %s sec." %
                         (thread_count, DELAY))
 
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -58,7 +58,7 @@
                 self.nrunning.inc()
                 if verbose:
                     print(self.nrunning.get(), 'tasks are running')
-                self.testcase.assertTrue(self.nrunning.get() <= 3)
+                self.testcase.assertLessEqual(self.nrunning.get(), 3)
 
             time.sleep(delay)
             if verbose:
@@ -66,7 +66,7 @@
 
             with self.mutex:
                 self.nrunning.dec()
-                self.testcase.assertTrue(self.nrunning.get() >= 0)
+                self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
                 if verbose:
                     print('%s is finished. %d tasks are running' %
                           (self.name, self.nrunning.get()))
@@ -100,26 +100,25 @@
         for i in range(NUMTASKS):
             t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
             threads.append(t)
-            self.assertEqual(t.ident, None)
-            self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
+            self.assertIsNone(t.ident)
+            self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
             t.start()
 
         if verbose:
             print('waiting for all tasks to complete')
         for t in threads:
             t.join()
-            self.assertTrue(not t.is_alive())
+            self.assertFalse(t.is_alive())
             self.assertNotEqual(t.ident, 0)
-            self.assertFalse(t.ident is None)
-            self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>',
-                                     repr(t)))
+            self.assertIsNotNone(t.ident)
+            self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
         if verbose:
             print('all tasks done')
         self.assertEqual(numrunning.get(), 0)
 
     def test_ident_of_no_threading_threads(self):
         # The ident still must work for the main thread and dummy threads.
-        self.assertFalse(threading.currentThread().ident is None)
+        self.assertIsNotNone(threading.currentThread().ident)
         def f():
             ident.append(threading.currentThread().ident)
             done.set()
@@ -127,7 +126,7 @@
         ident = []
         _thread.start_new_thread(f, ())
         done.wait()
-        self.assertFalse(ident[0] is None)
+        self.assertIsNotNone(ident[0])
         # Kill the "immortal" _DummyThread
         del threading._active[ident[0]]
 
@@ -244,7 +243,7 @@
         self.assertTrue(ret)
         if verbose:
             print("    verifying worker hasn't exited")
-        self.assertTrue(not t.finished)
+        self.assertFalse(t.finished)
         if verbose:
             print("    attempting to raise asynch exception in worker")
         result = set_async_exc(ctypes.c_long(t.id), exception)
@@ -415,9 +414,9 @@
 
     def test_repr_daemon(self):
         t = threading.Thread()
-        self.assertFalse('daemon' in repr(t))
+        self.assertNotIn('daemon', repr(t))
         t.daemon = True
-        self.assertTrue('daemon' in repr(t))
+        self.assertIn('daemon', repr(t))
 
     def test_deamon_param(self):
         t = threading.Thread()
@@ -569,7 +568,7 @@
         tstate_lock.release()
         self.assertFalse(t.is_alive())
         # And verify the thread disposed of _tstate_lock.
-        self.assertTrue(t._tstate_lock is None)
+        self.assertIsNone(t._tstate_lock)
 
     def test_repr_stopped(self):
         # Verify that "stopped" shows up in repr(Thread) appropriately.
diff --git a/Lib/test/test_threading_local.py b/Lib/test/test_threading_local.py
--- a/Lib/test/test_threading_local.py
+++ b/Lib/test/test_threading_local.py
@@ -189,7 +189,7 @@
         wr = weakref.ref(x)
         del x
         gc.collect()
-        self.assertIs(wr(), None)
+        self.assertIsNone(wr())
 
 
 class ThreadLocalTest(unittest.TestCase, BaseLocalTest):

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list