[pypy-commit] pypy stmgc-c7: Some lightweight redesign of the API, and more heavyweight redesign of the implementation

arigo noreply at buildbot.pypy.org
Sat Jan 31 01:16:11 CET 2015


Author: Armin Rigo <arigo at tunes.org>
Branch: stmgc-c7
Changeset: r75596:8286e713d46c
Date: 2015-01-31 01:15 +0100
http://bitbucket.org/pypy/pypy/changeset/8286e713d46c/

Log:	Some lightweight redesign of the API, and more heavyweight redesign
	of the implementation into a version that should be more conflict-
	free

diff --git a/lib_pypy/transaction.py b/lib_pypy/transaction.py
--- a/lib_pypy/transaction.py
+++ b/lib_pypy/transaction.py
@@ -43,57 +43,58 @@
     def hint_commit_soon():
         return None
 
-
-def set_num_threads(num):
-    """Set the number of threads to use."""
-    if num < 1:
-        raise ValueError("'num' must be at least 1, got %r" % (num,))
-    if _thread_pool.in_transaction:
-        raise TransactionError("cannot change the number of threads "
-                               "while running transactions")
-    _thread_pool.num_threads = num
+try:
+    from pypystm import getsegmentlimit
+except ImportError:
+    # Not a STM-enabled PyPy.
+    def getsegmentlimit():
+        return 1
 
 
 class TransactionError(Exception):
     pass
 
 
-# XXX right now uses the same API as the old pypy-stm.  This will
-# be redesigned later.
-
 def add(f, *args, **kwds):
-    """Register the call 'f(*args, **kwds)' as running a new
-    transaction.  If we are currently running in a transaction too, the
-    new transaction will only start after the end of the current
-    transaction.  Note that if the current transaction or another running
-    in the meantime raises an exception, all pending transactions are
-    cancelled.
+    """Register a new transaction that will be done by 'f(*args, **kwds)'.
+    Must be called within the transaction in the "with TransactionQueue()"
+    block, or within a transaction started by this one, directly or
+    indirectly.
     """
     _thread_local.pending.append((f, args, kwds))
 
 
-def run():
-    """Run the pending transactions, as well as all transactions started
-    by them, and so on.  The order is random and undeterministic.  Must
-    be called from the main program, i.e. not from within another
-    transaction.  If at some point all transactions are done, returns.
-    If a transaction raises an exception, it propagates here; in this
-    case all pending transactions are cancelled.
+class TransactionQueue(object):
+    """Use in 'with TransactionQueue():'.  Creates a queue of
+    transactions.  The first transaction in the queue is the content of
+    the 'with:' block, which is immediately started.
+
+    Any transaction can register new transactions that will be run
+    after the current one is finished, using the global function add().
     """
-    tpool = _thread_pool
-    if tpool.in_transaction:
-        raise TransactionError("recursive invocation of transaction.run()")
-    if not _thread_local.pending:
-        return     # nothing to do
-    try:
-        tpool.setup()
-        tpool.run()
-    finally:
-        tpool.teardown()
-    tpool.reraise()
 
-def number_of_transactions_in_last_run():
-    return _thread_pool.transactions_run
+    def __init__(self, nb_segments=0):
+        if nb_segments <= 0:
+            nb_segments = getsegmentlimit()
+        _thread_pool.ensure_threads(nb_segments)
+
+    def __enter__(self):
+        if hasattr(_thread_local, "pending"):
+            raise TransactionError(
+                "recursive invocation of TransactionQueue()")
+        if is_atomic():
+            raise TransactionError(
+                "invocation of TransactionQueue() from an atomic context")
+        _thread_local.pending = []
+        atomic.__enter__()
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        atomic.__exit__(exc_type, exc_value, traceback)
+        pending = _thread_local.pending
+        del _thread_local.pending
+        if exc_type is None and len(pending) > 0:
+            _thread_pool.run(pending)
+
 
 # ____________________________________________________________
 
@@ -101,152 +102,115 @@
 class _ThreadPool(object):
 
     def __init__(self):
