[Python-checkins] cpython (merge 3.2 -> default): (merge 3.2) Issue #12469: Run wakeup and pending signal tests in a subprocess

victor.stinner python-checkins at python.org
Mon Jul 4 17:49:50 CEST 2011


http://hg.python.org/cpython/rev/b9de5e55f798
changeset:   71196:b9de5e55f798
parent:      71194:4f14050a963f
parent:      71195:e07b331bf489
user:        Victor Stinner <victor.stinner at haypocalc.com>
date:        Mon Jul 04 17:49:40 2011 +0200
summary:
  (merge 3.2) Issue #12469: Run wakeup and pending signal tests in a subprocess
to run the test in a fresh process with only one thread and to not change
signal handling of the parent process.

files:
  Lib/test/test_signal.py |  496 ++++++++++++++-------------
  Misc/NEWS               |    4 +
  2 files changed, 261 insertions(+), 239 deletions(-)


diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -224,117 +224,115 @@
 
 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
 class WakeupSignalTests(unittest.TestCase):
-    TIMEOUT_FULL = 10
-    TIMEOUT_HALF = 5
+    def check_wakeup(self, test_body, *signals):
+        # use a subprocess to have only one thread
+        code = """if 1:
+        import fcntl
+        import os
+        import signal
+        import struct
 
-    def handler(self, signum, frame):
-        pass
+        signals = {!r}
 
-    def check_signum(self, *signals):
-        data = os.read(self.read, len(signals)+1)
-        raised = struct.unpack('%uB' % len(data), data)
-        # We don't care of the signal delivery order (it's not portable or
-        # reliable)
-        raised = set(raised)
-        signals = set(signals)
-        self.assertEqual(raised, signals)
+        def handler(signum, frame):
+            pass
+
+        def check_signum(signals):
+            data = os.read(read, len(signals)+1)
+            raised = struct.unpack('%uB' % len(data), data)
+            # We don't care of the signal delivery order (it's not portable or
+            # reliable)
+            raised = set(raised)
+            signals = set(signals)
+            assert raised == signals, "%r != %r" % (raised, signals)
+
+        {}
+
+        signal.signal(signal.SIGALRM, handler)
+        read, write = os.pipe()
+        for fd in (read, write):
+            flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
+            flags = flags | os.O_NONBLOCK
+            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+        signal.set_wakeup_fd(write)
+
+        test()
+        check_signum(signals)
+
+        os.close(read)
+        os.close(write)
+        """.format(signals, test_body)
+
+        assert_python_ok('-c', code)
 
     def test_wakeup_fd_early(self):
-        import select
+        self.check_wakeup("""def test():
+            import select
+            import time
 
-        signal.alarm(1)
-        before_time = time.time()
-        # We attempt to get a signal during the sleep,
-        # before select is called
-        time.sleep(self.TIMEOUT_FULL)
-        mid_time = time.time()
-        self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
-        select.select([self.read], [], [], self.TIMEOUT_FULL)
-        after_time = time.time()
-        self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
-        self.check_signum(signal.SIGALRM)
+            TIMEOUT_FULL = 10
+            TIMEOUT_HALF = 5
+
+            signal.alarm(1)
+            before_time = time.time()
+            # We attempt to get a signal during the sleep,
+            # before select is called
+            time.sleep(TIMEOUT_FULL)
+            mid_time = time.time()
+            dt = mid_time - before_time
+            assert dt < TIMEOUT_HALF, dt
+            select.select([read], [], [], TIMEOUT_FULL)
+            after_time = time.time()
+            dt = after_time - mid_time
+            assert dt < TIMEOUT_HALF, dt
+        """, signal.SIGALRM)
 
     def test_wakeup_fd_during(self):
-        import select
+        self.check_wakeup("""def test():
+            import select
+            import time
 
-        signal.alarm(1)
-        before_time = time.time()
-        # We attempt to get a signal during the select call
-        self.assertRaises(select.error, select.select,
-            [self.read], [], [], self.TIMEOUT_FULL)
-        after_time = time.time()
-        self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
-        self.check_signum(signal.SIGALRM)
+            TIMEOUT_FULL = 10
+            TIMEOUT_HALF = 5
+
+            signal.alarm(1)
+            before_time = time.time()
+            # We attempt to get a signal during the select call
+            try:
+                select.select([read], [], [], TIMEOUT_FULL)
+            except select.error:
+                pass
+            else:
+                raise Exception("select.error not raised")
+            after_time = time.time()
+            dt = after_time - before_time
+            assert dt < TIMEOUT_HALF, dt
+        """, signal.SIGALRM)
 
     def test_signum(self):
