[Python-checkins] bpo-46709: check eval breaker in specialized `CALL` opcodes (GH-31404)
markshannon
webhook-mailer at python.org
Fri Feb 18 07:56:31 EST 2022
https://github.com/python/cpython/commit/e2c28616ce6c3cdb1013c415125220a0b86b86a1
commit: e2c28616ce6c3cdb1013c415125220a0b86b86a1
branch: main
author: Nikita Sobolev <mail at sobolevn.me>
committer: markshannon <mark at hotpy.org>
date: 2022-02-18T12:56:23Z
summary:
bpo-46709: check eval breaker in specialized `CALL` opcodes (GH-31404)
files:
M Lib/unittest/test/test_break.py
M Python/ceval.c
diff --git a/Lib/unittest/test/test_break.py b/Lib/unittest/test/test_break.py
index eebd2b610ce11..33cbdd2661c17 100644
--- a/Lib/unittest/test/test_break.py
+++ b/Lib/unittest/test/test_break.py
@@ -4,14 +4,18 @@
import sys
import signal
import weakref
-
import unittest
+from test import support
+
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
class TestBreak(unittest.TestCase):
int_handler = None
+ # This number was smart-guessed, previously tests were failing
+ # after 7th run. So, we take `x * 2 + 1` to be sure.
+ default_repeats = 15
def setUp(self):
self._default_handler = signal.getsignal(signal.SIGINT)
@@ -24,6 +28,27 @@ def tearDown(self):
unittest.signals._interrupt_handler = None
+ def withRepeats(self, test_function, repeats=None):
+ if not support.check_impl_detail(cpython=True):
+ # Override repeats count on non-cpython to execute only once.
+ # Because this test only makes sense to be repeated on CPython.
+ repeats = 1
+ elif repeats is None:
+ repeats = self.default_repeats
+
+ for repeat in range(repeats):
+ with self.subTest(repeat=repeat):
+ # We don't run `setUp` for the very first repeat
+ # and we don't run `tearDown` for the very last one,
+ # because they are handled by the test class itself.
+ if repeat != 0:
+ self.setUp()
+ try:
+ test_function()
+ finally:
+ if repeat != repeats - 1:
+ self.tearDown()
+
def testInstallHandler(self):
default_handler = signal.getsignal(signal.SIGINT)
unittest.installHandler()
@@ -48,35 +73,34 @@ def testRegisterResult(self):
unittest.removeResult(result)
def testInterruptCaught(self):
- default_handler = signal.getsignal(signal.SIGINT)
-
- result = unittest.TestResult()
- unittest.installHandler()
- unittest.registerResult(result)
-
- self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
-
def test(result):
pid = os.getpid()
os.kill(pid, signal.SIGINT)
result.breakCaught = True
self.assertTrue(result.shouldStop)
- try:
- test(result)
- except KeyboardInterrupt:
- self.fail("KeyboardInterrupt not handled")
- self.assertTrue(result.breakCaught)
+ def test_function():
+ result = unittest.TestResult()
+ unittest.installHandler()
+ unittest.registerResult(result)
+ self.assertNotEqual(
+ signal.getsignal(signal.SIGINT),
+ self._default_handler,
+ )
+
+ try:
+ test(result)
+ except KeyboardInterrupt:
+ self.fail("KeyboardInterrupt not handled")
+ self.assertTrue(result.breakCaught)
+ self.withRepeats(test_function)
def testSecondInterrupt(self):
# Can't use skipIf decorator because the signal handler may have
# been changed after defining this method.
if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
self.skipTest("test requires SIGINT to not be ignored")
- result = unittest.TestResult()
- unittest.installHandler()
- unittest.registerResult(result)
def test(result):
pid = os.getpid()
@@ -86,40 +110,40 @@ def test(result):
os.kill(pid, signal.SIGINT)
self.fail("Second KeyboardInterrupt not raised")
- try:
- test(result)
- except KeyboardInterrupt:
- pass
- else:
- self.fail("Second KeyboardInterrupt not raised")
- self.assertTrue(result.breakCaught)
+ def test_function():
+ result = unittest.TestResult()
+ unittest.installHandler()
+ unittest.registerResult(result)
+ with self.assertRaises(KeyboardInterrupt):
+ test(result)
+ self.assertTrue(result.breakCaught)
+ self.withRepeats(test_function)
- def testTwoResults(self):
- unittest.installHandler()
- result = unittest.TestResult()
- unittest.registerResult(result)
- new_handler = signal.getsignal(signal.SIGINT)
+ def testTwoResults(self):
+ def test_function():
+ unittest.installHandler()
- result2 = unittest.TestResult()
- unittest.registerResult(result2)
- self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
+ result = unittest.TestResult()
+ unittest.registerResult(result)
+ new_handler = signal.getsignal(signal.SIGINT)
- result3 = unittest.TestResult()
+ result2 = unittest.TestResult()
+ unittest.registerResult(result2)
+ self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
- def test(result):
- pid = os.getpid()
- os.kill(pid, signal.SIGINT)
+ result3 = unittest.TestResult()
- try:
- test(result)
- except KeyboardInterrupt:
- self.fail("KeyboardInterrupt not handled")
+ try:
+ os.kill(os.getpid(), signal.SIGINT)
+ except KeyboardInterrupt:
+ self.fail("KeyboardInterrupt not handled")
- self.assertTrue(result.shouldStop)
- self.assertTrue(result2.shouldStop)
- self.assertFalse(result3.shouldStop)
+ self.assertTrue(result.shouldStop)
+ self.assertTrue(result2.shouldStop)
+ self.assertFalse(result3.shouldStop)
+ self.withRepeats(test_function)
def testHandlerReplacedButCalled(self):
@@ -127,23 +151,25 @@ def testHandlerReplacedButCalled(self):
# been changed after defining this method.
if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
self.skipTest("test requires SIGINT to not be ignored")
- # If our handler has been replaced (is no longer installed) but is
- # called by the *new* handler, then it isn't safe to delay the
- # SIGINT and we should immediately delegate to the default handler
- unittest.installHandler()
-
- handler = signal.getsignal(signal.SIGINT)
- def new_handler(frame, signum):
- handler(frame, signum)
- signal.signal(signal.SIGINT, new_handler)
- try:
- pid = os.getpid()
- os.kill(pid, signal.SIGINT)
- except KeyboardInterrupt:
- pass
- else:
- self.fail("replaced but delegated handler doesn't raise interrupt")
+ def test_function():
+ # If our handler has been replaced (is no longer installed) but is
+ # called by the *new* handler, then it isn't safe to delay the
+ # SIGINT and we should immediately delegate to the default handler
+ unittest.installHandler()
+
+ handler = signal.getsignal(signal.SIGINT)
+ def new_handler(frame, signum):
+ handler(frame, signum)
+ signal.signal(signal.SIGINT, new_handler)
+
+ try:
+ os.kill(os.getpid(), signal.SIGINT)
+ except KeyboardInterrupt:
+ pass
+ else:
+ self.fail("replaced but delegated handler doesn't raise interrupt")
+ self.withRepeats(test_function)
def testRunner(self):
# Creating a TextTestRunner with the appropriate argument should
diff --git a/Python/ceval.c b/Python/ceval.c
index b900de53c8c92..5a6de5bce497f 100644
--- a/Python/ceval.c
+++ b/Python/ceval.c
@@ -4742,6 +4742,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -4761,6 +4762,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -4785,6 +4787,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -4816,6 +4819,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -4854,6 +4858,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
*/
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -4896,6 +4901,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -5013,6 +5019,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -5040,6 +5047,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -5067,6 +5075,7 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, InterpreterFrame *frame, int thr
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
More information about the Python-checkins
mailing list