[pypy-commit] pypy default: fix semaphore deadlock in issue 2953
mattip
pypy.commits at gmail.com
Tue Apr 30 09:42:51 EDT 2019
Author: Matti Picus <matti.picus at gmail.com>
Branch:
Changeset: r96566:6187f28f2baf
Date: 2019-04-30 09:32 -0400
http://bitbucket.org/pypy/pypy/changeset/6187f28f2baf/
Log: fix semaphore deadlock in issue 2953
diff --git a/pypy/doc/whatsnew-head.rst b/pypy/doc/whatsnew-head.rst
--- a/pypy/doc/whatsnew-head.rst
+++ b/pypy/doc/whatsnew-head.rst
@@ -16,3 +16,8 @@
.. branch: datetime_api_27
Add ``DateTime_FromTimestamp`` and ``Date_FromTimestamp``
+
+.. branch: semlock-deadlock
+
+Test and reduce the probability of a deadlock when acquiring a semaphore by
+moving global state changes closer to the actual acquire.
diff --git a/pypy/module/_multiprocessing/interp_semaphore.py b/pypy/module/_multiprocessing/interp_semaphore.py
--- a/pypy/module/_multiprocessing/interp_semaphore.py
+++ b/pypy/module/_multiprocessing/interp_semaphore.py
@@ -46,7 +46,8 @@
eci = ExternalCompilationInfo(
includes = ['sys/time.h',
'limits.h',
- 'semaphore.h'],
+ 'semaphore.h',
+ ],
libraries = libraries,
)
@@ -259,6 +260,8 @@
res = rwin32.WaitForSingleObject(self.handle, 0)
if res != rwin32.WAIT_TIMEOUT:
+ self.last_tid = rthread.get_ident()
+ self.count += 1
return True
msecs = full_msecs
@@ -291,6 +294,8 @@
# handle result
if res != rwin32.WAIT_TIMEOUT:
+ self.last_tid = rthread.get_ident()
+ self.count += 1
return True
return False
@@ -369,8 +374,9 @@
elif e.errno in (errno.EAGAIN, errno.ETIMEDOUT):
return False
raise
- _check_signals(space)
-
+ _check_signals(space)
+ self.last_tid = rthread.get_ident()
+ self.count += 1
return True
finally:
if deadline:
@@ -439,6 +445,7 @@
self.count = 0
self.maxvalue = maxvalue
self.register_finalizer(space)
+ self.last_tid = -1
def kind_get(self, space):
return space.newint(self.kind)
@@ -476,15 +483,15 @@
if self.kind == RECURSIVE_MUTEX and self._ismine():
self.count += 1
return space.w_True
-
try:
+ # sets self.last_tid and increments self.count
+ # those steps need to be as close as possible to
+ # acquiring the semlock for self._ismine() to support
+ # multiple threads
got = semlock_acquire(self, space, block, w_timeout)
except OSError as e:
raise wrap_oserror(space, e)
-
if got:
- self.last_tid = rthread.get_ident()
- self.count += 1
return space.w_True
else:
return space.w_False
@@ -501,10 +508,10 @@
try:
semlock_release(self, space)
+ self.count -= 1
except OSError as e:
raise wrap_oserror(space, e)
- self.count -= 1
def after_fork(self):
self.count = 0
diff --git a/pypy/module/_multiprocessing/test/test_semaphore.py b/pypy/module/_multiprocessing/test/test_semaphore.py
--- a/pypy/module/_multiprocessing/test/test_semaphore.py
+++ b/pypy/module/_multiprocessing/test/test_semaphore.py
@@ -17,6 +17,7 @@
def setup_class(cls):
cls.w_SEMAPHORE = cls.space.wrap(SEMAPHORE)
cls.w_RECURSIVE = cls.space.wrap(RECURSIVE_MUTEX)
+ cls.w_runappdirect = cls.space.wrap(cls.runappdirect)
def test_semaphore(self):
from _multiprocessing import SemLock
@@ -108,3 +109,25 @@
with sem:
assert sem._count() == 1
assert sem._count() == 0
+
+ def test_in_threads(self):
+ from _multiprocessing import SemLock
+ from threading import Thread
+ from time import sleep
+ l = SemLock(0, 1, 1)
+ if self.runappdirect:
+ def f(id):
+ for i in range(10000):
+ pass
+ else:
+ def f(id):
+ for i in range(1000):
+ # reduce the probability of thread switching
+ # at exactly the wrong time in semlock_acquire
+ for j in range(10):
+ pass
+ threads = [Thread(None, f, args=(i,)) for i in range(2)]
+ [t.start() for t in threads]
+ # if the RLock calls to sem_wait and sem_post do not match,
+ # one of the threads will block and the call to join will fail
+ [t.join() for t in threads]
More information about the pypy-commit
mailing list