-        old_handler = signal.signal(signal.SIGUSR1, self.handler)
-        self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
-        os.kill(os.getpid(), signal.SIGUSR1)
-        os.kill(os.getpid(), signal.SIGALRM)
-        self.check_signum(signal.SIGUSR1, signal.SIGALRM)
+        self.check_wakeup("""def test():
+            signal.signal(signal.SIGUSR1, handler)
+            os.kill(os.getpid(), signal.SIGUSR1)
+            os.kill(os.getpid(), signal.SIGALRM)
+        """, signal.SIGUSR1, signal.SIGALRM)
 
     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                          'need signal.pthread_sigmask()')
-    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
-                         'need signal.pthread_kill()')
     def test_pending(self):
-        signum1 = signal.SIGUSR1
-        signum2 = signal.SIGUSR2
-        tid = threading.current_thread().ident
+        self.check_wakeup("""def test():
+            signum1 = signal.SIGUSR1
+            signum2 = signal.SIGUSR2
 
-        old_handler = signal.signal(signum1, self.handler)
-        self.addCleanup(signal.signal, signum1, old_handler)
-        old_handler = signal.signal(signum2, self.handler)
-        self.addCleanup(signal.signal, signum2, old_handler)
+            signal.signal(signum1, handler)
+            signal.signal(signum2, handler)
 
-        signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
-        signal.pthread_kill(tid, signum1)
-        signal.pthread_kill(tid, signum2)
-        # Unblocking the 2 signals calls the C signal handler twice
-        signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
+            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
+            os.kill(os.getpid(), signum1)
+            os.kill(os.getpid(), signum2)
+            # Unblocking the 2 signals calls the C signal handler twice
+            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
+        """,  signal.SIGUSR1, signal.SIGUSR2)
 
-        self.check_signum(signum1, signum2)
-
-    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
-                         'need signal.pthread_kill()')
-    def test_pthread_kill_main_thread(self):
-        # Test that a signal can be sent to the main thread with pthread_kill()
-        # before any other thread has been created (see issue #12392).
-        code = """if True:
-            import threading
-            import signal
-            import sys
-
-            def handler(signum, frame):
-                sys.exit(3)
-
-            signal.signal(signal.SIGUSR1, handler)
-            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
-            sys.exit(1)
-        """
-
-        with spawn_python('-c', code) as process:
-            stdout, stderr = process.communicate()
-            exitcode = process.wait()
-            if exitcode != 3:
-                raise Exception("Child error (exit code %s): %s" %
-                                (exitcode, stdout))
-
-    def setUp(self):
-        import fcntl
-
-        self.alrm = signal.signal(signal.SIGALRM, self.handler)
-        self.read, self.write = os.pipe()
-        flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
-        flags = flags | os.O_NONBLOCK
-        fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
-        self.old_wakeup = signal.set_wakeup_fd(self.write)
-
-    def tearDown(self):
-        signal.set_wakeup_fd(self.old_wakeup)
-        os.close(self.read)
-        os.close(self.write)
-        signal.signal(signal.SIGALRM, self.alrm)
 
 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
 class SiginterruptTest(unittest.TestCase):
