[pypy-commit] pypy default: Backport from the py3k branch: add lock._py3k_acquire() with the same semantics as in Python 3.
arigo
noreply at buildbot.pypy.org
Sun Feb 23 08:43:59 CET 2014
Author: Armin Rigo <arigo at tunes.org>
Branch:
Changeset: r69277:9b4e9b797ba1
Date: 2014-02-23 08:43 +0100
http://bitbucket.org/pypy/pypy/changeset/9b4e9b797ba1/
Log: Backport from the py3k branch: add lock._py3k_acquire() with the
same semantics as in Python 3.
diff --git a/pypy/module/thread/os_lock.py b/pypy/module/thread/os_lock.py
--- a/pypy/module/thread/os_lock.py
+++ b/pypy/module/thread/os_lock.py
@@ -2,11 +2,57 @@
Python locks, based on true threading locks provided by the OS.
"""
+import time
from rpython.rlib import rthread
from pypy.module.thread.error import wrap_thread_error
from pypy.interpreter.baseobjspace import W_Root
from pypy.interpreter.gateway import interp2app, unwrap_spec
from pypy.interpreter.typedef import TypeDef
+from rpython.rlib.rarithmetic import r_longlong
+
+
+LONGLONG_MAX = r_longlong(2 ** (r_longlong.BITS-1) - 1)
+TIMEOUT_MAX = LONGLONG_MAX
+
+RPY_LOCK_FAILURE, RPY_LOCK_ACQUIRED, RPY_LOCK_INTR = range(3)
+
+def parse_acquire_args(space, blocking, timeout):
+ if not blocking and timeout != -1.0:
+ raise OperationError(space.w_ValueError, space.wrap(
+ "can't specify a timeout for a non-blocking call"))
+ if timeout < 0.0 and timeout != -1.0:
+ raise OperationError(space.w_ValueError, space.wrap(
+ "timeout value must be strictly positive"))
+ if not blocking:
+ microseconds = 0
+ elif timeout == -1.0:
+ microseconds = -1
+ else:
+ timeout *= 1e6
+ if timeout > float(TIMEOUT_MAX):
+ raise OperationError(space.w_OverflowError, space.wrap(
+ "timeout value is too large"))
+ microseconds = r_longlong(timeout)
+ return microseconds
+
+
+def acquire_timed(space, lock, microseconds):
+ """Helper to acquire an interruptible lock with a timeout."""
+ endtime = (time.time() * 1e6) + microseconds
+ while True:
+ result = lock.acquire_timed(microseconds)
+ if result == RPY_LOCK_INTR:
+ # Run signal handlers if we were interrupted
+ space.getexecutioncontext().checksignals()
+ if microseconds >= 0:
+ microseconds = r_longlong(endtime - (time.time() * 1e6))
+ # Check for negative values, since those mean block
+ # forever
+ if microseconds <= 0:
+ result = RPY_LOCK_FAILURE
+ if result != RPY_LOCK_INTR:
+ break
+ return result
class Lock(W_Root):
@@ -21,8 +67,8 @@
except rthread.error:
raise wrap_thread_error(space, "out of resources")
- @unwrap_spec(waitflag=int)
- def descr_lock_acquire(self, space, waitflag=1):
+ @unwrap_spec(blocking=int)
+ def descr_lock_acquire(self, space, blocking=1):
"""Lock the lock. With the default argument of True, this blocks
if the lock is already locked (even by the same thread), waiting for
another thread to release the lock, and returns True once the lock is
@@ -30,9 +76,24 @@
and the return value reflects whether the lock is acquired.
The blocking operation is not interruptible."""
mylock = self.lock
- result = mylock.acquire(bool(waitflag))
+ result = mylock.acquire(bool(blocking))
return space.newbool(result)
+ @unwrap_spec(blocking=int, timeout=float)
+ def descr_lock_py3k_acquire(self, space, blocking=1, timeout=-1.0):
+ """(Backport of a Python 3 API for PyPy. This version takes
+a timeout argument and handles signals, like Ctrl-C.)
+
+Lock the lock. Without argument, this blocks if the lock is already
+locked (even by the same thread), waiting for another thread to release
+the lock, and returns True once the lock is acquired.
+With an argument, this will only block if the argument is true,
+and the return value reflects whether the lock is acquired.
+The blocking operation is interruptible."""
+ microseconds = parse_acquire_args(space, blocking, timeout)
+ result = acquire_timed(space, self.lock, microseconds)
+ return space.newbool(result == RPY_LOCK_ACQUIRED)
+
def descr_lock_release(self, space):
"""Release the lock, allowing another thread that is blocked waiting for
the lock to acquire the lock. The lock must be in the locked state,
@@ -69,6 +130,7 @@
descr_locked = interp2app(Lock.descr_lock_locked)
descr__enter__ = interp2app(Lock.descr__enter__)
descr__exit__ = interp2app(Lock.descr__exit__)
+descr_py3k_acquire = interp2app(Lock.descr_lock_py3k_acquire)
Lock.typedef = TypeDef("thread.lock",
@@ -84,6 +146,7 @@
unlock it. A thread attempting to lock a lock that it has already locked
will block until another thread unlocks it. Deadlocks may ensue.""",
acquire = descr_acquire,
+ _py3k_acquire = descr_py3k_acquire,
release = descr_release,
locked = descr_locked,
__enter__ = descr__enter__,
diff --git a/pypy/module/thread/test/test_lock.py b/pypy/module/thread/test/test_lock.py
--- a/pypy/module/thread/test/test_lock.py
+++ b/pypy/module/thread/test/test_lock.py
@@ -1,4 +1,6 @@
from __future__ import with_statement
+import py
+import sys, os
from pypy.module.thread.test.support import GenericTestThread
from rpython.translator.c.test.test_genc import compile
@@ -49,6 +51,16 @@
assert feedback == [42]
assert lock.locked() is False
+ def test_timeout(self):
+ import thread
+ lock = thread.allocate_lock()
+ assert lock.acquire() is True
+ assert lock.acquire(False) is False
+ raises(TypeError, lock.acquire, True, timeout=.1)
+ lock._py3k_acquire(True, timeout=.01)
+ lock._py3k_acquire(True, .01)
+
+
def test_compile_lock():
from rpython.rlib import rgc
from rpython.rlib.rthread import allocate_lock
@@ -73,3 +85,78 @@
class AppTestLockAgain(GenericTestThread):
# test it at app-level again to detect strange interactions
test_lock_again = AppTestLock.test_lock.im_func
+
+
+class AppTestLockSignals(GenericTestThread):
+ pytestmark = py.test.mark.skipif("os.name != 'posix'")
+
+ def setup_class(cls):
+ cls.w_using_pthread_cond = cls.space.wrap(sys.platform == 'freebsd6')
+
+ def w_acquire_retries_on_intr(self, lock):
+ import thread, os, signal, time
+ self.sig_recvd = False
+ def my_handler(signal, frame):
+ self.sig_recvd = True
+ old_handler = signal.signal(signal.SIGUSR1, my_handler)
+ try:
+ def other_thread():
+ # Acquire the lock in a non-main thread, so this test works for
+ # RLocks.
+ lock.acquire()
+ # Wait until the main thread is blocked in the lock acquire, and
+ # then wake it up with this.
+ time.sleep(0.5)
+ os.kill(os.getpid(), signal.SIGUSR1)
+ # Let the main thread take the interrupt, handle it, and retry
+ # the lock acquisition. Then we'll let it run.
+ time.sleep(0.5)
+ lock.release()
+ thread.start_new_thread(other_thread, ())
+ # Wait until we can't acquire it without blocking...
+ while lock.acquire(blocking=False):
+ lock.release()
+ time.sleep(0.01)
+ result = lock.acquire() # Block while we receive a signal.
+ assert self.sig_recvd
+ assert result
+ finally:
+ signal.signal(signal.SIGUSR1, old_handler)
+
+ def test_lock_acquire_retries_on_intr(self):
+ import thread
+ self.acquire_retries_on_intr(thread.allocate_lock())
+
+ def w_alarm_interrupt(self, sig, frame):
+ raise KeyboardInterrupt
+
+ def test_lock_acquire_interruption(self):
+ if self.using_pthread_cond:
+ skip('POSIX condition variables cannot be interrupted')
+ import thread, signal, time
+ # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
+ # in a deadlock.
+ # XXX this test can fail when the legacy (non-semaphore) implementation
+ # of locks is used in thread_pthread.h, see issue #11223.
+ oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
+ try:
+ lock = thread.allocate_lock()
+ lock.acquire()
+ signal.alarm(1)
+ t1 = time.time()
+ # XXX: raises doesn't work here?
+ #raises(KeyboardInterrupt, lock.acquire, timeout=5)
+ try:
+ lock._py3k_acquire(timeout=10)
+ except KeyboardInterrupt:
+ pass
+ else:
+ assert False, 'Expected KeyboardInterrupt'
+ dt = time.time() - t1
+ # Checking that KeyboardInterrupt was raised is not sufficient.
+ # We want to assert that lock.acquire() was interrupted because
+ # of the signal, not that the signal handler was called immediately
+ # after timeout return of lock.acquire() (which can fool assertRaises).
+ assert dt < 8.0
+ finally:
+ signal.signal(signal.SIGALRM, oldalrm)
More information about the pypy-commit
mailing list