[Python-checkins] cpython: Issue #8407: Make signal.sigwait() tests more reliable

victor.stinner python-checkins at python.org
Fri Jun 10 12:49:50 CEST 2011


http://hg.python.org/cpython/rev/a17710e27ea2
changeset:   70753:a17710e27ea2
parent:      70751:9ebee3211be9
user:        Victor Stinner <victor.stinner at haypocalc.com>
date:        Fri Jun 10 12:48:13 2011 +0200
summary:
  Issue #8407: Make signal.sigwait() tests more reliable

Block the signal before calling sigwait(). Use os.fork() to ensure that we have
only one thread.

Initial patch written by Charles-François Natali.

files:
  Lib/test/test_signal.py |  67 ++++++++++++++++++++++------
  1 files changed, 53 insertions(+), 14 deletions(-)


diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -598,34 +598,73 @@
         with self.assertRaises(ZeroDivisionError):
             signal.pthread_kill(current, signum)
 
+    def check_sigwait(self, test, signum):
+        # sigwait must be called with the signal blocked: since the current
+        # process might have several threads running, we fork() a child process
+        # to have a single thread.
+        pid = os.fork()
+        if pid == 0:
+            # child: block and wait the signal
+            try:
+                signal.signal(signum, self.handler)
+                signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+
+                # Do the tests
+                test(signum)
+
+                # The handler must not be called on unblock
+                try:
+                    signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+                except ZeroDivisionError:
+                    print("the signal handler has been called",
+                          file=sys.stderr)
+                    os._exit(1)
+
+                os._exit(0)
+            finally:
+                os._exit(1)
+        else:
+            # parent: let the child some time to wait, send him the signal, and
+            # check it correcty received it
+            self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+
     @unittest.skipUnless(hasattr(signal, 'sigwait'),
                          'need signal.sigwait()')
+    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
     def test_sigwait(self):
-        old_handler = signal.signal(signal.SIGALRM, self.handler)
-        self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
+        def test(signum):
+            signal.alarm(1)
+            received = signal.sigwait([signum])
+            if received != signum:
+                print("sigwait() received %s, not %s"
+                      % (received, signum),
+                      file=sys.stderr)
+                os._exit(1)
 
-        signal.alarm(1)
-        self.assertEqual(signal.sigwait([signal.SIGALRM]), signal.SIGALRM)
+        self.check_sigwait(test, signal.SIGALRM)
 
     @unittest.skipUnless(hasattr(signal, 'sigwait'),
                          'need signal.sigwait()')
     @unittest.skipIf(threading is None, "test needs threading module")
+    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
     def test_sigwait_thread(self):
-        signum = signal.SIGUSR1
-        old_handler = signal.signal(signum, self.handler)
-        self.addCleanup(signal.signal, signum, old_handler)
-
-        def kill_later():
+        def kill_later(signum):
+            # wait until the main thread is waiting in sigwait()
             time.sleep(1)
             os.kill(os.getpid(), signum)
 
-        killer = threading.Thread(target=kill_later)
-        killer.start()
-        try:
-            self.assertEqual(signal.sigwait([signum]), signum)
-        finally:
+        def test(signum):
+            killer = threading.Thread(target=kill_later, args=(signum,))
+            killer.start()
+            received = signal.sigwait([signum])
+            if received != signum:
+                print("sigwait() received %s, not %s" % (received, signum),
+                      file=sys.stderr)
+                os._exit(1)
             killer.join()
 
+        self.check_sigwait(test, signal.SIGUSR1)
+
     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                          'need signal.pthread_sigmask()')
     def test_pthread_sigmask_arguments(self):

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list