[pypy-commit] pypy stmgc-c7: Some lightweight redesign of the API, and more heavyweight redesign of the implementation
arigo
noreply at buildbot.pypy.org
Sat Jan 31 01:16:11 CET 2015
Author: Armin Rigo <arigo at tunes.org>
Branch: stmgc-c7
Changeset: r75596:8286e713d46c
Date: 2015-01-31 01:15 +0100
http://bitbucket.org/pypy/pypy/changeset/8286e713d46c/
Log: Some lightweight redesign of the API, and more heavyweight redesign
of the implementation into a version that should be more conflict-
free
diff --git a/lib_pypy/transaction.py b/lib_pypy/transaction.py
--- a/lib_pypy/transaction.py
+++ b/lib_pypy/transaction.py
@@ -43,57 +43,58 @@
def hint_commit_soon():
return None
-
-def set_num_threads(num):
- """Set the number of threads to use."""
- if num < 1:
- raise ValueError("'num' must be at least 1, got %r" % (num,))
- if _thread_pool.in_transaction:
- raise TransactionError("cannot change the number of threads "
- "while running transactions")
- _thread_pool.num_threads = num
+try:
+ from pypystm import getsegmentlimit
+except ImportError:
+ # Not a STM-enabled PyPy.
+ def getsegmentlimit():
+ return 1
class TransactionError(Exception):
pass
-# XXX right now uses the same API as the old pypy-stm. This will
-# be redesigned later.
-
def add(f, *args, **kwds):
- """Register the call 'f(*args, **kwds)' as running a new
- transaction. If we are currently running in a transaction too, the
- new transaction will only start after the end of the current
- transaction. Note that if the current transaction or another running
- in the meantime raises an exception, all pending transactions are
- cancelled.
+ """Register a new transaction that will be done by 'f(*args, **kwds)'.
+ Must be called within the transaction in the "with TransactionQueue()"
+ block, or within a transaction started by this one, directly or
+ indirectly.
"""
_thread_local.pending.append((f, args, kwds))
-def run():
- """Run the pending transactions, as well as all transactions started
- by them, and so on. The order is random and undeterministic. Must
- be called from the main program, i.e. not from within another
- transaction. If at some point all transactions are done, returns.
- If a transaction raises an exception, it propagates here; in this
- case all pending transactions are cancelled.
+class TransactionQueue(object):
+ """Use in 'with TransactionQueue():'. Creates a queue of
+ transactions. The first transaction in the queue is the content of
+ the 'with:' block, which is immediately started.
+
+ Any transaction can register new transactions that will be run
+ after the current one is finished, using the global function add().
"""
- tpool = _thread_pool
- if tpool.in_transaction:
- raise TransactionError("recursive invocation of transaction.run()")
- if not _thread_local.pending:
- return # nothing to do
- try:
- tpool.setup()
- tpool.run()
- finally:
- tpool.teardown()
- tpool.reraise()
-def number_of_transactions_in_last_run():
- return _thread_pool.transactions_run
+ def __init__(self, nb_segments=0):
+ if nb_segments <= 0:
+ nb_segments = getsegmentlimit()
+ _thread_pool.ensure_threads(nb_segments)
+
+ def __enter__(self):
+ if hasattr(_thread_local, "pending"):
+ raise TransactionError(
+ "recursive invocation of TransactionQueue()")
+ if is_atomic():
+ raise TransactionError(
+ "invocation of TransactionQueue() from an atomic context")
+ _thread_local.pending = []
+ atomic.__enter__()
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ atomic.__exit__(exc_type, exc_value, traceback)
+ pending = _thread_local.pending
+ del _thread_local.pending
+ if exc_type is None and len(pending) > 0:
+ _thread_pool.run(pending)
+
# ____________________________________________________________
@@ -101,152 +102,115 @@
class _ThreadPool(object):
def __init__(self):
- try:
- from pypystm import getsegmentlimit
- self.num_threads = getsegmentlimit()
- except ImportError:
- self.num_threads = 4
- self.in_transaction = False
- self.transactions_run = None
+ self.lock_running = thread.allocate_lock()
+ self.lock_done_running = thread.allocate_lock()
+ self.lock_done_running.acquire()
+ self.nb_threads = 0
+ self.deque = collections.deque()
+ self.locks = []
+ self.lock_deque = thread.allocate_lock()
+ self.exception = []
- def setup(self):
- # a mutex to protect parts of _grab_next_thing_to_do()
- self.lock_mutex = thread.allocate_lock()
- # this lock is released if and only if there are things to do in
- # 'self.pending'; both are modified together, with the lock_mutex.
- self.lock_pending = thread.allocate_lock()
- # this lock is released when we are finished at the end
- self.lock_if_released_then_finished = thread.allocate_lock()
- self.lock_if_released_then_finished.acquire()
+ def ensure_threads(self, n):
+ if n > self.nb_threads:
+ with self.lock_running:
+ for i in range(self.nb_threads, n):
+ assert len(self.locks) == self.nb_threads
+ self.nb_threads += 1
+ thread.start_new_thread(self.thread_runner, ())
+ # The newly started thread should run immediately into
+ # the case 'if len(self.locks) == self.nb_threads:'
+ # and release this lock. Wait until it does.
+ self.lock_done_running.acquire()
+
+ def run(self, pending):
+ # For now, can't run multiple threads with each an independent
+ # TransactionQueue(): they are serialized.
+ with self.lock_running:
+ assert self.exception == []
+ assert len(self.deque) == 0
+ deque = self.deque
+ with self.lock_deque:
+ deque.extend(pending)
+ try:
+ for i in range(len(pending)):
+ self.locks.pop().release()
+ except IndexError: # pop from empty list
+ pass
+ #
+ self.lock_done_running.acquire()
+ #
+ if self.exception:
+ exc_type, exc_value, exc_traceback = self.exception
+ del self.exception[:]
+ raise exc_type, exc_value, exc_traceback
+
+ def thread_runner(self):
+ deque = self.deque
+ lock = thread.allocate_lock()
+ lock.acquire()
+ pending = []
+ _thread_local.pending = pending
+ lock_deque = self.lock_deque
+ exception = self.exception
#
- self.pending = _thread_local.pending
- # there must be pending items at the beginning, which means that
- # 'lock_pending' can indeed be released
- assert self.pending
- _thread_local.pending = None
- #
- self.num_waiting_threads = 0
- self.transactions_run = 0
- self.finished = False
- self.got_exception = []
- self.in_transaction = True
+ while True:
+ #
+ # Look at the deque and try to fetch the next item on the left.
+ # If empty, we add our lock to the 'locks' list.
+ lock_deque.acquire()
+ if deque:
+ next_transaction = deque.popleft()
+ lock_deque.release()
+ else:
+ self.locks.append(lock)
+ if len(self.locks) == self.nb_threads:
+ self.lock_done_running.release()
+ lock_deque.release()
+ #
+ # Now wait until our lock is released.
+ lock.acquire()
+ continue
+ #
+ # Now we have a next_transaction. Run it.
+ assert len(pending) == 0
+ while True:
+ f, args, kwds = next_transaction
+ with atomic:
+ if len(exception) == 0:
+ try:
+ f(*args, **kwds)
+ except:
+ exception.extend(sys.exc_info())
+ del next_transaction
+ #
+ # If no new 'pending' transactions have been added, exit
+ # this loop and go back to fetch more from the deque.
+ if len(pending) == 0:
+ break
+ #
+ # If we have some new 'pending' transactions, add them
+ # to the right of the deque and pop the next one from
+ # the left. As we do this atomically with the
+ # 'lock_deque', we are sure that the deque cannot be
+ # empty before the popleft(). (We do that even when
+ # 'len(pending) == 1' instead of simply assigning the
+ # single item to 'next_transaction', because it looks
+ # like a good idea to preserve some first-in-first-out
+ # approximation.)
+ with self.lock_deque:
+ deque.extend(pending)
+ next_transaction = deque.popleft()
+ try:
+ for i in range(1, len(pending)):
+ self.locks.pop().release()
+ except IndexError: # pop from empty list
+ pass
+ del pending[:]
- def run(self):
- # start the N threads
- task_counters = [[0] for i in range(self.num_threads)]
- for counter in task_counters:
- thread.start_new_thread(self._run_thread, (counter,))
- # now wait. When we manage to acquire the following lock, then
- # we are finished.
- self.lock_if_released_then_finished.acquire()
- self.transactions_run = sum(x[0] for x in task_counters)
-
- def teardown(self):
- self.in_transaction = False
- self.pending = None
- self.lock_if_released_then_finished = None
- self.lock_pending = None
- self.lock_mutex = None
- _thread_local.pending = collections.deque()
-
- def reraise(self):
- exc = self.got_exception
- self.got_exception = None
- if exc:
- raise exc[0], exc[1], exc[2] # exception, value, traceback
-
- def _run_thread(self, counter):
- tloc_pending = _thread_local.pending
- got_exception = self.got_exception
- try:
- while True:
- self._do_it(self._grab_next_thing_to_do(tloc_pending),
- got_exception)
- counter[0] += 1
- except _Done:
- pass
-
- def _grab_next_thing_to_do(self, tloc_pending):
- if tloc_pending:
- # grab the next thing to do from the thread-local deque
- next = tloc_pending.popleft()
- # add the rest, if any, to the global 'pending'
- if tloc_pending:
- #
- self.lock_mutex.acquire()
- if not self.pending:
- # self.pending is empty so far, but we are adding stuff.
- # we have to release the following lock.
- self.lock_pending.release()
- self.pending.extend(tloc_pending)
- self.lock_mutex.release()
- #
- tloc_pending.clear()
- return next
- #
- self.lock_mutex.acquire()
- while True:
- try:
- next = self.pending.popleft()
- except IndexError:
- # self.pending is empty: wait until it no longer is.
- pass
- else:
- # self.pending was not empty. If now it is empty, then
- # fix the status of 'lock_pending'.
- if not self.pending:
- self.lock_pending.acquire()
- self.lock_mutex.release()
- return next
- #
- # first check if all N threads are waiting here.
- assert not self.finished
- self.num_waiting_threads += 1
- if self.num_waiting_threads == self.num_threads:
- # yes, so finished! unlock this to wake up the other
- # threads, which are all waiting on the following acquire().
- self.finished = True
- self.lock_pending.release()
- #
- self.lock_mutex.release()
- self.lock_pending.acquire()
- self.lock_pending.release()
- self.lock_mutex.acquire()
- #
- self.num_waiting_threads -= 1
- if self.finished:
- last_one_to_leave = self.num_waiting_threads == 0
- self.lock_mutex.release()
- if last_one_to_leave:
- self.lock_if_released_then_finished.release()
- raise _Done
-
- @staticmethod
- def _do_it((f, args, kwds), got_exception):
- # this is a staticmethod in order to make sure that we don't
- # accidentally use 'self' in the atomic block.
- try:
- hint_commit_soon()
- with signals_enabled:
- with atomic:
- if not got_exception:
- f(*args, **kwds)
- hint_commit_soon()
- except:
- got_exception[:] = sys.exc_info()
_thread_pool = _ThreadPool()
-
-
-class _Done(Exception):
- pass
-
-
-class _ThreadLocal(thread._local):
- def __init__(self):
- self.pending = collections.deque()
-
-_thread_local = _ThreadLocal()
+_thread_local = thread._local()
def XXXreport_abort_info(info):
More information about the pypy-commit
mailing list