[pypy-commit] pypy stm-thread: Tentatively add a 'transaction' module in pure Python.

arigo noreply at buildbot.pypy.org
Thu May 10 23:48:04 CEST 2012

Author: Armin Rigo <arigo at tunes.org>
Branch: stm-thread
Changeset: r55018:08b2a6872d62
Date: 2012-05-10 23:47 +0200

Log:	Tentatively add a 'transaction' module in pure Python. For now with
	the same API as the old 'transaction' module. Minimal tests too;
	will need to test more in depth.

diff --git a/lib_pypy/pypy_test/test_transaction.py b/lib_pypy/pypy_test/test_transaction.py
new file mode 100644
--- /dev/null
+++ b/lib_pypy/pypy_test/test_transaction.py
@@ -0,0 +1,40 @@
+from lib_pypy import transaction
+N = 1000
+def test_simple_random_order():
+    for x in range(N):
+        lst = []
+        for i in range(10):
+            transaction.add(lst.append, i)
+        transaction.run()
+        print lst
+        assert sorted(lst) == range(10)
+def test_simple_fixed_order():
+    for x in range(N):
+        lst = []
+        def do_stuff(i):
+            lst.append(i)
+            i += 1
+            if i < 10:
+                transaction.add(do_stuff, i)
+        transaction.add(do_stuff, 0)
+        transaction.run()
+        print lst
+        assert lst == range(10)
+def test_simple_random_and_fixed_order():
+    for x in range(N):
+        lsts = ([], [], [], [], [])
+        def do_stuff(i, j):
+            lsts[i].append(j)
+            j += 1
+            if j < 10:
+                transaction.add(do_stuff, i, j)
+        for i in range(5):
+            transaction.add(do_stuff, i, 0)
+        transaction.run()
+        print lsts
+        assert lsts == (range(10),) * 5
diff --git a/lib_pypy/transaction.py b/lib_pypy/transaction.py
new file mode 100644
--- /dev/null
+++ b/lib_pypy/transaction.py
@@ -0,0 +1,215 @@
+"""
+Higher-level constructs to use multiple cores on a pypy-stm.
+Internally based on threads, this module should hide them completely and
+give a simple-to-use API.
+"""
+from __future__ import with_statement
+import collections
+import sys
+import thread
+    from thread import atomic
+except ImportError:
+    # Not a STM-enabled PyPy.  We can still provide a version of 'atomic'
+    # that is good enough for our purposes.  An atomic block in one thread
+    # will not prevent running in all other threads not within an atomic
+    # block.
+    _atomic_lock = thread.allocate_lock()
+    class _Atomic(object):
+        def __enter__(self):
+            _atomic_lock.acquire()
+        def __exit__(self, *args):
+            _atomic_lock.release()
+    atomic = _Atomic()
+def set_num_threads(self, num):
+    """Set the number of threads to use."""
+    if num < 1:
+        raise ValueError("'num' must be at least 1, got %r" % (num,))
+    if _thread_pool.in_transaction:
+        raise TransactionError("cannot change the number of threads "
+                               "while running transactions")
+    _thread_pool.num_threads = num
+class TransactionError(Exception):
+    pass
+# XXX right now uses the same API as the old pypy-stm.  This will
+# be redesigned later.
+def add(f, *args, **kwds):
+    """Register the call 'f(*args, **kwds)' as running a new
+    transaction.  If we are currently running in a transaction too, the
+    new transaction will only start after the end of the current
+    transaction.  Note that if the current transaction or another running
+    in the meantime raises an exception, all pending transactions are
+    cancelled.
+    """
+    _thread_local.pending.append((f, args, kwds))
+def run():
+    """Run the pending transactions, as well as all transactions started
+    by them, and so on.  The order is random and undeterministic.  Must
+    be called from the main program, i.e. not from within another
+    transaction.  If at some point all transactions are done, returns.
+    If a transaction raises an exception, it propagates here; in this
+    case all pending transactions are cancelled.
+    """
+    tpool = _thread_pool
+    if tpool.in_transaction:
+        raise TransactionError("recursive invocation of transaction.run()")
+    if not _thread_local.pending:
+        return     # nothing to do
+    try:
+        tpool.setup()
+        tpool.run()
+    finally:
+        tpool.teardown()
+    tpool.reraise()
+# ____________________________________________________________
+class _ThreadPool(object):
+    def __init__(self):
+        self.num_threads = 4    # XXX default value, tweak
+        self.in_transaction = False
+    def setup(self):
+        # a mutex to protect parts of _grab_next_thing_to_do()
+        self.lock_mutex = thread.allocate_lock()
+        # this lock is released if and only if there are things to do in
+        # 'self.pending'; both are modified together, with the lock_mutex.
+        self.lock_pending = thread.allocate_lock()
+        # this lock is released when we are finished at the end
+        self.lock_if_released_then_finished = thread.allocate_lock()
+        self.lock_if_released_then_finished.acquire()
+        #
+        self.pending = _thread_local.pending
+        # there must be pending items at the beginning, which means that
+        # 'lock_pending' can indeed be released
+        assert self.pending
+        _thread_local.pending = None
+        #
+        self.num_waiting_threads = 0
+        self.finished = False
+        self.got_exception = []
+        self.in_transaction = True
+    def run(self):
+        # start the N threads
+        for i in range(self.num_threads):
+            thread.start_new_thread(self._run_thread, ())
+        # now wait.  When we manage to acquire the following lock, then
+        # we are finished.
+        self.lock_if_released_then_finished.acquire()
+    def teardown(self):
+        self.in_transaction = False
+        self.pending = None
+        self.lock_if_released_then_finished = None
+        self.lock_pending = None
+        self.lock_mutex = None
+        _thread_local.pending = collections.deque()
+    def reraise(self):
+        exc = self.got_exception
+        self.got_exception = None
+        if exc:
+            raise exc[0], exc[1], exc[2]    # exception, value, traceback
+    def _run_thread(self):
+        tloc_pending = _thread_local.pending
+        got_exception = self.got_exception
+        try:
+            while True:
+                self._do_it(self._grab_next_thing_to_do(tloc_pending),
+                            got_exception)
+        except _Done:
+            pass
+    def _grab_next_thing_to_do(self, tloc_pending):
+        if tloc_pending:
+            # grab the next thing to do from the thread-local deque
+            next = tloc_pending.popleft()
+            # add the rest, if any, to the global 'pending'
+            if tloc_pending:
+                #
+                self.lock_mutex.acquire()
+                if not self.pending:
+                    # self.pending is empty so far, but we are adding stuff.
+                    # we have to release the following lock.
+                    self.lock_pending.release()
+                self.pending.extend(tloc_pending)
+                self.lock_mutex.release()
+                #
+                tloc_pending.clear()
+            return next
+        #
+        self.lock_mutex.acquire()
+        while True:
+            try:
+                next = self.pending.popleft()
+            except IndexError:
+                # self.pending is empty: wait until it no longer is.
+                pass
+            else:
+                # self.pending was not empty.  If now it is empty, then
+                # fix the status of 'lock_pending'.
+                if not self.pending:
+                    self.lock_pending.acquire()
+                self.lock_mutex.release()
+                return next
+            #
+            # first check if all N threads are waiting here.
+            self.num_waiting_threads += 1
+            if self.num_waiting_threads == self.num_threads:
+                # yes, so finished!  unlock this to wake up the other
+                # threads, which are all waiting on the following acquire().
+                self.finished = True
+                self.lock_pending.release()
+            #
+            self.lock_mutex.release()
+            self.lock_pending.acquire()
+            self.lock_pending.release()
+            self.lock_mutex.acquire()
+            #
+            self.num_waiting_threads -= 1
+            if self.finished:
+                if self.num_waiting_threads == 0:    # last one to leave?
+                    self.lock_if_released_then_finished.release()
+                self.lock_mutex.release()
+                raise _Done
+    @staticmethod
+    def _do_it((f, args, kwds), got_exception):
+        # this is a staticmethod in order to make sure that we don't
+        # accidentally use 'self' in the atomic block.
+        with atomic:
+            if got_exception:
+                return    # return early if already an exception to reraise
+            try:
+                f(*args, **kwds)
+            except:
+                got_exception[:] = sys.exc_info()
+_thread_pool = _ThreadPool()
+class _Done(Exception):
+    pass
+# Per-thread state: each thread sees its own 'pending' attribute.
+class _ThreadLocal(thread._local):
+    def __init__(self):
+        # queue of '(f, args, kwds)' triples registered by add() in this
+        # thread.  NOTE(review): relies on thread-local subclasses having
+        # __init__ run once per thread on first access, so that every
+        # worker thread starts with its own empty deque -- confirm.
+        self.pending = collections.deque()
+_thread_local = _ThreadLocal()

More information about the pypy-commit mailing list