[Python-checkins] cpython (merge 3.3 -> default): Issue #16843: Make concurrent tests for sched module deterministic.

serhiy.storchaka python-checkins at python.org
Tue Jan 8 23:16:10 CET 2013


http://hg.python.org/cpython/rev/f65eae38f71e
changeset:   81335:f65eae38f71e
parent:      81333:5e84d020d001
parent:      81334:ab36d3bb5996
user:        Serhiy Storchaka <storchaka at gmail.com>
date:        Wed Jan 09 00:15:14 2013 +0200
summary:
  Issue #16843: Make concurrent tests for sched module deterministic.

files:
  Lib/test/test_sched.py |  106 +++++++++++++++++++++++-----
  1 files changed, 85 insertions(+), 21 deletions(-)


diff --git a/Lib/test/test_sched.py b/Lib/test/test_sched.py
--- a/Lib/test/test_sched.py
+++ b/Lib/test/test_sched.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+import queue
 import sched
 import time
 import unittest
@@ -9,6 +10,37 @@
 except ImportError:
     threading = None
 
+TIMEOUT = 10
+
+
+class Timer:
+    def __init__(self):
+        self._cond = threading.Condition()
+        self._time = 0
+        self._stop = 0
+
+    def time(self):
+        with self._cond:
+            return self._time
+
+    # increase the time but not beyond the established limit
+    def sleep(self, t):
+        assert t >= 0
+        with self._cond:
+            t += self._time
+            while self._stop < t:
+                self._time = self._stop
+                self._cond.wait()
+            self._time = t
+
+    # advance time limit for user code
+    def advance(self, t):
+        assert t >= 0
+        with self._cond:
+            self._stop += t
+            self._cond.notify_all()
+
+
 class TestCase(unittest.TestCase):
 
     def test_enter(self):
@@ -31,17 +63,34 @@
 
     @unittest.skipUnless(threading, 'Threading required for this test.')
     def test_enter_concurrent(self):
-        l = []
-        fun = lambda x: l.append(x)
-        scheduler = sched.scheduler(time.time, time.sleep)
-        scheduler.enter(0.03, 1, fun, (0.03,))
+        q = queue.Queue()
+        fun = q.put
+        timer = Timer()
+        scheduler = sched.scheduler(timer.time, timer.sleep)
+        scheduler.enter(1, 1, fun, (1,))
+        scheduler.enter(3, 1, fun, (3,))
         t = threading.Thread(target=scheduler.run)
         t.start()
-        for x in [0.05, 0.04, 0.02, 0.01]:
-            z = scheduler.enter(x, 1, fun, (x,))
-        scheduler.run()
-        t.join()
-        self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
+        timer.advance(1)
+        self.assertEqual(q.get(timeout=TIMEOUT), 1)
+        self.assertTrue(q.empty())
+        for x in [4, 5, 2]:
+            z = scheduler.enter(x - 1, 1, fun, (x,))
+        timer.advance(2)
+        self.assertEqual(q.get(timeout=TIMEOUT), 2)
+        self.assertEqual(q.get(timeout=TIMEOUT), 3)
+        self.assertTrue(q.empty())
+        timer.advance(1)
+        self.assertEqual(q.get(timeout=TIMEOUT), 4)
+        self.assertTrue(q.empty())
+        timer.advance(1)
+        self.assertEqual(q.get(timeout=TIMEOUT), 5)
+        self.assertTrue(q.empty())
+        timer.advance(1000)
+        t.join(timeout=TIMEOUT)
+        self.assertFalse(t.is_alive())
+        self.assertTrue(q.empty())
+        self.assertEqual(timer.time(), 5)
 
     def test_priority(self):
         l = []
@@ -69,21 +118,36 @@
 
     @unittest.skipUnless(threading, 'Threading required for this test.')
     def test_cancel_concurrent(self):
-        l = []
-        fun = lambda x: l.append(x)
-        scheduler = sched.scheduler(time.time, time.sleep)
-        now = time.time()
-        event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
-        event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
-        event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
-        event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
-        event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
+        q = queue.Queue()
+        fun = q.put
+        timer = Timer()
+        scheduler = sched.scheduler(timer.time, timer.sleep)
+        now = timer.time()
+        event1 = scheduler.enterabs(now + 1, 1, fun, (1,))
+        event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
+        event4 = scheduler.enterabs(now + 4, 1, fun, (4,))
+        event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
+        event3 = scheduler.enterabs(now + 3, 1, fun, (3,))
         t = threading.Thread(target=scheduler.run)
         t.start()
-        scheduler.cancel(event1)
+        timer.advance(1)
+        self.assertEqual(q.get(timeout=TIMEOUT), 1)
+        self.assertTrue(q.empty())
+        scheduler.cancel(event2)
         scheduler.cancel(event5)
-        t.join()
-        self.assertEqual(l, [0.02, 0.03, 0.04])
+        timer.advance(1)
+        self.assertTrue(q.empty())
+        timer.advance(1)
+        self.assertEqual(q.get(timeout=TIMEOUT), 3)
+        self.assertTrue(q.empty())
+        timer.advance(1)
+        self.assertEqual(q.get(timeout=TIMEOUT), 4)
+        self.assertTrue(q.empty())
+        timer.advance(1000)
+        t.join(timeout=TIMEOUT)
+        self.assertFalse(t.is_alive())
+        self.assertTrue(q.empty())
+        self.assertEqual(timer.time(), 4)
 
     def test_empty(self):
         l = []

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list