@@ -519,60 +517,6 @@
     Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
     functions.
     """
-    def setUp(self):
-        self.has_pthread_kill = hasattr(signal, 'pthread_kill')
-
-    def handler(self, signum, frame):
-        1/0
-
-    def read_sigmask(self):
-        return signal.pthread_sigmask(signal.SIG_BLOCK, [])
-
-    def can_test_blocked_signals(self, skip):
-        """
-        Check if a blocked signal can be raised to the main thread without
-        calling its signal handler. We need pthread_kill() or exactly one
-        thread (the main thread).
-
-        Return True if it's possible. Otherwise, return False and print a
-        warning if skip is False, or raise a SkipTest exception if skip is
-        True.
-        """
-        if self.has_pthread_kill:
-            return True
-
-        # The fault handler timeout thread masks all signals. If the main
-        # thread masks also SIGUSR1, all threads mask this signal. In this
-        # case, if we send SIGUSR1 to the process, the signal is pending in the
-        # main or the faulthandler timeout thread.  Unblock SIGUSR1 in the main
-        # thread calls the signal handler only if the signal is pending for the
-        # main thread. Stop the faulthandler timeout thread to workaround this
-        # problem.
-        import faulthandler
-        faulthandler.cancel_dump_tracebacks_later()
-
-        # Issue #11998: The _tkinter module loads the Tcl library which
-        # creates a thread waiting events in select(). This thread receives
-        # signals blocked by all other threads. We cannot test blocked
-        # signals
-        if '_tkinter' in sys.modules:
-            message = ("_tkinter is loaded and pthread_kill() is missing, "
-                       "cannot test blocked signals (issue #11998)")
-            if skip:
-                self.skipTest(message)
-            else:
-                print("WARNING: %s" % message)
-            return False
-        return True
-
-    def kill(self, signum):
-        if self.has_pthread_kill:
-            tid = threading.get_ident()
-            signal.pthread_kill(tid, signum)
-        else:
-            pid = os.getpid()
-            os.kill(pid, signum)
-
     @unittest.skipUnless(hasattr(signal, 'sigpending'),
                          'need signal.sigpending()')
     def test_sigpending_empty(self):
@@ -583,70 +527,103 @@
     @unittest.skipUnless(hasattr(signal, 'sigpending'),
                          'need signal.sigpending()')
     def test_sigpending(self):
-        self.can_test_blocked_signals(True)
+        code = """if 1:
+            import os
+            import signal
 
-        signum = signal.SIGUSR1
-        old_handler = signal.signal(signum, self.handler)
-        self.addCleanup(signal.signal, signum, old_handler)
+            def handler(signum, frame):
+                1/0
 
-        signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
-        self.kill(signum)
-        self.assertEqual(signal.sigpending(), {signum})
-        with self.assertRaises(ZeroDivisionError):
-            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+            signum = signal.SIGUSR1
+            signal.signal(signum, handler)
+
+            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+            os.kill(os.getpid(), signum)
+            pending = signal.sigpending()
+            assert pending == {signum}, '%s != {%s}' % (pending, signum)
+            try:
+                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+            except ZeroDivisionError:
+                pass
+            else:
+                raise Exception("ZeroDivisionError not raised")
+        """
+        assert_python_ok('-c', code)
 
     @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
                          'need signal.pthread_kill()')
     def test_pthread_kill(self):
-        signum = signal.SIGUSR1
-        current = threading.get_ident()
+        code = """if 1:
+            import signal
+            import threading
+            import sys
 
-        old_handler = signal.signal(signum, self.handler)
-        self.addCleanup(signal.signal, signum, old_handler)
+            signum = signal.SIGUSR1
 
