[Python-checkins] r60528 - in python/trunk: Lib/test/test_queue.py Misc/ACKS

georg.brandl python-checkins at python.org
Sat Feb 2 12:39:30 CET 2008


Author: georg.brandl
Date: Sat Feb  2 12:39:29 2008
New Revision: 60528

Modified:
   python/trunk/Lib/test/test_queue.py
   python/trunk/Misc/ACKS
Log:
Rewrite test_queue as unittest. Written for GHOP by Ian Seyer.


Modified: python/trunk/Lib/test/test_queue.py
==============================================================================
--- python/trunk/Lib/test/test_queue.py	(original)
+++ python/trunk/Lib/test/test_queue.py	Sat Feb  2 12:39:29 2008
@@ -4,8 +4,8 @@
 import sys
 import threading
 import time
-
-from test.test_support import verify, TestFailed, verbose
+import unittest
+from test import test_support
 
 QUEUE_SIZE = 5
 
@@ -30,50 +30,179 @@
         self.startedEvent.set()
         self.fn(*self.args)
 
+
 # Execute a function that blocks, and in a separate thread, a function that
-# triggers the release.  Returns the result of the blocking function.
-# Caution:  block_func must guarantee to block until trigger_func is
-# called, and trigger_func must guarantee to change queue state so that
-# block_func can make enough progress to return.  In particular, a
-# block_func that just raises an exception regardless of whether trigger_func
-# is called will lead to timing-dependent sporadic failures, and one of
-# those went rarely seen but undiagnosed for years.  Now block_func
-# must be unexceptional.  If block_func is supposed to raise an exception,
-# call _doExceptionalBlockingTest() instead.
-def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
-    t = _TriggerThread(trigger_func, trigger_args)
-    t.start()
-    result = block_func(*block_args)
-    # If block_func returned before our thread made the call, we failed!
-    if not t.startedEvent.isSet():
-        raise TestFailed("blocking function '%r' appeared not to block" %
-                         block_func)
-    t.join(10) # make sure the thread terminates
-    if t.isAlive():
-        raise TestFailed("trigger function '%r' appeared to not return" %
-                         trigger_func)
-    return result
-
-# Call this instead if block_func is supposed to raise an exception.
-def _doExceptionalBlockingTest(block_func, block_args, trigger_func,
-                               trigger_args, expected_exception_class):
-    t = _TriggerThread(trigger_func, trigger_args)
-    t.start()
-    try:
-        try:
-            block_func(*block_args)
-        except expected_exception_class:
-            raise
+# triggers the release.  Returns the result of the blocking function.  Caution:
+# block_func must guarantee to block until trigger_func is called, and
+# trigger_func must guarantee to change queue state so that block_func can make
+# enough progress to return.  In particular, a block_func that just raises an
+# exception regardless of whether trigger_func is called will lead to
+# timing-dependent sporadic failures, and one of those went rarely seen but
+# undiagnosed for years.  Now block_func must be unexceptional.  If block_func
+# is supposed to raise an exception, call do_exceptional_blocking_test()
+# instead.
+
+class BlockingTestMixin:
+
+    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
+        self.t = _TriggerThread(trigger_func, trigger_args)
+        self.t.start()
+        self.result = block_func(*block_args)
+        # If block_func returned before our thread made the call, we failed!
+        if not self.t.startedEvent.isSet():
+            self.fail("blocking function '%r' appeared not to block" %
+                      block_func)
+        self.t.join(10) # make sure the thread terminates
+        if self.t.isAlive():
+            self.fail("trigger function '%r' appeared to not return" %
+                      trigger_func)
+        return self.result
+
+    # Call this instead if block_func is supposed to raise an exception.
+    def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
+                                   trigger_args, expected_exception_class):
+        self.t = _TriggerThread(trigger_func, trigger_args)
+        self.t.start()
+        try:
+            try:
+                block_func(*block_args)
+            except expected_exception_class:
+                raise
+            else:
+                self.fail("expected exception of kind %r" %
+                                 expected_exception_class)
+        finally:
+            self.t.join(10) # make sure the thread terminates
+            if self.t.isAlive():
+                self.fail("trigger function '%r' appeared to not return" %
+                                 trigger_func)
+            if not self.t.startedEvent.isSet():
+                self.fail("trigger thread ended but event never set")
+
+
+class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
+    def setUp(self):
+        self.cum = 0
+        self.cumlock = threading.Lock()
+
+    def simple_queue_test(self, q):
+        if not q.empty():
+            raise RuntimeError, "Call this function with an empty queue"
+        # I guess we better check things actually queue correctly a little :)
+        q.put(111)
+        q.put(333)
+        q.put(222)
+        target_order = dict(Queue = [111, 333, 222],
+                            LifoQueue = [222, 333, 111],
+                            PriorityQueue = [111, 222, 333])
+        actual_order = [q.get(), q.get(), q.get()]
+        self.assertEquals(actual_order, target_order[q.__class__.__name__],
+                          "Didn't seem to queue the correct data!")
+        for i in range(QUEUE_SIZE-1):
+            q.put(i)
+            self.assert_(not q.empty(), "Queue should not be empty")
+        self.assert_(not q.full(), "Queue should not be full")
+        q.put("last")
+        self.assert_(q.full(), "Queue should be full")
+        try:
+            q.put("full", block=0)
+            self.fail("Didn't appear to block with a full queue")
+        except Queue.Full:
+            pass
+        try:
+            q.put("full", timeout=0.01)
+            self.fail("Didn't appear to time-out with a full queue")
+        except Queue.Full:
+            pass
+        # Test a blocking put
+        self.do_blocking_test(q.put, ("full",), q.get, ())
+        self.do_blocking_test(q.put, ("full", True, 10), q.get, ())
+        # Empty it
+        for i in range(QUEUE_SIZE):
+            q.get()
+        self.assert_(q.empty(), "Queue should be empty")
+        try:
+            q.get(block=0)
+            self.fail("Didn't appear to block with an empty queue")
+        except Queue.Empty:
+            pass
+        try:
+            q.get(timeout=0.01)
+            self.fail("Didn't appear to time-out with an empty queue")
+        except Queue.Empty:
+            pass
+        # Test a blocking get
+        self.do_blocking_test(q.get, (), q.put, ('empty',))
+        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
+
+
+    def worker(self, q):
+        while True:
+            self.x = q.get()
+            if self.x is None:
+                q.task_done()
+                return
+            self.cumlock.acquire()
+            try:
+                self.cum += self.x
+            finally:
+                self.cumlock.release()
+            q.task_done()
+
+    def queue_join_test(self, q):
+        self.cum = 0
+        for i in (0,1):
+            threading.Thread(target=self.worker, args=(q,)).start()
+        for i in xrange(100):
+            q.put(i)
+        q.join()
+        self.assertEquals(self.cum, sum(range(100)),
+                        "q.join() did not block until all tasks were done")
+        for i in (0,1):
+            q.put(None)         # instruct the threads to close
+        q.join()                # verify that you can join twice
+
+    def test_queue_task_done(self):
+        # Test to make sure a queue task completed successfully.
+        q = self.type2test()
+        try:
+            q.task_done()
+        except ValueError:
+            pass
         else:
