[pypy-commit] pypy stmgc-c8: in-porgress: massively kill stuff in transaction.py thanks to the new queue
arigo
noreply at buildbot.pypy.org
Thu Jun 18 18:56:16 CEST 2015
Author: Armin Rigo <arigo at tunes.org>
Branch: stmgc-c8
Changeset: r78189:ad1b81c56740
Date: 2015-06-18 17:57 +0100
http://bitbucket.org/pypy/pypy/changeset/ad1b81c56740/
Log: in-porgress: massively kill stuff in transaction.py thanks to the
new queue
diff --git a/lib_pypy/pypy_test/test_transaction.py b/lib_pypy/pypy_test/test_transaction.py
--- a/lib_pypy/pypy_test/test_transaction.py
+++ b/lib_pypy/pypy_test/test_transaction.py
@@ -66,12 +66,14 @@
for x in range(N):
lsts = ([], [], [], [], [], [], [], [], [], [])
def do_stuff(i, j):
+ print 'do_stuff', i, j
lsts[i].append(j)
j += 1
if j < 5:
tq.add(do_stuff, i, j)
else:
lsts[i].append('foo')
+ print 'raising FooError!'
raise FooError
tq = transaction.TransactionQueue()
for i in range(10):
@@ -94,7 +96,8 @@
assert num_foos == 1, lsts
-def test_number_of_transactions_reported():
+# XXX reimplement or kill:
+def DONT_test_number_of_transactions_reported():
tq = transaction.TransactionQueue()
tq.add(lambda: None)
tq.add(lambda: None)
diff --git a/lib_pypy/transaction.py b/lib_pypy/transaction.py
--- a/lib_pypy/transaction.py
+++ b/lib_pypy/transaction.py
@@ -51,6 +51,12 @@
stmdict = dict
from time import time, clock
+try:
+ from pypystm import queue, Empty
+except ImportError:
+ from Queue import Queue as queue
+ from Queue import Empty
+
class stmidset(object):
def __init__(self):
self._hashtable = hashtable()
@@ -114,19 +120,12 @@
"""
def __init__(self):
- self._deque = collections.deque()
- self._pending = self._deque
- self._number_transactions_exec = 0
+ self._queue = queue()
def add(self, f, *args, **kwds):
"""Register a new transaction to be done by 'f(*args, **kwds)'.
"""
- # note: 'self._pending.append' can be two things here:
- # * if we are outside run(), it is the regular deque.append method;
- # * if we are inside run(), self._pending is a thread._local()
- # and then its append attribute is the append method of a
- # thread-local list.
- self._pending.append((f, args, kwds))
+ self._queue.put((f, args, kwds))
def add_generator(self, generator_iterator):
"""Register N new transactions to be done by a generator-iterator
@@ -144,141 +143,50 @@
def run(self, nb_segments=0):
"""Run all transactions, and all transactions started by these
ones, recursively, until the queue is empty. If one transaction
- raises, run() re-raises the exception and the unexecuted transaction
- are left in the queue.
+ raises, run() re-raises the exception.
"""
if is_atomic():
raise TransactionError(
"TransactionQueue.run() cannot be called in an atomic context")
- if not self._pending:
- return
if nb_segments <= 0:
nb_segments = getsegmentlimit()
- assert self._pending is self._deque, "broken state"
- try:
- self._pending = thread._local()
- lock_done_running = thread.allocate_lock()
- lock_done_running.acquire()
- lock_deque = thread.allocate_lock()
- locks = []
- exception = []
- args = (locks, lock_done_running, lock_deque,
- exception, nb_segments)
- #
- for i in range(nb_segments):
- thread.start_new_thread(self._thread_runner, args)
- #
- # The threads run here, and they will release this lock when
- # they are all finished.
- lock_done_running.acquire()
- #
- assert len(locks) == nb_segments
- for lock in locks:
- lock.release()
- #
- finally:
- self._pending = self._deque
+ self._exception = []
+ for i in range(nb_segments):
+ thread.start_new_thread(self._thread_runner, ())
#
- if exception:
- exc_type, exc_value, exc_traceback = exception
+ # The threads run here until queue.join() returns, i.e. until
+ # all add()ed transactions are executed.
+ self._queue.join()
+ #
+ for i in range(nb_segments):
+ self._queue.put((None, None, None))
+ #
+ if self._exception:
+ exc_type, exc_value, exc_traceback = self._exception
+ self._exception = None
raise exc_type, exc_value, exc_traceback
- def number_of_transactions_executed(self):
- if self._pending is self._deque:
- return self._number_transactions_exec
- raise TransactionError("TransactionQueue.run() is currently running")
+ #def number_of_transactions_executed(self):
+ # disabled for now
- def _thread_runner(self, locks, lock_done_running, lock_deque,
- exception, nb_segments):
- pending = []
- self._pending.append = pending.append
- deque = self._deque
- lock = thread.allocate_lock()
- lock.acquire()
- next_transaction = None
- count = [0]
- #
- def _pause_thread():
- self._number_transactions_exec += count[0]
- count[0] = 0
- locks.append(lock)
- if len(locks) == nb_segments:
- lock_done_running.release()
- lock_deque.release()
- #
- # Now wait until our lock is released.
- lock.acquire()
- return len(locks) == nb_segments
- #
- while not exception:
- assert next_transaction is None
- #
- # Look at the deque and try to fetch the next item on the left.
- # If empty, we add our lock to the 'locks' list.
- lock_deque.acquire()
- if deque:
- next_transaction = deque.popleft()
- lock_deque.release()
- else:
- if _pause_thread():
- return
- continue
- #
- # Now we have a next_transaction. Run it.
- assert len(pending) == 0
- while True:
- f, args, kwds = next_transaction
- # The next hint_commit_soon() is essential: without it, the
- # current transaction is short, so far, but includes everything
- # after some lock.acquire() done recently. That means that
- # anything we do in the atomic section will run with the lock
- # still acquired. This prevents any parallelization.
- hint_commit_soon()
+ def _thread_runner(self):
+ queue = self._queue
+ exception = self._exception
+ while True:
+ f, args, kwds = queue.get()
+ try:
+ if args is None:
+ break
with atomic:
- if exception:
- break
- next_transaction = None
- try:
- with signals_enabled:
- count[0] += 1
- f(*args, **kwds)
- except:
- exception.extend(sys.exc_info())
- break
- #
- # If no new 'pending' transactions have been added, exit
- # this loop and go back to fetch more from the deque.
- if len(pending) == 0:
- break
- #
- # If we have some new 'pending' transactions, add them
- # to the right of the deque and pop the next one from
- # the left. As we do this atomically with the
- # 'lock_deque', we are sure that the deque cannot be
- # empty before the popleft(). (We do that even when
- # 'len(pending) == 1' instead of simply assigning the
- # single item to 'next_transaction', because it looks
- # like a good idea to preserve some first-in-first-out
- # approximation.)
- with lock_deque:
- deque.extend(pending)
- next_transaction = deque.popleft()
- try:
- for i in range(1, len(pending)):
- locks.pop().release()
- except IndexError: # pop from empty list
- pass
- del pending[:]
- #
- # We exit here with an exception. Re-add 'next_transaction'
- # if it is not None.
- lock_deque.acquire()
- if next_transaction is not None:
- deque.appendleft(next_transaction)
- next_transaction = None
- while not _pause_thread():
- lock_deque.acquire()
+ if not exception:
+ try:
+ with signals_enabled:
+ f(*args, **kwds)
+ except:
+ exception.extend(sys.exc_info())
+ finally:
+ queue.task_done()
# ____________________________________________________________
More information about the pypy-commit
mailing list