[pypy-commit] pypy stmgc-c8: in-porgress: massively kill stuff in transaction.py thanks to the new queue

arigo noreply at buildbot.pypy.org
Thu Jun 18 18:56:16 CEST 2015


Author: Armin Rigo <arigo at tunes.org>
Branch: stmgc-c8
Changeset: r78189:ad1b81c56740
Date: 2015-06-18 17:57 +0100
http://bitbucket.org/pypy/pypy/changeset/ad1b81c56740/

Log:	in-porgress: massively kill stuff in transaction.py thanks to the
	new queue

diff --git a/lib_pypy/pypy_test/test_transaction.py b/lib_pypy/pypy_test/test_transaction.py
--- a/lib_pypy/pypy_test/test_transaction.py
+++ b/lib_pypy/pypy_test/test_transaction.py
@@ -66,12 +66,14 @@
     for x in range(N):
         lsts = ([], [], [], [], [], [], [], [], [], [])
         def do_stuff(i, j):
+            print 'do_stuff', i, j
             lsts[i].append(j)
             j += 1
             if j < 5:
                 tq.add(do_stuff, i, j)
             else:
                 lsts[i].append('foo')
+                print 'raising FooError!'
                 raise FooError
         tq = transaction.TransactionQueue()
         for i in range(10):
@@ -94,7 +96,8 @@
         assert num_foos == 1, lsts
 
 
-def test_number_of_transactions_reported():
+# XXX reimplement or kill:
+def DONT_test_number_of_transactions_reported():
     tq = transaction.TransactionQueue()
     tq.add(lambda: None)
     tq.add(lambda: None)
diff --git a/lib_pypy/transaction.py b/lib_pypy/transaction.py
--- a/lib_pypy/transaction.py
+++ b/lib_pypy/transaction.py
@@ -51,6 +51,12 @@
     stmdict = dict
     from time import time, clock
 
+try:
+    from pypystm import queue, Empty
+except ImportError:
+    from Queue import Queue as queue
+    from Queue import Empty
+
 class stmidset(object):
     def __init__(self):
         self._hashtable = hashtable()
@@ -114,19 +120,12 @@
     """
 
     def __init__(self):
-        self._deque = collections.deque()
-        self._pending = self._deque
-        self._number_transactions_exec = 0
+        self._queue = queue()
 
     def add(self, f, *args, **kwds):
         """Register a new transaction to be done by 'f(*args, **kwds)'.
         """
-        # note: 'self._pending.append' can be two things here:
-        # * if we are outside run(), it is the regular deque.append method;
-        # * if we are inside run(), self._pending is a thread._local()
-        #   and then its append attribute is the append method of a
-        #   thread-local list.
-        self._pending.append((f, args, kwds))
+        self._queue.put((f, args, kwds))
 
     def add_generator(self, generator_iterator):
         """Register N new transactions to be done by a generator-iterator
@@ -144,141 +143,50 @@
     def run(self, nb_segments=0):
         """Run all transactions, and all transactions started by these
         ones, recursively, until the queue is empty.  If one transaction
-        raises, run() re-raises the exception and the unexecuted transaction
-        are left in the queue.
+        raises, run() re-raises the exception.
         """
         if is_atomic():
             raise TransactionError(
                 "TransactionQueue.run() cannot be called in an atomic context")
-        if not self._pending:
-            return
         if nb_segments <= 0:
             nb_segments = getsegmentlimit()
 
-        assert self._pending is self._deque, "broken state"
-        try:
-            self._pending = thread._local()
-            lock_done_running = thread.allocate_lock()
-            lock_done_running.acquire()
-            lock_deque = thread.allocate_lock()
-            locks = []
-            exception = []
-            args = (locks, lock_done_running, lock_deque,
-                    exception, nb_segments)
-            #
-            for i in range(nb_segments):
-                thread.start_new_thread(self._thread_runner, args)
-            #
-            # The threads run here, and they will release this lock when
-            # they are all finished.
-            lock_done_running.acquire()
-            #
-            assert len(locks) == nb_segments
-            for lock in locks:
-                lock.release()
-            #
-        finally:
-            self._pending = self._deque
+        self._exception = []
+        for i in range(nb_segments):
+            thread.start_new_thread(self._thread_runner, ())
         #