-            raise TestFailed("expected exception of kind %r" %
-                             expected_exception_class)
-    finally:
-        t.join(10) # make sure the thread terminates
-        if t.isAlive():
-            raise TestFailed("trigger function '%r' appeared to not return" %
-                             trigger_func)
-        if not t.startedEvent.isSet():
-            raise TestFailed("trigger thread ended but event never set")
+            self.fail("Did not detect task count going negative")
+
+    def test_queue_join(self):
+        # Test that a queue join()s successfully, and before anything else
+        # (done twice for insurance).
+        q = self.type2test()
+        self.queue_join_test(q)
+        self.queue_join_test(q)
+        try:
+            q.task_done()
+        except ValueError:
+            pass
+        else:
+            self.fail("Did not detect task count going negative")
+
+    def test_simple_queue(self):
+        # Do it a couple of times on the same queue.
+        # Done twice to make sure works with same instance reused.
+        q = self.type2test(QUEUE_SIZE)
+        self.simple_queue_test(q)
+        self.simple_queue_test(q)
+
+
+class QueueTest(BaseQueueTest):
+    type2test = Queue.Queue
+
+class LifoQueueTest(BaseQueueTest):
+    type2test = Queue.LifoQueue
+
+class PriorityQueueTest(BaseQueueTest):
+    type2test = Queue.PriorityQueue
+
+
 
 # A Queue subclass that can provoke failure at a moment's notice :)
 class FailingQueueException(Exception):
@@ -95,194 +224,101 @@
             raise FailingQueueException, "You Lose"
         return Queue.Queue._get(self)
 