-        with self.assertRaises(ZeroDivisionError):
-            signal.pthread_kill(current, signum)
+            def handler(signum, frame):
+                1/0
+
+            signal.signal(signum, handler)
+
+            if sys.platform == 'freebsd6':
+                # Issue #12392 and #12469: send a signal to the main thread
+                # doesn't work before the creation of the first thread on
+                # FreeBSD 6
+                def noop():
+                    pass
+                thread = threading.Thread(target=noop)
+                thread.start()
+                thread.join()
+
+            tid = threading.get_ident()
+            try:
+                signal.pthread_kill(tid, signum)
+            except ZeroDivisionError:
+                pass
+            else:
+                raise Exception("ZeroDivisionError not raised")
+        """
+        assert_python_ok('-c', code)
 
     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                          'need signal.pthread_sigmask()')
-    def wait_helper(self, test, blocked):
+    def wait_helper(self, blocked, test):
         """
         test: body of the "def test(signum):" function.
         blocked: number of the blocked signal
         """
-        code = '''
-import signal
-import sys
+        code = '''if 1:
+        import signal
+        import sys
 
-def handler(signum, frame):
-    1/0
+        def handler(signum, frame):
+            1/0
 
-def test(signum):
-%s
+        %s
 
-blocked = %s
-signum = signal.SIGALRM
+        blocked = %s
+        signum = signal.SIGALRM
 
-# child: block and wait the signal
-try:
-    signal.signal(signum, handler)
-    signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
+        # child: block and wait the signal
+        try:
+            signal.signal(signum, handler)
+            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
 
-    # Do the tests
-    test(signum)
+            # Do the tests
+            test(signum)
 
-    # The handler must not be called on unblock
-    try:
-        signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
-    except ZeroDivisionError:
-        print("the signal handler has been called",
-              file=sys.stderr)
-        sys.exit(1)
-except BaseException as err:
-    print("error: {}".format(err), file=sys.stderr)
-    sys.stderr.flush()
-    sys.exit(1)
-''' % (test, blocked)
+            # The handler must not be called on unblock
+            try:
+                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
+            except ZeroDivisionError:
+                print("the signal handler has been called",
+                      file=sys.stderr)
+                sys.exit(1)
+        except BaseException as err:
+            print("error: {}".format(err), file=sys.stderr)
+            sys.stderr.flush()
+            sys.exit(1)
+        ''' % (test.strip(), blocked)
 
         # sig*wait* must be called with the signal blocked: since the current
         # process might have several threads running, use a subprocess to have
@@ -656,61 +633,56 @@
     @unittest.skipUnless(hasattr(signal, 'sigwait'),
                          'need signal.sigwait()')
     def test_sigwait(self):
-        test = '''
+        self.wait_helper(signal.SIGALRM, '''
+        def test(signum):
             signal.alarm(1)
             received = signal.sigwait([signum])
             assert received == signum , 'received %s, not %s' % (received, signum)
-        '''
-
-        self.wait_helper(test, signal.SIGALRM)
+        ''')
 
     @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                          'need signal.sigwaitinfo()')
     def test_sigwaitinfo(self):
-        test = '''
+        self.wait_helper(signal.SIGALRM, '''
+        def test(signum):
             signal.alarm(1)
             info = signal.sigwaitinfo([signum])
             assert info.si_signo == signum, "info.si_signo != %s" % signum
-        '''
-
-        self.wait_helper(test, signal.SIGALRM)
+        ''')
 
     @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                          'need signal.sigtimedwait()')
     def test_sigtimedwait(self):
-        test = '''
+        self.wait_helper(signal.SIGALRM, '''
+        def test(signum):
             signal.alarm(1)
             info = signal.sigtimedwait([signum], (10, 1000))
             assert info.si_signo == signum, 'info.si_signo != %s' % signum
-        '''
-
-        self.wait_helper(test, signal.SIGALRM)
+        ''')
 
     @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                          'need signal.sigtimedwait()')
     # issue #12303: sigtimedwait() takes 30 seconds on FreeBSD 6 (kernel bug)
     @unittest.skipIf(sys.platform =='freebsd6',
-        'sigtimedwait() with a null timeout doens\'t work on FreeBSD 6')
+        "sigtimedwait() with a null timeout doens't work on FreeBSD 6")
     def test_sigtimedwait_poll(self):
         # check that polling with sigtimedwait works
-        test = '''
+        self.wait_helper(signal.SIGALRM, '''
+        def test(signum):
             import os
             os.kill(os.getpid(), signum)
             info = signal.sigtimedwait([signum], (0, 0))
             assert info.si_signo == signum, 'info.si_signo != %s' % signum
-        '''
-
-        self.wait_helper(test, signal.SIGALRM)
+        ''')
 
     @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                          'need signal.sigtimedwait()')
     def test_sigtimedwait_timeout(self):
-        test = '''
+        self.wait_helper(signal.SIGALRM, '''
+        def test(signum):
             received = signal.sigtimedwait([signum], (1, 0))
             assert received is None, "received=%r" % (received,)
-        '''
-
-        self.wait_helper(test, signal.SIGALRM)
+        ''')
 
     @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                          'need signal.sigtimedwait()')
@@ -723,7 +695,8 @@
     @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                          'need signal.sigwaitinfo()')
     def test_sigwaitinfo_interrupted(self):
-        test = '''
+        self.wait_helper(signal.SIGUSR1, '''
+        def test(signum):
             import errno
 
             hndl_called = True
@@ -741,9 +714,7 @@
                     raise Exception("Expected EINTR to be raised by sigwaitinfo")
             else:
                 raise Exception("Expected EINTR to be raised by sigwaitinfo")
-        '''
-
-        self.wait_helper(test, signal.SIGUSR1)
+        ''')
 
     @unittest.skipUnless(hasattr(signal, 'sigwait'),
                          'need signal.sigwait()')
@@ -791,46 +762,93 @@
     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                          'need signal.pthread_sigmask()')
     def test_pthread_sigmask(self):
-        test_blocked_signals = self.can_test_blocked_signals(False)
+        code = """if 1:
+        import signal
+        import os; import threading
+
+        def handler(signum, frame):
+            1/0
+
+        def kill(signum):
+            os.kill(os.getpid(), signum)
+
+        def read_sigmask():
+            return signal.pthread_sigmask(signal.SIG_BLOCK, [])
+
         signum = signal.SIGUSR1
 
         # Install our signal handler
