[pypy-commit] pypy default: Backport from the py3k branch: add lock._py3k_acquire() with the

arigo noreply at buildbot.pypy.org
Sun Feb 23 08:43:59 CET 2014


Author: Armin Rigo <arigo at tunes.org>
Branch: 
Changeset: r69277:9b4e9b797ba1
Date: 2014-02-23 08:43 +0100
http://bitbucket.org/pypy/pypy/changeset/9b4e9b797ba1/

Log:	Backport from the py3k branch: add lock._py3k_acquire() with the
	same semantics as in Python 3.

diff --git a/pypy/module/thread/os_lock.py b/pypy/module/thread/os_lock.py
--- a/pypy/module/thread/os_lock.py
+++ b/pypy/module/thread/os_lock.py
@@ -2,11 +2,57 @@
 Python locks, based on true threading locks provided by the OS.
 """
 
+import time
 from rpython.rlib import rthread
 from pypy.module.thread.error import wrap_thread_error
 from pypy.interpreter.baseobjspace import W_Root
 from pypy.interpreter.gateway import interp2app, unwrap_spec
 from pypy.interpreter.typedef import TypeDef
+from rpython.rlib.rarithmetic import r_longlong
+
+
+LONGLONG_MAX = r_longlong(2 ** (r_longlong.BITS-1) - 1)
+TIMEOUT_MAX = LONGLONG_MAX
+
+RPY_LOCK_FAILURE, RPY_LOCK_ACQUIRED, RPY_LOCK_INTR = range(3)
+
+def parse_acquire_args(space, blocking, timeout):
+    if not blocking and timeout != -1.0:
+        raise OperationError(space.w_ValueError, space.wrap(
+                "can't specify a timeout for a non-blocking call"))
+    if timeout < 0.0 and timeout != -1.0:
+        raise OperationError(space.w_ValueError, space.wrap(
+                "timeout value must be strictly positive"))
+    if not blocking:
+        microseconds = 0
+    elif timeout == -1.0:
+        microseconds = -1
+    else:
+        timeout *= 1e6
+        if timeout > float(TIMEOUT_MAX):
+            raise OperationError(space.w_OverflowError, space.wrap(
+                    "timeout value is too large"))
+        microseconds = r_longlong(timeout)
+    return microseconds
+
+
+def acquire_timed(space, lock, microseconds):
+    """Helper to acquire an interruptible lock with a timeout."""
+    endtime = (time.time() * 1e6) + microseconds
+    while True:
+        result = lock.acquire_timed(microseconds)
+        if result == RPY_LOCK_INTR:
+            # Run signal handlers if we were interrupted
+            space.getexecutioncontext().checksignals()
+            if microseconds >= 0:
+                microseconds = r_longlong(endtime - (time.time() * 1e6))
+                # Check for negative values, since those mean block
+                # forever
+                if microseconds <= 0:
+                    result = RPY_LOCK_FAILURE
+        if result != RPY_LOCK_INTR:
+            break
+    return result
 
 
 class Lock(W_Root):
@@ -21,8 +67,8 @@
         except rthread.error:
             raise wrap_thread_error(space, "out of resources")
 
-    @unwrap_spec(waitflag=int)
-    def descr_lock_acquire(self, space, waitflag=1):
+    @unwrap_spec(blocking=int)
+    def descr_lock_acquire(self, space, blocking=1):
         """Lock the lock.  With the default argument of True, this blocks
 if the lock is already locked (even by the same thread), waiting for
 another thread to release the lock, and returns True once the lock is
@@ -30,9 +76,24 @@
 and the return value reflects whether the lock is acquired.
 The blocking operation is not interruptible."""
         mylock = self.lock
-        result = mylock.acquire(bool(waitflag))
+        result = mylock.acquire(bool(blocking))
         return space.newbool(result)
 
+    @unwrap_spec(blocking=int, timeout=float)
+    def descr_lock_py3k_acquire(self, space, blocking=1, timeout=-1.0):
+        """(Backport of a Python 3 API for PyPy.  This version takes
+a timeout argument and handles signals, like Ctrl-C.)
+
+Lock the lock.  Without argument, this blocks if the lock is already
+locked (even by the same thread), waiting for another thread to release
+the lock, and return True once the lock is acquired.
+With an argument, this will only block if the argument is true,
+and the return value reflects whether the lock is acquired.
+The blocking operation is interruptible."""
+        microseconds = parse_acquire_args(space, blocking, timeout)
+        result = acquire_timed(space, self.lock, microseconds)
+        return space.newbool(result == RPY_LOCK_ACQUIRED)
+
     def descr_lock_release(self, space):
         """Release the lock, allowing another thread that is blocked waiting for
 the lock to acquire the lock.  The lock must be in the locked state,