-        try:
-            from pypystm import getsegmentlimit
-            self.num_threads = getsegmentlimit()
-        except ImportError:
-            self.num_threads = 4
-        self.in_transaction = False
-        self.transactions_run = None
+        self.lock_running = thread.allocate_lock()
+        self.lock_done_running = thread.allocate_lock()
+        self.lock_done_running.acquire()
+        self.nb_threads = 0
+        self.deque = collections.deque()
+        self.locks = []
+        self.lock_deque = thread.allocate_lock()
+        self.exception = []
 
-    def setup(self):
-        # a mutex to protect parts of _grab_next_thing_to_do()
-        self.lock_mutex = thread.allocate_lock()
-        # this lock is released if and only if there are things to do in
-        # 'self.pending'; both are modified together, with the lock_mutex.
-        self.lock_pending = thread.allocate_lock()
-        # this lock is released when we are finished at the end
-        self.lock_if_released_then_finished = thread.allocate_lock()
-        self.lock_if_released_then_finished.acquire()
+    def ensure_threads(self, n):
+        if n > self.nb_threads:
+            with self.lock_running:
+                for i in range(self.nb_threads, n):
+                    assert len(self.locks) == self.nb_threads
+                    self.nb_threads += 1
+                    thread.start_new_thread(self.thread_runner, ())
+                    # The newly started thread should run immediately into
+                    # the case 'if len(self.locks) == self.nb_threads:'
+                    # and release this lock.  Wait until it does.
+                    self.lock_done_running.acquire()
+
+    def run(self, pending):
+        # For now, can't run multiple threads with each an independent
+        # TransactionQueue(): they are serialized.
+        with self.lock_running:
+            assert self.exception == []
+            assert len(self.deque) == 0
+            deque = self.deque
+            with self.lock_deque:
+                deque.extend(pending)
+                try:
+                    for i in range(len(pending)):
+                        self.locks.pop().release()
+                except IndexError:     # pop from empty list
+                    pass
+            #
+            self.lock_done_running.acquire()
+            #
+            if self.exception:
+                exc_type, exc_value, exc_traceback = self.exception
+                del self.exception[:]
+                raise exc_type, exc_value, exc_traceback
+
+    def thread_runner(self):
+        deque = self.deque
+        lock = thread.allocate_lock()
+        lock.acquire()
+        pending = []
+        _thread_local.pending = pending
+        lock_deque = self.lock_deque
+        exception = self.exception
         #
-        self.pending = _thread_local.pending
-        # there must be pending items at the beginning, which means that
-        # 'lock_pending' can indeed be released
-        assert self.pending
-        _thread_local.pending = None
-        #
-        self.num_waiting_threads = 0
-        self.transactions_run = 0
-        self.finished = False
-        self.got_exception = []
-        self.in_transaction = True
+        while True:
+            #
+            # Look at the deque and try to fetch the next item on the left.
+            # If empty, we add our lock to the 'locks' list.
+            lock_deque.acquire()
+            if deque:
+                next_transaction = deque.popleft()
+                lock_deque.release()
+            else:
+                self.locks.append(lock)
+                if len(self.locks) == self.nb_threads:
+                    self.lock_done_running.release()
+                lock_deque.release()
+                #
+                # Now wait until our lock is released.
+                lock.acquire()
+                continue
+            #
+            # Now we have a next_transaction.  Run it.
+            assert len(pending) == 0
+            while True:
+                f, args, kwds = next_transaction
+                with atomic:
+                    if len(exception) == 0:
+                        try:
+                            f(*args, **kwds)
+                        except:
+                            exception.extend(sys.exc_info())
+                del next_transaction
+                #
+                # If no new 'pending' transactions have been added, exit
+                # this loop and go back to fetch more from the deque.
+                if len(pending) == 0:
+                    break
+                #
+                # If we have some new 'pending' transactions, add them
+                # to the right of the deque and pop the next one from
+                # the left.  As we do this atomically with the
+                # 'lock_deque', we are sure that the deque cannot be
+                # empty before the popleft().  (We do that even when
+                # 'len(pending) == 1' instead of simply assigning the
+                # single item to 'next_transaction', because it looks
+                # like a good idea to preserve some first-in-first-out
+                # approximation.)
+                with self.lock_deque:
+                    deque.extend(pending)
+                    next_transaction = deque.popleft()
+                    try:
+                        for i in range(1, len(pending)):
+                            self.locks.pop().release()
+                    except IndexError:     # pop from empty list
+                        pass
+                del pending[:]
 
