[Python-checkins] python/dist/src/Lib/test test_threading.py, 1.4, 1.5

tim_one at users.sourceforge.net tim_one at users.sourceforge.net
Sat Jan 8 08:30:45 CET 2005


Update of /cvsroot/python/python/dist/src/Lib/test
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21399/Lib/test

Modified Files:
	test_threading.py 
Log Message:
threading._DummyThread.__init__():  document obscure new code.

test_threading.test_foreign_thread():  new test does a basic check that
"foreign" threads can use the threading module, and that they create
a _DummyThread instance in at least one use case.  This isn't a very
good test, since a thread created by thread.start_new_thread() isn't
particularly "foreign".


Index: test_threading.py
===================================================================
RCS file: /cvsroot/python/python/dist/src/Lib/test/test_threading.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- test_threading.py	8 Jan 2005 06:03:17 -0000	1.4
+++ test_threading.py	8 Jan 2005 07:30:42 -0000	1.5
@@ -4,6 +4,7 @@
 from test.test_support import verbose
 import random
 import threading
+import thread
 import time
 import unittest
 
@@ -78,11 +79,31 @@
         if verbose:
             print 'waiting for all tasks to complete'
         for t in threads:
-            t.join()
+            t.join(NUMTASKS)
+            self.assert_(not t.isAlive())
         if verbose:
             print 'all tasks done'
         self.assertEqual(numrunning.get(), 0)
 
+    def test_foreign_thread(self):
+        # Check that a "foreign" thread can use the threading module.
+        def f(mutex):
+            # Acquiring an RLock forces an entry for the foreign
+            # thread to get made in the threading._active map.
+            r = threading.RLock()
+            r.acquire()
+            r.release()
+            mutex.release()
+
+        mutex = threading.Lock()
+        mutex.acquire()
+        tid = thread.start_new_thread(f, (mutex,))
+        # Wait for the thread to finish.
+        mutex.acquire()
+        self.assert_(tid in threading._active)
+        self.assert_(isinstance(threading._active[tid],
+                                threading._DummyThread))
+        del threading._active[tid]
 
 def test_main():
     test.test_support.run_unittest(ThreadTests)



More information about the Python-checkins mailing list