-        if exception:
-            exc_type, exc_value, exc_traceback = exception
+        # The threads run here until queue.join() returns, i.e. until
+        # all add()ed transactions are executed.
+        self._queue.join()
+        #
+        for i in range(nb_segments):
+            self._queue.put((None, None, None))
+        #
+        if self._exception:
+            exc_type, exc_value, exc_traceback = self._exception
+            self._exception = None
             raise exc_type, exc_value, exc_traceback
 
-    def number_of_transactions_executed(self):
-        if self._pending is self._deque:
-            return self._number_transactions_exec
-        raise TransactionError("TransactionQueue.run() is currently running")
+    #def number_of_transactions_executed(self):
+    #    disabled for now
 
-    def _thread_runner(self, locks, lock_done_running, lock_deque,
-                       exception, nb_segments):
-        pending = []
-        self._pending.append = pending.append
-        deque = self._deque
-        lock = thread.allocate_lock()
-        lock.acquire()
-        next_transaction = None
-        count = [0]
-        #
-        def _pause_thread():
-            self._number_transactions_exec += count[0]
-            count[0] = 0
-            locks.append(lock)
-            if len(locks) == nb_segments:
-                lock_done_running.release()
-            lock_deque.release()
-            #
-            # Now wait until our lock is released.
-            lock.acquire()
-            return len(locks) == nb_segments
-        #
-        while not exception:
-            assert next_transaction is None
-            #
-            # Look at the deque and try to fetch the next item on the left.
-            # If empty, we add our lock to the 'locks' list.
-            lock_deque.acquire()
-            if deque:
-                next_transaction = deque.popleft()
-                lock_deque.release()
-            else:
-                if _pause_thread():
-                    return
-                continue
-            #
-            # Now we have a next_transaction.  Run it.
-            assert len(pending) == 0
-            while True:
-                f, args, kwds = next_transaction
-                # The next hint_commit_soon() is essential: without it, the
-                # current transaction is short, so far, but includes everything
-                # after some lock.acquire() done recently.  That means that
-                # anything we do in the atomic section will run with the lock
-                # still acquired.  This prevents any parallelization.
-                hint_commit_soon()
+    def _thread_runner(self):
+        queue = self._queue
+        exception = self._exception
+        while True:
+            f, args, kwds = queue.get()
+            try:
+                if args is None:
+                    break
                 with atomic:
-                    if exception:
-                        break
-                    next_transaction = None
-                    try:
-                        with signals_enabled:
-                            count[0] += 1
-                            f(*args, **kwds)
-                    except:
-                        exception.extend(sys.exc_info())
-                        break
-                #
-                # If no new 'pending' transactions have been added, exit
-                # this loop and go back to fetch more from the deque.
-                if len(pending) == 0:
-                    break
-                #
-                # If we have some new 'pending' transactions, add them
-                # to the right of the deque and pop the next one from
-                # the left.  As we do this atomically with the
-                # 'lock_deque', we are sure that the deque cannot be
-                # empty before the popleft().  (We do that even when
-                # 'len(pending) == 1' instead of simply assigning the
-                # single item to 'next_transaction', because it looks
-                # like a good idea to preserve some first-in-first-out
-                # approximation.)
-                with lock_deque:
-                    deque.extend(pending)
-                    next_transaction = deque.popleft()
-                    try:
-                        for i in range(1, len(pending)):
-                            locks.pop().release()
-                    except IndexError:     # pop from empty list
-                        pass
-                del pending[:]
-        #
-        # We exit here with an exception.  Re-add 'next_transaction'
-        # if it is not None.
-        lock_deque.acquire()
-        if next_transaction is not None:
-            deque.appendleft(next_transaction)
-            next_transaction = None
-        while not _pause_thread():
-            lock_deque.acquire()
+                    if not exception:
+                        try:
+                            with signals_enabled:
+                                f(*args, **kwds)
+                        except:
+                            exception.extend(sys.exc_info())
+            finally:
+                queue.task_done()
 
 
 # ____________________________________________________________


More information about the pypy-commit mailing list