-def FailingQueueTest(q):
-    if not q.empty():
-        raise RuntimeError, "Call this function with an empty queue"
-    for i in range(QUEUE_SIZE-1):
-        q.put(i)
-    # Test a failing non-blocking put.
-    q.fail_next_put = True
-    try:
-        q.put("oops", block=0)
-        raise TestFailed("The queue didn't fail when it should have")
-    except FailingQueueException:
-        pass
-    q.fail_next_put = True
-    try:
-        q.put("oops", timeout=0.1)
-        raise TestFailed("The queue didn't fail when it should have")
-    except FailingQueueException:
-        pass
-    q.put("last")
-    verify(q.full(), "Queue should be full")
-    # Test a failing blocking put
-    q.fail_next_put = True
-    try:
-        _doBlockingTest(q.put, ("full",), q.get, ())
-        raise TestFailed("The queue didn't fail when it should have")
-    except FailingQueueException:
-        pass
-    # Check the Queue isn't damaged.
-    # put failed, but get succeeded - re-add
-    q.put("last")
-    # Test a failing timeout put
-    q.fail_next_put = True
-    try:
-        _doExceptionalBlockingTest(q.put, ("full", True, 10), q.get, (),
-                                   FailingQueueException)
-        raise TestFailed("The queue didn't fail when it should have")
-    except FailingQueueException:
-        pass
-    # Check the Queue isn't damaged.
-    # put failed, but get succeeded - re-add
-    q.put("last")
-    verify(q.full(), "Queue should be full")
-    q.get()
-    verify(not q.full(), "Queue should not be full")
-    q.put("last")
-    verify(q.full(), "Queue should be full")
-    # Test a blocking put
-    _doBlockingTest( q.put, ("full",), q.get, ())
-    # Empty it
-    for i in range(QUEUE_SIZE):
-        q.get()
-    verify(q.empty(), "Queue should be empty")
-    q.put("first")
-    q.fail_next_get = True
-    try:
+class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
+
+    def failing_queue_test(self, q):
+        if not q.empty():
+            raise RuntimeError, "Call this function with an empty queue"
+        for i in range(QUEUE_SIZE-1):
+            q.put(i)
+        # Test a failing non-blocking put.
+        q.fail_next_put = True
+        try:
+            q.put("oops", block=0)
+            self.fail("The queue didn't fail when it should have")
+        except FailingQueueException:
+            pass
+        q.fail_next_put = True
+        try:
+            q.put("oops", timeout=0.1)
+            self.fail("The queue didn't fail when it should have")
+        except FailingQueueException:
+            pass
+        q.put("last")
+        self.assert_(q.full(), "Queue should be full")
+        # Test a failing blocking put
+        q.fail_next_put = True
+        try:
+            self.do_blocking_test(q.put, ("full",), q.get, ())
+            self.fail("The queue didn't fail when it should have")
+        except FailingQueueException:
+            pass
+        # Check the Queue isn't damaged.
+        # put failed, but get succeeded - re-add
+        q.put("last")
+        # Test a failing timeout put
+        q.fail_next_put = True
+        try:
+            self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
+                                              FailingQueueException)
+            self.fail("The queue didn't fail when it should have")
+        except FailingQueueException:
+            pass
+        # Check the Queue isn't damaged.
+        # put failed, but get succeeded - re-add
+        q.put("last")
+        self.assert_(q.full(), "Queue should be full")
         q.get()
-        raise TestFailed("The queue didn't fail when it should have")
-    except FailingQueueException:
-        pass
-    verify(not q.empty(), "Queue should not be empty")
-    q.fail_next_get = True
-    try:
-        q.get(timeout=0.1)
-        raise TestFailed("The queue didn't fail when it should have")
-    except FailingQueueException:
-        pass
-    verify(not q.empty(), "Queue should not be empty")
-    q.get()
-    verify(q.empty(), "Queue should be empty")
-    q.fail_next_get = True
-    try:
-        _doExceptionalBlockingTest(q.get, (), q.put, ('empty',),
-                                   FailingQueueException)
-        raise TestFailed("The queue didn't fail when it should have")
-    except FailingQueueException:
-        pass
-    # put succeeded, but get failed.
-    verify(not q.empty(), "Queue should not be empty")
-    q.get()
-    verify(q.empty(), "Queue should be empty")
-
-def SimpleQueueTest(q):
-    if not q.empty():
-        raise RuntimeError, "Call this function with an empty queue"
-    # I guess we better check things actually queue correctly a little :)
-    q.put(111)
-    q.put(333)
-    q.put(222)
-    target_order = dict(Queue = [111, 333, 222],
-                        LifoQueue = [222, 333, 111],
-                        PriorityQueue = [111, 222, 333])
-    actual_order = [q.get(), q.get(), q.get()]
-    verify(actual_order == target_order[q.__class__.__name__],
-           "Didn't seem to queue the correct data!")
-    for i in range(QUEUE_SIZE-1):
-        q.put(i)
-        verify(not q.empty(), "Queue should not be empty")
-    verify(not q.full(), "Queue should not be full")
-    q.put("last")
-    verify(q.full(), "Queue should be full")
-    try:
-        q.put("full", block=0)
-        raise TestFailed("Didn't appear to block with a full queue")
-    except Queue.Full:
-        pass
-    try:
-        q.put("full", timeout=0.01)
-        raise TestFailed("Didn't appear to time-out with a full queue")
-    except Queue.Full:
-        pass
-    # Test a blocking put
-    _doBlockingTest(q.put, ("full",), q.get, ())
-    _doBlockingTest(q.put, ("full", True, 10), q.get, ())
-    # Empty it
-    for i in range(QUEUE_SIZE):
+        self.assert_(not q.full(), "Queue should not be full")
+        q.put("last")
+        self.assert_(q.full(), "Queue should be full")
+        # Test a blocking put
+        self.do_blocking_test(q.put, ("full",), q.get, ())
+        # Empty it
+        for i in range(QUEUE_SIZE):
+            q.get()
+        self.assert_(q.empty(), "Queue should be empty")
+        q.put("first")
+        q.fail_next_get = True
+        try:
+            q.get()
+            self.fail("The queue didn't fail when it should have")
+        except FailingQueueException:
+            pass
+        self.assert_(not q.empty(), "Queue should not be empty")
+        q.fail_next_get = True
+        try:
+            q.get(timeout=0.1)
+            self.fail("The queue didn't fail when it should have")
+        except FailingQueueException:
+            pass
+        self.assert_(not q.empty(), "Queue should not be empty")
         q.get()