-    def run(self):
-        # start the N threads
-        task_counters = [[0] for i in range(self.num_threads)]
-        for counter in task_counters:
-            thread.start_new_thread(self._run_thread, (counter,))
-        # now wait.  When we manage to acquire the following lock, then
-        # we are finished.
-        self.lock_if_released_then_finished.acquire()
-        self.transactions_run = sum(x[0] for x in task_counters)
-
-    def teardown(self):
-        self.in_transaction = False
-        self.pending = None
-        self.lock_if_released_then_finished = None
-        self.lock_pending = None
-        self.lock_mutex = None
-        _thread_local.pending = collections.deque()
-
-    def reraise(self):
-        exc = self.got_exception
-        self.got_exception = None
-        if exc:
-            raise exc[0], exc[1], exc[2]    # exception, value, traceback
-
-    def _run_thread(self, counter):
-        tloc_pending = _thread_local.pending
-        got_exception = self.got_exception
-        try:
-            while True:
-                self._do_it(self._grab_next_thing_to_do(tloc_pending),
-                            got_exception)
-                counter[0] += 1
-        except _Done:
-            pass
-
-    def _grab_next_thing_to_do(self, tloc_pending):
-        if tloc_pending:
-            # grab the next thing to do from the thread-local deque
-            next = tloc_pending.popleft()
-            # add the rest, if any, to the global 'pending'
-            if tloc_pending:
-                #
-                self.lock_mutex.acquire()
-                if not self.pending:
-                    # self.pending is empty so far, but we are adding stuff.
-                    # we have to release the following lock.
-                    self.lock_pending.release()
-                self.pending.extend(tloc_pending)
-                self.lock_mutex.release()
-                #
-                tloc_pending.clear()
-            return next
-        #
-        self.lock_mutex.acquire()
-        while True:
-            try:
-                next = self.pending.popleft()
-            except IndexError:
-                # self.pending is empty: wait until it no longer is.
-                pass
-            else:
-                # self.pending was not empty.  If now it is empty, then
-                # fix the status of 'lock_pending'.
-                if not self.pending:
-                    self.lock_pending.acquire()
-                self.lock_mutex.release()
-                return next
-            #
-            # first check if all N threads are waiting here.
-            assert not self.finished
-            self.num_waiting_threads += 1
-            if self.num_waiting_threads == self.num_threads:
-                # yes, so finished!  unlock this to wake up the other
-                # threads, which are all waiting on the following acquire().
-                self.finished = True
-                self.lock_pending.release()
-            #
-            self.lock_mutex.release()
-            self.lock_pending.acquire()
-            self.lock_pending.release()
-            self.lock_mutex.acquire()
-            #
-            self.num_waiting_threads -= 1
-            if self.finished:
-                last_one_to_leave = self.num_waiting_threads == 0
-                self.lock_mutex.release()
-                if last_one_to_leave:
-                    self.lock_if_released_then_finished.release()
-                raise _Done
-
-    @staticmethod                             
-    def _do_it((f, args, kwds), got_exception):
-        # this is a staticmethod in order to make sure that we don't
-        # accidentally use 'self' in the atomic block.
-        try:                                  
-            hint_commit_soon()
-            with signals_enabled:
-                with atomic:
-                    if not got_exception:
-                        f(*args, **kwds)
-            hint_commit_soon()
-        except:
-            got_exception[:] = sys.exc_info()
 
 _thread_pool = _ThreadPool()
-
-
-class _Done(Exception):
-    pass
-
-
-class _ThreadLocal(thread._local):
-    def __init__(self):
-        self.pending = collections.deque()
-
-_thread_local = _ThreadLocal()
+_thread_local = thread._local()
 
 
 def XXXreport_abort_info(info):


More information about the pypy-commit mailing list