[pypy-commit] pypy default: fix semaphore deadlock in issue 2953

mattip pypy.commits at gmail.com
Tue Apr 30 09:42:51 EDT 2019


Author: Matti Picus <matti.picus at gmail.com>
Branch: 
Changeset: r96566:6187f28f2baf
Date: 2019-04-30 09:32 -0400
http://bitbucket.org/pypy/pypy/changeset/6187f28f2baf/

Log:	fix semaphore deadlock in issue 2953

diff --git a/pypy/doc/whatsnew-head.rst b/pypy/doc/whatsnew-head.rst
--- a/pypy/doc/whatsnew-head.rst
+++ b/pypy/doc/whatsnew-head.rst
@@ -16,3 +16,8 @@
 .. branch: datetime_api_27
 
 Add ``DateTime_FromTimestamp`` and ``Date_FromTimestamp``
+
+.. branch: semlock-deadlock
+
+Test and reduce the probability of a deadlock when acquiring a semaphore by
+moving global state changes closer to the actual acquire.
diff --git a/pypy/module/_multiprocessing/interp_semaphore.py b/pypy/module/_multiprocessing/interp_semaphore.py
--- a/pypy/module/_multiprocessing/interp_semaphore.py
+++ b/pypy/module/_multiprocessing/interp_semaphore.py
@@ -46,7 +46,8 @@
     eci = ExternalCompilationInfo(
         includes = ['sys/time.h',
                     'limits.h',
-                    'semaphore.h'],
+                    'semaphore.h',
+                    ],
         libraries = libraries,
         )
 
@@ -259,6 +260,8 @@
         res = rwin32.WaitForSingleObject(self.handle, 0)
 
         if res != rwin32.WAIT_TIMEOUT:
+            self.last_tid = rthread.get_ident()
+            self.count += 1
             return True
 
         msecs = full_msecs
@@ -291,6 +294,8 @@
 
         # handle result
         if res != rwin32.WAIT_TIMEOUT:
+            self.last_tid = rthread.get_ident()
+            self.count += 1
             return True
         return False
 
@@ -369,8 +374,9 @@
                     elif e.errno in (errno.EAGAIN, errno.ETIMEDOUT):
                         return False
                     raise
-                _check_signals(space)
-
+                _check_signals(space)    
+                self.last_tid = rthread.get_ident()
+                self.count += 1
                 return True
         finally:
             if deadline:
@@ -439,6 +445,7 @@
         self.count = 0
         self.maxvalue = maxvalue
         self.register_finalizer(space)
+        self.last_tid = -1
 
     def kind_get(self, space):
         return space.newint(self.kind)
@@ -476,15 +483,15 @@
         if self.kind == RECURSIVE_MUTEX and self._ismine():
             self.count += 1
             return space.w_True
-
         try:
+            # sets self.last_tid and increments self.count
+            # those steps need to be as close as possible to
+            # acquiring the semlock for self._ismine() to support
+            # multiple threads 
             got = semlock_acquire(self, space, block, w_timeout)
         except OSError as e:
             raise wrap_oserror(space, e)
-
         if got:
-            self.last_tid = rthread.get_ident()
-            self.count += 1
             return space.w_True
         else:
             return space.w_False
@@ -501,10 +508,10 @@
 
         try:
             semlock_release(self, space)
+            self.count -= 1
         except OSError as e:
             raise wrap_oserror(space, e)
 
-        self.count -= 1
 
     def after_fork(self):
         self.count = 0
diff --git a/pypy/module/_multiprocessing/test/test_semaphore.py b/pypy/module/_multiprocessing/test/test_semaphore.py
--- a/pypy/module/_multiprocessing/test/test_semaphore.py
+++ b/pypy/module/_multiprocessing/test/test_semaphore.py
@@ -17,6 +17,7 @@
     def setup_class(cls):
         cls.w_SEMAPHORE = cls.space.wrap(SEMAPHORE)
         cls.w_RECURSIVE = cls.space.wrap(RECURSIVE_MUTEX)
+        cls.w_runappdirect = cls.space.wrap(cls.runappdirect)
 
     def test_semaphore(self):
         from _multiprocessing import SemLock
@@ -108,3 +109,25 @@
         with sem:
             assert sem._count() == 1
         assert sem._count() == 0
+
+    def test_in_threads(self):
+        from _multiprocessing import SemLock
+        from threading import Thread
+        from time import sleep
+        l = SemLock(0, 1, 1)
+        if self.runappdirect:
+            def f(id):
+                for i in range(10000):
+                    pass
+        else:
+            def f(id):
+                for i in range(1000):
+                    # reduce the probability of thread switching
+                    # at exactly the wrong time in semlock_acquire
+                    for j in range(10):
+                        pass
+        threads = [Thread(None, f, args=(i,)) for i in range(2)]
+        [t.start() for t in threads]
+        # if the RLock calls to sem_wait and sem_post do not match,
+        # one of the threads will block and the call to join will fail
+        [t.join() for t in threads]


More information about the pypy-commit mailing list