-        old_handler = signal.signal(signum, self.handler)
-        self.addCleanup(signal.signal, signum, old_handler)
+        old_handler = signal.signal(signum, handler)
 
         # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
         old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
-        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)
-        with self.assertRaises(ZeroDivisionError):
-            self.kill(signum)
+        try:
+            kill(signum)
+        except ZeroDivisionError:
+            pass
+        else:
+            raise Exception("ZeroDivisionError not raised")
 
         # Block and then raise SIGUSR1. The signal is blocked: the signal
         # handler is not called, and the signal is now pending
         signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
-        if test_blocked_signals:
-            self.kill(signum)
+        kill(signum)
 
         # Check the new mask
-        blocked = self.read_sigmask()
-        self.assertIn(signum, blocked)
-        self.assertEqual(old_mask ^ blocked, {signum})
+        blocked = read_sigmask()
+        assert signum in blocked, "%s not in %s" % (signum, blocked)
+        assert old_mask ^ blocked == {signum}, "%s ^ %s != {%s}" % (old_mask, blocked, signum)
 
         # Unblock SIGUSR1
-        if test_blocked_signals:
-            with self.assertRaises(ZeroDivisionError):
-                # unblock the pending signal calls immediatly the signal handler
-                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+        try:
+            # unblock the pending signal calls immediatly the signal handler
+            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+        except ZeroDivisionError:
+            pass
         else:
-            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
-        with self.assertRaises(ZeroDivisionError):
-            self.kill(signum)
+            raise Exception("ZeroDivisionError not raised")
+        try:
+            kill(signum)
+        except ZeroDivisionError:
+            pass
+        else:
+            raise Exception("ZeroDivisionError not raised")
 
         # Check the new mask
-        unblocked = self.read_sigmask()
-        self.assertNotIn(signum, unblocked)
-        self.assertEqual(blocked ^ unblocked, {signum})
-        self.assertSequenceEqual(old_mask, unblocked)
-        # Finally, restore the previous signal handler and the signal mask
+        unblocked = read_sigmask()
+        assert signum not in unblocked, "%s in %s" % (signum, unblocked)
+        assert blocked ^ unblocked == {signum}, "%s ^ %s != {%s}" % (blocked, unblocked, signum)
+        assert old_mask == unblocked, "%s != %s" % (old_mask, unblocked)
+        """
+        assert_python_ok('-c', code)
+
+    @unittest.skipIf(sys.platform == 'freebsd6',
+        "issue #12392: send a signal to the main thread doesn't work "
+        "before the creation of the first thread on FreeBSD 6")
+    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
+                         'need signal.pthread_kill()')
+    def test_pthread_kill_main_thread(self):
+        # Test that a signal can be sent to the main thread with pthread_kill()
+        # before any other thread has been created (see issue #12392).
+        code = """if True:
+            import threading
+            import signal
+            import sys
+
+            def handler(signum, frame):
+                sys.exit(3)
+
+            signal.signal(signal.SIGUSR1, handler)
+            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
+            sys.exit(2)
+        """
+
+        with spawn_python('-c', code) as process:
+            stdout, stderr = process.communicate()
+            exitcode = process.wait()
+            if exitcode != 3:
+                raise Exception("Child error (exit code %s): %s" %
+                                (exitcode, stdout))
 
 
 def test_main():
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -981,6 +981,10 @@
 Tests
 -----
 
+- Issue #12469: Run wakeup and pending signal tests in a subprocess to run the
+  test in a fresh process with only one thread and to not change signal
+  handling of the parent process.
+
 - Issue #8716: Avoid crashes caused by Aqua Tk on OSX when attempting to run
   test_tk or test_ttk_guionly under a username that is not currently logged
   in to the console windowserver (as may be the case under buildbot or ssh).

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list