@@ -69,6 +130,7 @@
 descr_locked  = interp2app(Lock.descr_lock_locked)
 descr__enter__ = interp2app(Lock.descr__enter__)
 descr__exit__ = interp2app(Lock.descr__exit__)
+descr_py3k_acquire = interp2app(Lock.descr_lock_py3k_acquire)
 
 
 Lock.typedef = TypeDef("thread.lock",
@@ -84,6 +146,7 @@
 unlock it.  A thread attempting to lock a lock that it has already locked
 will block until another thread unlocks it.  Deadlocks may ensue.""",
     acquire = descr_acquire,
+    _py3k_acquire = descr_py3k_acquire,
     release = descr_release,
     locked  = descr_locked,
     __enter__ = descr__enter__,
diff --git a/pypy/module/thread/test/test_lock.py b/pypy/module/thread/test/test_lock.py
--- a/pypy/module/thread/test/test_lock.py
+++ b/pypy/module/thread/test/test_lock.py
@@ -1,4 +1,6 @@
 from __future__ import with_statement
+import py
+import sys, os
 from pypy.module.thread.test.support import GenericTestThread
 from rpython.translator.c.test.test_genc import compile
 
@@ -49,6 +51,16 @@
             assert feedback == [42]
         assert lock.locked() is False
 
+    def test_timeout(self):
+        import thread
+        lock = thread.allocate_lock()
+        assert lock.acquire() is True
+        assert lock.acquire(False) is False
+        raises(TypeError, lock.acquire, True, timeout=.1)
+        lock._py3k_acquire(True, timeout=.01)
+        lock._py3k_acquire(True, .01)
+
+
 def test_compile_lock():
     from rpython.rlib import rgc
     from rpython.rlib.rthread import allocate_lock
@@ -73,3 +85,78 @@
 class AppTestLockAgain(GenericTestThread):
     # test it at app-level again to detect strange interactions
     test_lock_again = AppTestLock.test_lock.im_func
+
+
+class AppTestLockSignals(GenericTestThread):
+    pytestmark = py.test.mark.skipif("os.name != 'posix'")
+
+    def setup_class(cls):
+        cls.w_using_pthread_cond = cls.space.wrap(sys.platform == 'freebsd6')
+
+    def w_acquire_retries_on_intr(self, lock):
+        import thread, os, signal, time
+        self.sig_recvd = False
+        def my_handler(signal, frame):
+            self.sig_recvd = True
+        old_handler = signal.signal(signal.SIGUSR1, my_handler)
+        try:
+            def other_thread():
+                # Acquire the lock in a non-main thread, so this test works for
+                # RLocks.
+                lock.acquire()
+                # Wait until the main thread is blocked in the lock acquire, and
+                # then wake it up with this.
+                time.sleep(0.5)
+                os.kill(os.getpid(), signal.SIGUSR1)
+                # Let the main thread take the interrupt, handle it, and retry
+                # the lock acquisition.  Then we'll let it run.
+                time.sleep(0.5)
+                lock.release()
+            thread.start_new_thread(other_thread, ())
+            # Wait until we can't acquire it without blocking...
+            while lock.acquire(blocking=False):
+                lock.release()
+                time.sleep(0.01)
+            result = lock.acquire()  # Block while we receive a signal.
+            assert self.sig_recvd
+            assert result
+        finally:
+            signal.signal(signal.SIGUSR1, old_handler)
+
+    def test_lock_acquire_retries_on_intr(self):
+        import thread
+        self.acquire_retries_on_intr(thread.allocate_lock())
+
+    def w_alarm_interrupt(self, sig, frame):
+        raise KeyboardInterrupt
+
+    def test_lock_acquire_interruption(self):
+        if self.using_pthread_cond:
+            skip('POSIX condition variables cannot be interrupted')
+        import thread, signal, time
+        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
+        # in a deadlock.
+        # XXX this test can fail when the legacy (non-semaphore) implementation
+        # of locks is used in thread_pthread.h, see CPython issue #11223.
+        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
+        try:
+            lock = thread.allocate_lock()
+            lock.acquire()
+            signal.alarm(1)
+            t1 = time.time()
+            # XXX: raises doesn't work here?
+            #raises(KeyboardInterrupt, lock.acquire, timeout=5)
+            try:
+                lock._py3k_acquire(timeout=10)
+            except KeyboardInterrupt:
+                pass
+            else:
+                assert False, 'Expected KeyboardInterrupt'
+            dt = time.time() - t1
+            # Checking that KeyboardInterrupt was raised is not sufficient.
+            # We want to assert that lock.acquire() was interrupted because
+            # of the signal, not that the signal handler was called immediately
+            # after timeout return of lock.acquire() (which can fool assertRaises).
+            assert dt < 8.0
+        finally:
+            signal.signal(signal.SIGALRM, oldalrm)


More information about the pypy-commit mailing list