-    verify(q.empty(), "Queue should be empty")
-    try:
-        q.get(block=0)
-        raise TestFailed("Didn't appear to block with an empty queue")
-    except Queue.Empty:
-        pass
-    try:
-        q.get(timeout=0.01)
-        raise TestFailed("Didn't appear to time-out with an empty queue")
-    except Queue.Empty:
-        pass
-    # Test a blocking get
-    _doBlockingTest(q.get, (), q.put, ('empty',))
-    _doBlockingTest(q.get, (True, 10), q.put, ('empty',))
-
-cum = 0
-cumlock = threading.Lock()
-
-def worker(q):
-    global cum
-    while True:
-        x = q.get()
-        if x is None:
-            q.task_done()
-            return
-        cumlock.acquire()
+        self.assert_(q.empty(), "Queue should be empty")
+        q.fail_next_get = True
         try:
-            cum += x
-        finally:
-            cumlock.release()
-        q.task_done()
+            self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
+                                              FailingQueueException)
+            self.fail("The queue didn't fail when it should have")
+        except FailingQueueException:
+            pass
+        # put succeeded, but get failed.
+        self.assert_(not q.empty(), "Queue should not be empty")
+        q.get()
+        self.assert_(q.empty(), "Queue should be empty")
+
+    def test_failing_queue(self):
+        # Test to make sure a queue is functioning correctly.
+        # Done twice to the same instance.
+        q = FailingQueue(QUEUE_SIZE)
+        self.failing_queue_test(q)
+        self.failing_queue_test(q)
+
+
+def test_main():
+    test_support.run_unittest(QueueTest, LifoQueueTest, PriorityQueueTest,
+                              FailingQueueTest)
 
-def QueueJoinTest(q):
-    global cum
-    cum = 0
-    for i in (0,1):
-        threading.Thread(target=worker, args=(q,)).start()
-    for i in xrange(100):
-        q.put(i)
-    q.join()
-    verify(cum==sum(range(100)), "q.join() did not block until all tasks were done")
-    for i in (0,1):
-        q.put(None)         # instruct the threads to close
-    q.join()                # verify that you can join twice
-
-def QueueTaskDoneTest(q):
-    try:
-        q.task_done()
-    except ValueError:
-        pass
-    else:
-        raise TestFailed("Did not detect task count going negative")
-
-def test():
-    for Q in Queue.Queue, Queue.LifoQueue, Queue.PriorityQueue:
-        q = Q()
-        QueueTaskDoneTest(q)
-        QueueJoinTest(q)
-        QueueJoinTest(q)
-        QueueTaskDoneTest(q)
-
-        q = Q(QUEUE_SIZE)
-        # Do it a couple of times on the same queue
-        SimpleQueueTest(q)
-        SimpleQueueTest(q)
-        if verbose:
-            print "Simple Queue tests seemed to work for", Q.__name__
-
-    q = FailingQueue(QUEUE_SIZE)
-    FailingQueueTest(q)
-    FailingQueueTest(q)
-    if verbose:
-        print "Failing Queue tests seemed to work"
 
-test()
+if __name__ == "__main__":
+    test_main()

Modified: python/trunk/Misc/ACKS
==============================================================================
--- python/trunk/Misc/ACKS	(original)
+++ python/trunk/Misc/ACKS	Sat Feb  2 12:39:29 2008
@@ -599,6 +599,7 @@
 Jiwon Seo
 Jerry Seutter
 Denis Severson
+Ian Seyer
 Ha Shao
 Bruce Sherwood
 Pete Shinners


More information about the Python-checkins mailing list