[pypy-commit] pypy stm-gc: In-progress
arigo
noreply at buildbot.pypy.org
Sun Apr 22 15:05:31 CEST 2012
Author: Armin Rigo <arigo at tunes.org>
Branch: stm-gc
Changeset: r54616:3b5630b98667
Date: 2012-04-22 15:05 +0200
http://bitbucket.org/pypy/pypy/changeset/3b5630b98667/
Log: In-progress
diff --git a/pypy/rlib/rstm.py b/pypy/rlib/rstm.py
--- a/pypy/rlib/rstm.py
+++ b/pypy/rlib/rstm.py
@@ -2,7 +2,8 @@
from pypy.rpython.lltypesystem.lloperation import llop
from pypy.rpython.annlowlevel import llhelper, cast_instance_to_base_ptr
from pypy.rpython.annlowlevel import base_ptr_lltype, cast_base_ptr_to_instance
-from pypy.rlib.objectmodel import keepalive_until_here
+from pypy.rlib.objectmodel import keepalive_until_here, we_are_translated
+from pypy.rlib.debug import ll_assert
from pypy.translator.stm.stmgcintf import StmOperations
@@ -21,15 +22,28 @@
raise NotImplementedError
+def stm_operations():
+ if we_are_translated():
+ return StmOperations
+ else:
+ from pypy.rlib.test.test_rstm import fake_stm_operations
+ return fake_stm_operations
+
+
+def in_transaction():
+ return bool(stm_operations().in_transaction())
+
+
def run_all_transactions(initial_transaction,
num_threads = NUM_THREADS_DEFAULT):
- if StmOperations.in_transaction():
+ if in_transaction():
raise TransactionError("nested call to rstm.run_all_transactions()")
#
_transactionalstate.initialize()
#
# Tell the GC we are entering transactional mode. This makes
# sure that 'initial_transaction' is flagged as GLOBAL.
+ # (Actually it flags all surviving objects as GLOBAL.)
# No more GC operation afterwards!
llop.stm_enter_transactional_mode(lltype.Void)
#
@@ -39,31 +53,53 @@
# is no possibility of having a GC collection inbetween.
keepalive_until_here(initial_transaction)
#
+ # The following line causes the _run_transaction() function to be
+ # generated in the C source with a specific signature, where it
+ # can be called by the C code.
+ llhelper(StmOperations.RUN_TRANSACTION, _run_transaction)
+ #
# Tell the C code to run all transactions.
- callback = llhelper(_CALLBACK, _run_transaction)
ptr = _cast_transaction_to_voidp(initial_transaction)
- StmOperations.run_all_transactions(callback, ptr, num_threads)
+ stm_operations().run_all_transactions(ptr, num_threads)
#
# Tell the GC we are leaving transactional mode.
llop.stm_leave_transactional_mode(lltype.Void)
#
+ # Hack
+ if not we_are_translated():
+ stm_operations().leaving()
+ #
# If an exception was raised, re-raise it here.
_transactionalstate.close_exceptions()
-_CALLBACK = lltype.Ptr(lltype.FuncType([rffi.VOIDP, lltype.Signed],
- rffi.VOIDP))
-
def _cast_transaction_to_voidp(transaction):
- ptr = cast_instance_to_base_ptr(transaction)
- return lltype.cast_pointer(rffi.VOIDP, ptr)
+ if we_are_translated():
+ ptr = cast_instance_to_base_ptr(transaction)
+ return rffi.cast(rffi.VOIDP, ptr)
+ else:
+ return stm_operations().cast_transaction_to_voidp(transaction)
def _cast_voidp_to_transaction(transactionptr):
- ptr = lltype.cast_pointer(base_ptr_lltype(), transactionptr)
- return cast_base_ptr_to_instance(Transaction, ptr)
+ if we_are_translated():
+ ptr = rffi.cast(base_ptr_lltype(), transactionptr)
+ return cast_base_ptr_to_instance(Transaction, ptr)
+ else:
+ return stm_operations().cast_voidp_to_transaction(transactionptr)
class _TransactionalState(object):
+ """This is the class of a global singleton, seen by every transaction.
+ Used for cross-transaction synchronization. Of course writing to it
+ will likely cause conflicts. Reserved for now for storing the
+ exception that must be re-raised by run_all_transactions().
+ """
+ # The logic ensures that once a transaction calls must_reraise_exception()
+ # and commits, all uncommitted transactions will abort (because they have
+ # read '_reraise_exception' when they started) and then, when they retry,
+ # do nothing. This makes the transaction committing an exception the last
+ # one to commit, and it cleanly shuts down all other pending transactions.
+
def initialize(self):
self._reraise_exception = None
@@ -73,6 +109,8 @@
def must_reraise_exception(self, got_exception):
self._got_exception = got_exception
self._reraise_exception = self.reraise_exception_callback
+ if not we_are_translated():
+ import sys; self._got_tb = sys.exc_info()[2]
def close_exceptions(self):
if self._reraise_exception is not None:
@@ -80,8 +118,11 @@
@staticmethod
def reraise_exception_callback():
- exc = _transactionalstate._got_exception
+ self = _transactionalstate
+ exc = self._got_exception
self._got_exception = None
+ if not we_are_translated() and hasattr(self, '_got_tb'):
+ raise exc.__class__, exc, self._got_tb
raise exc
_transactionalstate = _TransactionalState()
diff --git a/pypy/rlib/test/test_rstm.py b/pypy/rlib/test/test_rstm.py
--- a/pypy/rlib/test/test_rstm.py
+++ b/pypy/rlib/test/test_rstm.py
@@ -1,101 +1,90 @@
-import os, thread, time
-from pypy.rlib.debug import debug_print, ll_assert, fatalerror
+import random
+from pypy.rpython.lltypesystem import lltype, rffi
from pypy.rlib import rstm
-from pypy.rpython.annlowlevel import llhelper
-from pypy.translator.stm.test.support import CompiledSTMTests
-from pypy.module.thread import ll_thread
-class Arg(object):
- pass
-arg_list = [Arg() for i in range(10)]
+class FakeStmOperations:
+ _in_transaction = 0
+ _mapping = {}
-def setx(arg, retry_counter):
- debug_print(arg.x)
- assert rstm._debug_get_state() == 1
- if arg.x == 303:
- # this will trigger stm_become_inevitable()
- os.write(1, "hello\n")
- assert rstm._debug_get_state() == 2
- arg.x = 42
+ def in_transaction(self):
+ return self._in_transaction
-def stm_perform_transaction(done=None, i=0):
- ll_assert(rstm._debug_get_state() == -2, "bad debug_get_state (1)")
- rstm.descriptor_init()
- arg = arg_list[i]
- if done is None:
- arg.x = 202
- else:
- arg.x = done.initial_x
- ll_assert(rstm._debug_get_state() == 0, "bad debug_get_state (2)")
- rstm.perform_transaction(setx, Arg, arg)
- ll_assert(rstm._debug_get_state() == 0, "bad debug_get_state (3)")
- ll_assert(arg.x == 42, "bad arg.x")
- if done is not None:
- ll_thread.release_NOAUTO(done.finished_lock)
- rstm.descriptor_done()
- ll_assert(rstm._debug_get_state() == -2, "bad debug_get_state (4)")
+ def _add(self, transactionptr):
+ r = random.random()
+ assert r not in self._pending # very bad luck if it is
+ self._pending[r] = transactionptr
-def test_stm_multiple_threads():
- ok = []
- def f(i):
- stm_perform_transaction(i=i)
- ok.append(i)
- rstm.enter_transactional_mode()
- for i in range(10):
- thread.start_new_thread(f, (i,))
- timeout = 10
- while len(ok) < 10:
- time.sleep(0.1)
- timeout -= 0.1
- assert timeout >= 0.0, "timeout!"
- rstm.leave_transactional_mode()
- assert sorted(ok) == range(10)
+ def run_all_transactions(self, initial_transaction_ptr, num_threads=4):
+ self._pending = {}
+ self._add(initial_transaction_ptr)
+ while self._pending:
+ r, transactionptr = self._pending.popitem()
+ transaction = self.cast_voidp_to_transaction(transactionptr)
+ transaction._next_transaction = None
+ nextptr = rstm._run_transaction(transactionptr, 0)
+ next = self.cast_voidp_to_transaction(nextptr)
+ while next is not None:
+ self._add(self.cast_transaction_to_voidp(next))
+ next = next._next_transaction
+ del self._pending
+ def cast_transaction_to_voidp(self, transaction):
+ if transaction is None:
+ return lltype.nullptr(rffi.VOIDP.TO)
+ assert isinstance(transaction, rstm.Transaction)
+ num = 10000 + len(self._mapping)
+ self._mapping[num] = transaction
+ return rffi.cast(rffi.VOIDP, num)
-class TestTransformSingleThread(CompiledSTMTests):
+ def cast_voidp_to_transaction(self, transactionptr):
+ if not transactionptr:
+ return None
+ num = rffi.cast(lltype.Signed, transactionptr)
+ return self._mapping[num]
- def test_no_pointer_operations(self):
- def simplefunc(argv):
- i = 0
- while i < 100:
- i += 3
- debug_print(i)
- return 0
- t, cbuilder = self.compile(simplefunc)
- dataout, dataerr = cbuilder.cmdexec('', err=True)
- assert dataout == ''
- assert '102' in dataerr.splitlines()
+ def leaving(self):
+ self._mapping.clear()
- def build_perform_transaction(self):
- class Done: done = False
- done = Done()
- def g():
- stm_perform_transaction(done)
- def f(argv):
- done.initial_x = int(argv[1])
- assert rstm._debug_get_state() == -1 # main thread
- done.finished_lock = ll_thread.allocate_ll_lock()
- ll_thread.acquire_NOAUTO(done.finished_lock, True)
- #
- rstm.enter_transactional_mode()
- #
- llcallback = llhelper(ll_thread.CALLBACK, g)
- ident = ll_thread.c_thread_start_NOGIL(llcallback)
- ll_thread.acquire_NOAUTO(done.finished_lock, True)
- #
- rstm.leave_transactional_mode()
- return 0
- t, cbuilder = self.compile(f)
- return cbuilder
+fake_stm_operations = FakeStmOperations()
- def test_perform_transaction(self):
- cbuilder = self.build_perform_transaction()
- #
- dataout, dataerr = cbuilder.cmdexec('202', err=True)
- assert dataout == ''
- assert '202' in dataerr.splitlines()
- #
- dataout, dataerr = cbuilder.cmdexec('303', err=True)
- assert 'hello' in dataout.splitlines()
- assert '303' in dataerr.splitlines()
+
+def test_in_transaction():
+ res = rstm.in_transaction()
+ assert res is False
+
+def test_run_all_transactions_minimal():
+ seen = []
+ class Empty(rstm.Transaction):
+ def run(self):
+ seen.append(42)
+ rstm.run_all_transactions(Empty())
+ assert seen == [42]
+
+def test_run_all_transactions_recursive():
+ seen = []
+ class DoInOrder(rstm.Transaction):
+ def run(self):
+ assert self._next_transaction is None
+ if len(seen) < 10:
+ seen.append(len(seen))
+ return [self]
+ rstm.run_all_transactions(DoInOrder())
+ assert seen == range(10)
+
+def test_run_all_transactions_random_order():
+ seen = []
+ class AddToSeen(rstm.Transaction):
+ def run(self):
+ seen.append(self.value)
+ class DoInOrder(rstm.Transaction):
+ count = 0
+ def run(self):
+ assert self._next_transaction is None
+ if self.count < 50:
+ other = AddToSeen()
+ other.value = self.count
+ self.count += 1
+ return [self, other]
+ rstm.run_all_transactions(DoInOrder())
+ assert seen != range(50) and sorted(seen) == range(50)
diff --git a/pypy/rpython/lltypesystem/lloperation.py b/pypy/rpython/lltypesystem/lloperation.py
--- a/pypy/rpython/lltypesystem/lloperation.py
+++ b/pypy/rpython/lltypesystem/lloperation.py
@@ -403,14 +403,12 @@
'stm_getarrayitem': LLOp(sideeffects=False, canrun=True),
'stm_getinteriorfield': LLOp(sideeffects=False, canrun=True),
'stm_become_inevitable': LLOp(),
- 'stm_descriptor_init': LLOp(canrun=True),
- 'stm_descriptor_done': LLOp(canrun=True),
'stm_enter_transactional_mode': LLOp(canrun=True, canmallocgc=True),
'stm_leave_transactional_mode': LLOp(canrun=True, canmallocgc=True),
- 'stm_writebarrier': LLOp(sideeffects=False),
+ 'stm_writebarrier': LLOp(),
'stm_normalize_global': LLOp(),
'stm_start_transaction': LLOp(canrun=True, canmallocgc=True),
- 'stm_commit_transaction': LLOp(canrun=True, canmallocgc=True),
+ 'stm_stop_transaction': LLOp(canrun=True, canmallocgc=True),
# __________ address operations __________
diff --git a/pypy/rpython/lltypesystem/opimpl.py b/pypy/rpython/lltypesystem/opimpl.py
--- a/pypy/rpython/lltypesystem/opimpl.py
+++ b/pypy/rpython/lltypesystem/opimpl.py
@@ -625,17 +625,19 @@
def op_stm_descriptor_init():
# for direct testing only
+ xxx
from pypy.translator.stm import stmgcintf
stmgcintf.StmOperations.set_tls(llmemory.NULL, 0)
def op_stm_descriptor_done():
+ xxx
from pypy.translator.stm import stmgcintf
stmgcintf.StmOperations.del_tls()
def op_stm_start_transaction():
pass
-def op_stm_commit_transaction():
+def op_stm_stop_transaction():
pass
def op_stm_enter_transactional_mode():
diff --git a/pypy/translator/stm/src_stm/core.c b/pypy/translator/stm/src_stm/core.c
new file mode 100644
--- /dev/null
+++ b/pypy/translator/stm/src_stm/core.c
@@ -0,0 +1,668 @@
+/* -*- c-basic-offset: 2 -*- */
+
+/************************************************************/
+
+#define ABORT_REASONS 8
+#define SPINLOOP_REASONS 10
+
+struct tx_descriptor {
+ void *rpython_tls_object;
+ jmp_buf *setjmp_buf;
+ owner_version_t start_time;
+ owner_version_t end_time;
+ /*unsigned long last_known_global_timestamp;*/
+ owner_version_t my_lock_word;
+ struct OrecList reads;
+ unsigned num_commits;
+ unsigned num_aborts[ABORT_REASONS];
+ unsigned num_spinloops[SPINLOOP_REASONS];
+ /*unsigned int spinloop_counter;*/
+ struct RedoLog redolog; /* last item, because it's the biggest one */
+};
+
+/* global_timestamp contains in its lowest bit a flag equal to 1
+ if there is an inevitable transaction running */
+static volatile unsigned long global_timestamp = 2;
+static __thread struct tx_descriptor *thread_descriptor = NULL;
+static __thread struct tx_descriptor *active_thread_descriptor = NULL;
+static long (*rpython_get_size)(void*);
+
+/************************************************************/
+
+static unsigned long get_global_timestamp(struct tx_descriptor *d)
+{
+ return (/*d->last_known_global_timestamp =*/ global_timestamp);
+}
+
+static _Bool change_global_timestamp(struct tx_descriptor *d,
+ unsigned long old,
+ unsigned long new)
+{
+ if (bool_cas(&global_timestamp, old, new))
+ {
+ /*d->last_known_global_timestamp = new;*/
+ return 1;
+ }
+ return 0;
+}
+
+static void set_global_timestamp(struct tx_descriptor *d, unsigned long new)
+{
+ global_timestamp = new;
+ /*d->last_known_global_timestamp = new;*/
+}
+
+static void tx_abort(int);
+
+static void tx_spinloop(int num)
+{
+ unsigned int c;
+ int i;
+ struct tx_descriptor *d = active_thread_descriptor;
+ d->num_spinloops[num]++;
+
+ //printf("tx_spinloop(%d)\n", num);
+
+#if 0
+ c = d->spinloop_counter;
+ d->spinloop_counter = c * 9;
+ i = c & 0xff0000;
+ while (i >= 0) {
+ spinloop();
+ i -= 0x10000;
+ }
+#else
+ spinloop();
+#endif
+}
+
+static _Bool is_inevitable(struct tx_descriptor *d)
+{
+ return d->setjmp_buf == NULL;
+}
+
+/*** run the redo log to commit a transaction, and release the locks */
+static void tx_redo(struct tx_descriptor *d)
+{
+ owner_version_t newver = d->end_time;
+ wlog_t *item;
+ REDOLOG_LOOP_FORWARD(d->redolog, item)
+ {
+ void *globalobj = item->addr;
+ void *localobj = item->val;
+ long size = rpython_get_size(localobj);
+ memcpy(((char *)globalobj) + sizeof(orec_t),
+ ((char *)localobj) + sizeof(orec_t),
+ size - sizeof(orec_t));
+ /* unlock the orec */
+ volatile orec_t* o = get_orec(globalobj);
+ CFENCE;
+ o->version = newver;
+ } REDOLOG_LOOP_END;
+}
+
+/*** on abort, release locks and restore the old version number. */
+static void releaseAndRevertLocks(struct tx_descriptor *d)
+{
+ wlog_t *item;
+ REDOLOG_LOOP_FORWARD(d->redolog, item)
+ {
+ if (item->p != -1)
+ {
+ volatile orec_t* o = get_orec(item->addr);
+ o->version = item->p;
+ }
+ } REDOLOG_LOOP_END;
+}
+
+/*** release locks and restore the old version number, ready to retry later */
+static void releaseLocksForRetry(struct tx_descriptor *d)
+{
+ wlog_t *item;
+ REDOLOG_LOOP_FORWARD(d->redolog, item)
+ {
+ volatile orec_t* o = get_orec(item->addr);
+ assert(item->p != -1);
+ o->version = item->p;
+ item->p = -1;
+ } REDOLOG_LOOP_END;
+}
+
+/*** lock all locations */
+static void acquireLocks(struct tx_descriptor *d)
+{
+ wlog_t *item;
+ // try to lock every location in the write set
+ REDOLOG_LOOP_BACKWARD(d->redolog, item)
+ {
+ // get orec, read its version#
+ volatile orec_t* o = get_orec(item->addr);
+ owner_version_t ovt;
+
+ retry:
+ ovt = o->version;
+
+ // if orec not locked, lock it
+ //
+ // NB: if ovt > start time, we may introduce inconsistent
+ // reads. Since most writes are also reads, we'll just abort under this
+ // condition. This can introduce false conflicts
+ if (!IS_LOCKED_OR_NEWER(ovt, d->start_time)) {
+ if (!bool_cas(&o->version, ovt, d->my_lock_word))
+ goto retry;
+ // save old version to item->p. Now we hold the lock.
+ item->p = ovt;
+ }
+ // else if the location is too recent...
+ else if (!IS_LOCKED(ovt))
+ tx_abort(0);
+ // else it is locked: check it's not by me
+ else {
+ assert(ovt != d->my_lock_word);
+ // we can either abort or spinloop. Because we are at the end of
+ // the transaction we might try to spinloop, even though after the
+ // lock is released the ovt will be very recent, possibly
+ // > d->start_time. It is necessary to spinloop in case we are
+ // inevitable, so use that as a criterion. Another solution to avoid
+ // deadlocks would be to sort the order in which we take the locks.
+ if (is_inevitable(d))
+ tx_spinloop(8);
+ else
+ tx_abort(6);
+ goto retry;
+ }
+ } REDOLOG_LOOP_END;
+}
+
+static void common_cleanup(struct tx_descriptor *d)
+{
+ d->reads.size = 0;
+ redolog_clear(&d->redolog);
+}
+
+static void tx_cleanup(struct tx_descriptor *d)
+{
+ // release the locks and restore version numbers
+ releaseAndRevertLocks(d);
+ // reset all lists
+ common_cleanup(d);
+}
+
+static void tx_restart(struct tx_descriptor *d)
+{
+ tx_cleanup(d);
+ tx_spinloop(0);
+ longjmp(*d->setjmp_buf, 1);
+}
+
+/*** increase the abort count and restart the transaction */
+static void tx_abort(int reason)
+{
+ struct tx_descriptor *d = active_thread_descriptor;
+ assert(!is_inevitable(d));
+ d->num_aborts[reason]++;
+#ifdef RPY_STM_DEBUG_PRINT
+ PYPY_DEBUG_START("stm-abort");
+ if (PYPY_HAVE_DEBUG_PRINTS)
+ fprintf(PYPY_DEBUG_FILE, "thread %lx aborting %d\n",
+ (long)pthread_self(), reason);
+ PYPY_DEBUG_STOP("stm-abort");
+#endif
+ tx_restart(d);
+}
+
+/**
+ * fast-path validation, assuming that I don't hold locks.
+ */
+static void validate_fast(struct tx_descriptor *d, int lognum)
+{
+ int i;
+ owner_version_t ovt;
+ assert(!is_inevitable(d));
+ for (i=0; i<d->reads.size; i++)
+ {
+ retry:
+ ovt = d->reads.items[i]->version;
+ if (IS_LOCKED_OR_NEWER(ovt, d->start_time))
+ {
+ // If locked, we wait until it becomes unlocked. The chances are
+ // that it will then have a very recent start_time, likely
+ // > d->start_time, but it might still be better than always aborting
+ if (IS_LOCKED(ovt))
+ {
+ tx_spinloop(lognum); /* tx_spinloop(1), tx_spinloop(2),
+ tx_spinloop(3) */
+ goto retry;
+ }
+ else
+ // abort if the timestamp is newer than my start time.
+ tx_abort(lognum); /* tx_abort(1), tx_abort(2), tx_abort(3) */
+ }
+ }
+}
+
+/**
+ * validate the read set by making sure that all orecs that we've read have
+ * timestamps at least as old as our start time, unless we locked those orecs.
+ */
+static void validate(struct tx_descriptor *d)
+{
+ int i;
+ owner_version_t ovt;
+ assert(!is_inevitable(d));
+ for (i=0; i<d->reads.size; i++)
+ {
+ ovt = d->reads.items[i]->version; // read this orec
+ if (IS_LOCKED_OR_NEWER(ovt, d->start_time))
+ {
+ if (!IS_LOCKED(ovt))
+ // if unlocked and newer than start time, abort
+ tx_abort(4);
+ else
+ {
+ // if locked and not by me, abort
+ if (ovt != d->my_lock_word)
+ tx_abort(5);
+ }
+ }
+ }
+}
+
+#ifdef USE_PTHREAD_MUTEX
+/* mutex: only to avoid busy-looping too much in tx_spinloop() below */
+static pthread_mutex_t mutex_inevitable = PTHREAD_MUTEX_INITIALIZER;
+# ifdef RPY_STM_ASSERT
+static unsigned long locked_by = 0;
+static void mutex_lock(void)
+{
+ unsigned long pself = (unsigned long)pthread_self();
+#ifdef RPY_STM_DEBUG_PRINT
+ if (PYPY_HAVE_DEBUG_PRINTS) fprintf(PYPY_DEBUG_FILE,
+ "%lx: mutex inev locking...\n", pself);
+#endif
+ assert(locked_by != pself);
+ pthread_mutex_lock(&mutex_inevitable);
+ locked_by = pself;
+#ifdef RPY_STM_DEBUG_PRINT
+ if (PYPY_HAVE_DEBUG_PRINTS) fprintf(PYPY_DEBUG_FILE,
+ "%lx: mutex inev locked\n", pself);
+#endif
+}
+static void mutex_unlock(void)
+{
+ unsigned long pself = (unsigned long)pthread_self();
+ locked_by = 0;
+#ifdef RPY_STM_DEBUG_PRINT
+ if (PYPY_HAVE_DEBUG_PRINTS) fprintf(PYPY_DEBUG_FILE,
+ "%lx: mutex inev unlocked\n", pself);
+#endif
+ pthread_mutex_unlock(&mutex_inevitable);
+}
+# else
+# define mutex_lock() pthread_mutex_lock(&mutex_inevitable)
+# define mutex_unlock() pthread_mutex_unlock(&mutex_inevitable)
+# endif
+#else
+# define mutex_lock() /* nothing */
+# define mutex_unlock() /* nothing */
+#endif
+
+static void wait_end_inevitability(struct tx_descriptor *d)
+{
+ unsigned long curts;
+ releaseLocksForRetry(d);
+
+ // We are going to wait until the other inevitable transaction
+ // finishes. XXX We could do better here: we could check if
+ // committing 'd' would create a conflict for the other inevitable
+ // thread 'd_inev' or not. It requires peeking in 'd_inev' from this
+ // thread (which we never do so far) in order to do something like
+ // 'validate_fast(d_inev); d_inev->start_time = updated;'
+
+ while ((curts = get_global_timestamp(d)) & 1)
+ {
+ // while we're about to wait anyway, we can do a validate_fast
+ if (d->start_time < curts - 1)
+ {
+ validate_fast(d, 3);
+ d->start_time = curts - 1;
+ }
+ tx_spinloop(4);
+ mutex_lock();
+ mutex_unlock();
+ }
+ acquireLocks(d);
+}
+
+static void commitInevitableTransaction(struct tx_descriptor *d)
+{
+ unsigned long ts;
+ _Bool ok;
+
+ // no-one else can modify global_timestamp if I'm inevitable
+ // and d_inev_checking is 0
+ ts = get_global_timestamp(d);
+ assert(ts & 1);
+ set_global_timestamp(d, ts + 1);
+ d->end_time = ts + 1;
+ assert(d->end_time == (d->start_time + 2));
+
+ // run the redo log, and release the locks
+ tx_redo(d);
+
+ mutex_unlock();
+}
+
+/* lazy/lazy read instrumentation */
+#define STM_DO_READ(READ_OPERATION) \
+ if (is_inevitable(d)) { \
+ /* if is_inevitable(), then we don't need to do the checking of */ \
+ /* o->version done below --- but more importantly, we don't need */ \
+ /* to insert o in the OrecList */ \
+ READ_OPERATION; \
+ } \
+ else { \
+ retry: \
+ /* read the orec BEFORE we read anything else */ \
+ ovt = o->version; \
+ CFENCE; \
+ \
+ /* this tx doesn't hold any locks, so if the lock for this addr is */ \
+ /* held, there is contention. A lock is never held for too long, */ \
+ /* so spinloop until it is released. */ \
+ if (IS_LOCKED_OR_NEWER(ovt, d->start_time)) \
+ { \
+ if (IS_LOCKED(ovt)) { \
+ tx_spinloop(7); \
+ goto retry; \
+ } \
+ /* else this location is too new, scale forward */ \
+ owner_version_t newts = get_global_timestamp(d) & ~1; \
+ validate_fast(d, 1); \
+ d->start_time = newts; \
+ } \
+ \
+ /* orec is unlocked, with ts <= start_time. read the location */ \
+ READ_OPERATION; \
+ \
+ /* postvalidate AFTER reading addr: */ \
+ CFENCE; \
+ if (__builtin_expect(o->version != ovt, 0)) \
+ goto retry; /* oops, try again */ \
+ \
+ oreclist_insert(&d->reads, (orec_t*)o); \
+ }
+
+
+#define STM_READ_WORD(SIZE, SUFFIX, TYPE) \
+TYPE stm_read_int##SIZE##SUFFIX(void* addr, long offset) \
+{ \
+ struct tx_descriptor *d = active_thread_descriptor; \
+ volatile orec_t *o = get_orec(addr); \
+ owner_version_t ovt; \
+ \
+ assert(sizeof(TYPE) == SIZE); \
+ /* XXX try to remove this check from the main path: */ \
+ /* d is NULL only when in non-main threads but */ \
+ /* outside a transaction. */ \
+ if (d == NULL) \
+ return *(TYPE *)(((char *)addr) + offset); \
+ \
+ if ((o->tid & GCFLAG_WAS_COPIED) != 0) \
+ { \
+ /* Look up in the thread-local dictionary. */ \
+ wlog_t *found; \
+ REDOLOG_FIND(d->redolog, addr, found, goto not_found); \
+ orec_t *localobj = (orec_t *)found->val; \
+ assert((localobj->tid & GCFLAG_GLOBAL) == 0); \
+ return *(TYPE *)(((char *)localobj) + offset); \
+ \
+ not_found:; \
+ } \
+ \
+ TYPE tmp; \
+ STM_DO_READ(tmp = *(TYPE *)(((char *)addr) + offset)); \
+ return tmp; \
+}
+
+STM_READ_WORD(1, , char)
+STM_READ_WORD(2, , short)
+STM_READ_WORD(4, , int)
+STM_READ_WORD(8, , long long)
+STM_READ_WORD(8,f, double)
+STM_READ_WORD(4,f, float)
+
+void stm_copy_transactional_to_raw(void *src, void *dst, long size)
+{
+ struct tx_descriptor *d = active_thread_descriptor;
+ volatile orec_t *o = get_orec(src);
+ owner_version_t ovt;
+
+ assert(d != NULL);
+
+ /* don't copy the header */
+ src = ((char *)src) + sizeof(orec_t);
+ dst = ((char *)dst) + sizeof(orec_t);
+ size -= sizeof(orec_t);
+
+ STM_DO_READ(memcpy(dst, src, size));
+}
+
+static struct tx_descriptor *descriptor_init(long in_main_thread)
+{
+ assert(thread_descriptor == NULL);
+ assert(active_thread_descriptor == NULL);
+ if (1) /* for hg diff */
+ {
+ struct tx_descriptor *d = malloc(sizeof(struct tx_descriptor));
+ memset(d, 0, sizeof(struct tx_descriptor));
+
+#ifdef RPY_STM_DEBUG_PRINT
+ PYPY_DEBUG_START("stm-init");
+#endif
+
+ if (in_main_thread)
+ {
+ d->my_lock_word = 0; /* special value for the main thread */
+ }
+ else
+ {
+ /* initialize 'my_lock_word' to be a unique negative number */
+ d->my_lock_word = (owner_version_t)d;
+ if (!IS_LOCKED(d->my_lock_word))
+ d->my_lock_word = ~d->my_lock_word;
+ assert(IS_LOCKED(d->my_lock_word));
+ }
+ /*d->spinloop_counter = (unsigned int)(d->my_lock_word | 1);*/
+
+ thread_descriptor = d;
+ if (in_main_thread)
+ ; //stm_leave_transactional_mode();
+ else
+ ; /* active_thread_descriptor stays NULL */
+
+#ifdef RPY_STM_DEBUG_PRINT
+ if (PYPY_HAVE_DEBUG_PRINTS) fprintf(PYPY_DEBUG_FILE, "thread %lx starting\n",
+ (long)pthread_self());
+ PYPY_DEBUG_STOP("stm-init");
+#endif
+ return d;
+ }
+}
+
+static void descriptor_done(void)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ assert(d != NULL);
+ assert(active_thread_descriptor == NULL);
+
+ thread_descriptor = NULL;
+
+#ifdef RPY_STM_DEBUG_PRINT
+ PYPY_DEBUG_START("stm-done");
+ if (PYPY_HAVE_DEBUG_PRINTS) {
+ int num_aborts = 0, num_spinloops = 0;
+ int i, prevchar;
+ char line[256], *p = line;
+
+ for (i=0; i<ABORT_REASONS; i++)
+ num_aborts += d->num_aborts[i];
+ for (i=0; i<SPINLOOP_REASONS; i++)
+ num_spinloops += d->num_spinloops[i];
+
+ p += sprintf(p, "thread %lx: %d commits, %d aborts\n",
+ (long)pthread_self(),
+ d->num_commits,
+ num_aborts);
+
+ for (i=0; i<ABORT_REASONS; i++)
+ p += sprintf(p, "%c%d", i == 0 ? '[' : ',',
+ d->num_aborts[i]);
+
+ for (i=1; i<SPINLOOP_REASONS; i++) /* num_spinloops[0] == num_aborts */
+ p += sprintf(p, "%c%d", i == 1 ? '|' : ',',
+ d->num_spinloops[i]);
+
+ p += sprintf(p, "]\n");
+ fwrite(line, 1, p - line, PYPY_DEBUG_FILE);
+ }
+ PYPY_DEBUG_STOP("stm-done");
+#endif
+
+ free(d);
+}
+
+static void begin_transaction(jmp_buf* buf)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ /* you need to call descriptor_init() before calling
+ stm_perform_transaction() */
+ assert(d != NULL);
+ d->setjmp_buf = buf;
+ d->start_time = (/*d->last_known_global_timestamp*/ global_timestamp) & ~1;
+ active_thread_descriptor = d;
+}
+
+static long commit_transaction(void)
+{
+ struct tx_descriptor *d = active_thread_descriptor;
+ assert(d != NULL);
+
+ // if I don't have writes, I'm committed
+ if (!redolog_any_entry(&d->redolog))
+ {
+ if (is_inevitable(d))
+ {
+ unsigned long ts = get_global_timestamp(d);
+ assert(ts & 1);
+ set_global_timestamp(d, ts - 1);
+ mutex_unlock();
+ }
+ d->num_commits++;
+ common_cleanup(d);
+ active_thread_descriptor = NULL;
+ return d->start_time;
+ }
+
+ // bring that variable over to this CPU core (optimization, maybe)
+ global_timestamp;
+
+ // acquire locks
+ acquireLocks(d);
+
+ if (is_inevitable(d))
+ {
+ commitInevitableTransaction(d);
+ }
+ else
+ {
+ while (1)
+ {
+ unsigned long expected = get_global_timestamp(d);
+ if (expected & 1)
+ {
+ // wait until it is done. hopefully we can then proceed
+ // without conflicts.
+ wait_end_inevitability(d);
+ continue;
+ }
+ if (change_global_timestamp(d, expected, expected + 2))
+ {
+ d->end_time = expected + 2;
+ break;
+ }
+ }
+
+ // validate (but skip validation if nobody else committed)
+ if (d->end_time != (d->start_time + 2))
+ validate(d);
+
+ // run the redo log, and release the locks
+ tx_redo(d);
+ }
+
+ // remember that this was a commit
+ d->num_commits++;
+
+ // reset all lists
+ common_cleanup(d);
+ active_thread_descriptor = NULL;
+ return d->end_time;
+}
+
+void stm_try_inevitable(STM_CCHARP1(why))
+{
+ /* when a transaction is inevitable, its start_time is equal to
+ global_timestamp and global_timestamp cannot be incremented
+ by another thread. We set the lowest bit in global_timestamp
+ to 1. */
+ struct tx_descriptor *d = active_thread_descriptor;
+ if (d == NULL)
+ return;
+
+ if (is_inevitable(d)) /* also when the transaction is inactive */
+ {
+ return; /* I am already inevitable */
+ }
+
+#ifdef RPY_STM_DEBUG_PRINT
+ PYPY_DEBUG_START("stm-inevitable");
+ if (PYPY_HAVE_DEBUG_PRINTS)
+ {
+ fprintf(PYPY_DEBUG_FILE, "%s\n", why);
+ }
+#endif
+
+ while (1)
+ {
+ unsigned long curtime = get_global_timestamp(d);
+ if (d->start_time != (curtime & ~1))
+ { /* scale forward */
+ validate_fast(d, 2);
+ d->start_time = curtime & ~1;
+ }
+ mutex_lock();
+ if (curtime & 1) /* there is, or was, already an inevitable thread */
+ {
+ /* should we spinloop here, or abort (and likely come back
+ in try_inevitable() very soon)? unclear. For now
+ let's try to spinloop, after the waiting done by
+ acquiring the mutex */
+ mutex_unlock();
+ tx_spinloop(6);
+ continue;
+ }
+ if (change_global_timestamp(d, curtime, curtime + 1))
+ break;
+ mutex_unlock();
+ }
+ d->setjmp_buf = NULL; /* inevitable from now on */
+#ifdef RPY_STM_DEBUG_PRINT
+ PYPY_DEBUG_STOP("stm-inevitable");
+#endif
+}
+
+void stm_abort_and_retry(void)
+{
+ tx_abort(7); /* manual abort */
+}
diff --git a/pypy/translator/stm/src_stm/et.c b/pypy/translator/stm/src_stm/et.c
--- a/pypy/translator/stm/src_stm/et.c
+++ b/pypy/translator/stm/src_stm/et.c
@@ -13,7 +13,7 @@
#include <stdio.h>
#include <string.h>
-#define USE_PTHREAD_MUTEX /* optional */
+#define USE_PTHREAD_MUTEX /* can be made optional */
#ifdef USE_PTHREAD_MUTEX
# include <pthread.h>
#endif
@@ -55,821 +55,12 @@
#define get_orec(addr) ((volatile orec_t *)(addr))
+/************************************************************/
+
#include "src_stm/lists.c"
+#include "src_stm/core.c"
+#include "src_stm/rpyintf.c"
/************************************************************/
-#define ABORT_REASONS 8
-#define SPINLOOP_REASONS 10
-
-struct tx_descriptor {
- void *rpython_tls_object;
- jmp_buf *setjmp_buf;
- owner_version_t start_time;
- owner_version_t end_time;
- /*unsigned long last_known_global_timestamp;*/
- owner_version_t my_lock_word;
- struct OrecList reads;
- unsigned num_commits;
- unsigned num_aborts[ABORT_REASONS];
- unsigned num_spinloops[SPINLOOP_REASONS];
- /*unsigned int spinloop_counter;*/
- struct RedoLog redolog; /* last item, because it's the biggest one */
-};
-
-/* global_timestamp contains in its lowest bit a flag equal to 1
- if there is an inevitable transaction running */
-static volatile unsigned long global_timestamp = 2;
-static __thread struct tx_descriptor *thread_descriptor = NULL;
-static __thread struct tx_descriptor *active_thread_descriptor = NULL;
-static long (*rpython_get_size)(void*);
-
-/************************************************************/
-
-static unsigned long get_global_timestamp(struct tx_descriptor *d)
-{
- return (/*d->last_known_global_timestamp =*/ global_timestamp);
-}
-
-static _Bool change_global_timestamp(struct tx_descriptor *d,
- unsigned long old,
- unsigned long new)
-{
- if (bool_cas(&global_timestamp, old, new))
- {
- /*d->last_known_global_timestamp = new;*/
- return 1;
- }
- return 0;
-}
-
-static void set_global_timestamp(struct tx_descriptor *d, unsigned long new)
-{
- global_timestamp = new;
- /*d->last_known_global_timestamp = new;*/
-}
-
-static void tx_abort(int);
-
-static void tx_spinloop(int num)
-{
- unsigned int c;
- int i;
- struct tx_descriptor *d = active_thread_descriptor;
- d->num_spinloops[num]++;
-
- //printf("tx_spinloop(%d)\n", num);
-
-#if 0
- c = d->spinloop_counter;
- d->spinloop_counter = c * 9;
- i = c & 0xff0000;
- while (i >= 0) {
- spinloop();
- i -= 0x10000;
- }
-#else
- spinloop();
-#endif
-}
-
-static _Bool is_inevitable(struct tx_descriptor *d)
-{
- return d->setjmp_buf == NULL;
-}
-
-/*** run the redo log to commit a transaction, and release the locks */
-static void tx_redo(struct tx_descriptor *d)
-{
- owner_version_t newver = d->end_time;
- wlog_t *item;
- REDOLOG_LOOP_FORWARD(d->redolog, item)
- {
- void *globalobj = item->addr;
- void *localobj = item->val;
- long size = rpython_get_size(localobj);
- memcpy(((char *)globalobj) + sizeof(orec_t),
- ((char *)localobj) + sizeof(orec_t),
- size - sizeof(orec_t));
- /* unlock the orec */
- volatile orec_t* o = get_orec(globalobj);
- CFENCE;
- o->version = newver;
- } REDOLOG_LOOP_END;
-}
-
-/*** on abort, release locks and restore the old version number. */
-static void releaseAndRevertLocks(struct tx_descriptor *d)
-{
- wlog_t *item;
- REDOLOG_LOOP_FORWARD(d->redolog, item)
- {
- if (item->p != -1)
- {
- volatile orec_t* o = get_orec(item->addr);
- o->version = item->p;
- }
- } REDOLOG_LOOP_END;
-}
-
-/*** release locks and restore the old version number, ready to retry later */
-static void releaseLocksForRetry(struct tx_descriptor *d)
-{
- wlog_t *item;
- REDOLOG_LOOP_FORWARD(d->redolog, item)
- {
- volatile orec_t* o = get_orec(item->addr);
- assert(item->p != -1);
- o->version = item->p;
- item->p = -1;
- } REDOLOG_LOOP_END;
-}
-
-/*** lock all locations */
-static void acquireLocks(struct tx_descriptor *d)
-{
- wlog_t *item;
- // try to lock every location in the write set
- REDOLOG_LOOP_BACKWARD(d->redolog, item)
- {
- // get orec, read its version#
- volatile orec_t* o = get_orec(item->addr);
- owner_version_t ovt;
-
- retry:
- ovt = o->version;
-
- // if orec not locked, lock it
- //
- // NB: if ovt > start time, we may introduce inconsistent
- // reads. Since most writes are also reads, we'll just abort under this
- // condition. This can introduce false conflicts
- if (!IS_LOCKED_OR_NEWER(ovt, d->start_time)) {
- if (!bool_cas(&o->version, ovt, d->my_lock_word))
- goto retry;
- // save old version to item->p. Now we hold the lock.
- item->p = ovt;
- }
- // else if the location is too recent...
- else if (!IS_LOCKED(ovt))
- tx_abort(0);
- // else it is locked: check it's not by me
- else {
- assert(ovt != d->my_lock_word);
- // we can either abort or spinloop. Because we are at the end of
- // the transaction we might try to spinloop, even though after the
- // lock is released the ovt will be very recent, possibly
- // > d->start_time. It is necessary to spinloop in case we are
- // inevitable, so use that as a criteria. Another solution to avoid
- // deadlocks would be to sort the order in which we take the locks.
- if (is_inevitable(d))
- tx_spinloop(8);
- else
- tx_abort(6);
- goto retry;
- }
- } REDOLOG_LOOP_END;
-}
-
-static void common_cleanup(struct tx_descriptor *d)
-{
- d->reads.size = 0;
- redolog_clear(&d->redolog);
-}
-
-static void tx_cleanup(struct tx_descriptor *d)
-{
- // release the locks and restore version numbers
- releaseAndRevertLocks(d);
- // reset all lists
- common_cleanup(d);
-}
-
-static void tx_restart(struct tx_descriptor *d)
-{
- tx_cleanup(d);
- tx_spinloop(0);
- longjmp(*d->setjmp_buf, 1);
-}
-
-/*** increase the abort count and restart the transaction */
-static void tx_abort(int reason)
-{
- struct tx_descriptor *d = active_thread_descriptor;
- assert(!is_inevitable(d));
- d->num_aborts[reason]++;
-#ifdef RPY_STM_DEBUG_PRINT
- PYPY_DEBUG_START("stm-abort");
- if (PYPY_HAVE_DEBUG_PRINTS)
- fprintf(PYPY_DEBUG_FILE, "thread %lx aborting %d\n",
- (long)pthread_self(), reason);
- PYPY_DEBUG_STOP("stm-abort");
-#endif
- tx_restart(d);
-}
-
-/**
- * fast-path validation, assuming that I don't hold locks.
- */
-static void validate_fast(struct tx_descriptor *d, int lognum)
-{
- int i;
- owner_version_t ovt;
- assert(!is_inevitable(d));
- for (i=0; i<d->reads.size; i++)
- {
- retry:
- ovt = d->reads.items[i]->version;
- if (IS_LOCKED_OR_NEWER(ovt, d->start_time))
- {
- // If locked, we wait until it becomes unlocked. The chances are
- // that it will then have a very recent start_time, likely
- // > d->start_time, but it might still be better than always aborting
- if (IS_LOCKED(ovt))
- {
- tx_spinloop(lognum); /* tx_spinloop(1), tx_spinloop(2),
- tx_spinloop(3) */
- goto retry;
- }
- else
- // abort if the timestamp is newer than my start time.
- tx_abort(lognum); /* tx_abort(1), tx_abort(2), tx_abort(3) */
- }
- }
-}
-
-/**
- * validate the read set by making sure that all orecs that we've read have
- * timestamps at least as old as our start time, unless we locked those orecs.
- */
-static void validate(struct tx_descriptor *d)
-{
- int i;
- owner_version_t ovt;
- assert(!is_inevitable(d));
- for (i=0; i<d->reads.size; i++)
- {
- ovt = d->reads.items[i]->version; // read this orec
- if (IS_LOCKED_OR_NEWER(ovt, d->start_time))
- {
- if (!IS_LOCKED(ovt))
- // if unlocked and newer than start time, abort
- tx_abort(4);
- else
- {
- // if locked and not by me, abort
- if (ovt != d->my_lock_word)
- tx_abort(5);
- }
- }
- }
-}
-
-#ifdef USE_PTHREAD_MUTEX
-/* mutex: only to avoid busy-looping too much in tx_spinloop() below */
-static pthread_mutex_t mutex_inevitable = PTHREAD_MUTEX_INITIALIZER;
-# ifdef RPY_STM_ASSERT
-unsigned long locked_by = 0;
-static void mutex_lock(void)
-{
- unsigned long pself = (unsigned long)pthread_self();
-#ifdef RPY_STM_DEBUG_PRINT
- if (PYPY_HAVE_DEBUG_PRINTS) fprintf(PYPY_DEBUG_FILE,
- "%lx: mutex inev locking...\n", pself);
-#endif
- assert(locked_by != pself);
- pthread_mutex_lock(&mutex_inevitable);
- locked_by = pself;
-#ifdef RPY_STM_DEBUG_PRINT
- if (PYPY_HAVE_DEBUG_PRINTS) fprintf(PYPY_DEBUG_FILE,
- "%lx: mutex inev locked\n", pself);
-#endif
-}
-static void mutex_unlock(void)
-{
- unsigned long pself = (unsigned long)pthread_self();
- locked_by = 0;
-#ifdef RPY_STM_DEBUG_PRINT
- if (PYPY_HAVE_DEBUG_PRINTS) fprintf(PYPY_DEBUG_FILE,
- "%lx: mutex inev unlocked\n", pself);
-#endif
- pthread_mutex_unlock(&mutex_inevitable);
-}
-# else
-# define mutex_lock() pthread_mutex_lock(&mutex_inevitable)
-# define mutex_unlock() pthread_mutex_unlock(&mutex_inevitable)
-# endif
-#else
-# define mutex_lock() /* nothing */
-# define mutex_unlock() /* nothing */
-#endif
-
-static void wait_end_inevitability(struct tx_descriptor *d)
-{
- unsigned long curts;
- releaseLocksForRetry(d);
-
- // We are going to wait until the other inevitable transaction
- // finishes. XXX We could do better here: we could check if
- // committing 'd' would create a conflict for the other inevitable
- // thread 'd_inev' or not. It requires peeking in 'd_inev' from this
- // thread (which we never do so far) in order to do something like
- // 'validate_fast(d_inev); d_inev->start_time = updated;'
-
- while ((curts = get_global_timestamp(d)) & 1)
- {
- // while we're about to wait anyway, we can do a validate_fast
- if (d->start_time < curts - 1)
- {
- validate_fast(d, 3);
- d->start_time = curts - 1;
- }
- tx_spinloop(4);
- mutex_lock();
- mutex_unlock();
- }
- acquireLocks(d);
-}
-
-static void commitInevitableTransaction(struct tx_descriptor *d)
-{
- unsigned long ts;
- _Bool ok;
-
- // no-one else can modify global_timestamp if I'm inevitable
- // and d_inev_checking is 0
- ts = get_global_timestamp(d);
- assert(ts & 1);
- set_global_timestamp(d, ts + 1);
- d->end_time = ts + 1;
- assert(d->end_time == (d->start_time + 2));
-
- // run the redo log, and release the locks
- tx_redo(d);
-
- mutex_unlock();
-}
-
-/* lazy/lazy read instrumentation */
-#define STM_DO_READ(READ_OPERATION) \
- if (is_inevitable(d)) { \
- /* if is_inevitable(), then we don't need to do the checking of */ \
- /* o->version done below --- but more importantly, we don't need */ \
- /* to insert o in the OrecList */ \
- READ_OPERATION; \
- } \
- else { \
- retry: \
- /* read the orec BEFORE we read anything else */ \
- ovt = o->version; \
- CFENCE; \
- \
- /* this tx doesn't hold any locks, so if the lock for this addr is */ \
- /* held, there is contention. A lock is never hold for too long, */ \
- /* so spinloop until it is released. */ \
- if (IS_LOCKED_OR_NEWER(ovt, d->start_time)) \
- { \
- if (IS_LOCKED(ovt)) { \
- tx_spinloop(7); \
- goto retry; \
- } \
- /* else this location is too new, scale forward */ \
- owner_version_t newts = get_global_timestamp(d) & ~1; \
- validate_fast(d, 1); \
- d->start_time = newts; \
- } \
- \
- /* orec is unlocked, with ts <= start_time. read the location */ \
- READ_OPERATION; \
- \
- /* postvalidate AFTER reading addr: */ \
- CFENCE; \
- if (__builtin_expect(o->version != ovt, 0)) \
- goto retry; /* oups, try again */ \
- \
- oreclist_insert(&d->reads, (orec_t*)o); \
- }
-
-
-#define STM_READ_WORD(SIZE, SUFFIX, TYPE) \
-TYPE stm_read_int##SIZE##SUFFIX(void* addr, long offset) \
-{ \
- struct tx_descriptor *d = active_thread_descriptor; \
- volatile orec_t *o = get_orec(addr); \
- owner_version_t ovt; \
- \
- assert(sizeof(TYPE) == SIZE); \
- /* XXX try to remove this check from the main path: */ \
- /* d is NULL only when in non-main threads but */ \
- /* outside a transaction. */ \
- if (d == NULL) \
- return *(TYPE *)(((char *)addr) + offset); \
- \
- if ((o->tid & GCFLAG_WAS_COPIED) != 0) \
- { \
- /* Look up in the thread-local dictionary. */ \
- wlog_t *found; \
- REDOLOG_FIND(d->redolog, addr, found, goto not_found); \
- orec_t *localobj = (orec_t *)found->val; \
- assert((localobj->tid & GCFLAG_GLOBAL) == 0); \
- return *(TYPE *)(((char *)localobj) + offset); \
- \
- not_found:; \
- } \
- \
- TYPE tmp; \
- STM_DO_READ(tmp = *(TYPE *)(((char *)addr) + offset)); \
- return tmp; \
-}
-
-STM_READ_WORD(1, , char)
-STM_READ_WORD(2, , short)
-STM_READ_WORD(4, , int)
-STM_READ_WORD(8, , long long)
-STM_READ_WORD(8,f, double)
-STM_READ_WORD(4,f, float)
-
-void stm_copy_transactional_to_raw(void *src, void *dst, long size)
-{
- struct tx_descriptor *d = active_thread_descriptor;
- volatile orec_t *o = get_orec(src);
- owner_version_t ovt;
-
- assert(d != NULL);
-
- /* don't copy the header */
- src = ((char *)src) + sizeof(orec_t);
- dst = ((char *)dst) + sizeof(orec_t);
- size -= sizeof(orec_t);
-
- STM_DO_READ(memcpy(dst, src, size));
-}
-
-
-static struct tx_descriptor *descriptor_init(long in_main_thread)
-{
- assert(thread_descriptor == NULL);
- assert(active_thread_descriptor == NULL);
- if (1) /* for hg diff */
- {
- struct tx_descriptor *d = malloc(sizeof(struct tx_descriptor));
- memset(d, 0, sizeof(struct tx_descriptor));
-
-#ifdef RPY_STM_DEBUG_PRINT
- PYPY_DEBUG_START("stm-init");
-#endif
-
- if (in_main_thread)
- {
- d->my_lock_word = 0; /* special value for the main thread */
- }
- else
- {
- /* initialize 'my_lock_word' to be a unique negative number */
- d->my_lock_word = (owner_version_t)d;
- if (!IS_LOCKED(d->my_lock_word))
- d->my_lock_word = ~d->my_lock_word;
- assert(IS_LOCKED(d->my_lock_word));
- }
- /*d->spinloop_counter = (unsigned int)(d->my_lock_word | 1);*/
-
- thread_descriptor = d;
- if (in_main_thread)
- stm_leave_transactional_mode();
- else
- ; /* active_thread_descriptor stays NULL */
-
-#ifdef RPY_STM_DEBUG_PRINT
- if (PYPY_HAVE_DEBUG_PRINTS) fprintf(PYPY_DEBUG_FILE, "thread %lx starting\n",
- (long)pthread_self());
- PYPY_DEBUG_STOP("stm-init");
-#endif
- return d;
- }
-}
-
-static void descriptor_done(void)
-{
- struct tx_descriptor *d = thread_descriptor;
- assert(d != NULL);
- assert(active_thread_descriptor == NULL);
-
- thread_descriptor = NULL;
-
-#ifdef RPY_STM_DEBUG_PRINT
- PYPY_DEBUG_START("stm-done");
- if (PYPY_HAVE_DEBUG_PRINTS) {
- int num_aborts = 0, num_spinloops = 0;
- int i, prevchar;
- char line[256], *p = line;
-
- for (i=0; i<ABORT_REASONS; i++)
- num_aborts += d->num_aborts[i];
- for (i=0; i<SPINLOOP_REASONS; i++)
- num_spinloops += d->num_spinloops[i];
-
- p += sprintf(p, "thread %lx: %d commits, %d aborts\n",
- (long)pthread_self(),
- d->num_commits,
- num_aborts);
-
- for (i=0; i<ABORT_REASONS; i++)
- p += sprintf(p, "%c%d", i == 0 ? '[' : ',',
- d->num_aborts[i]);
-
- for (i=1; i<SPINLOOP_REASONS; i++) /* num_spinloops[0] == num_aborts */
- p += sprintf(p, "%c%d", i == 1 ? '|' : ',',
- d->num_spinloops[i]);
-
- p += sprintf(p, "]\n");
- fwrite(line, 1, p - line, PYPY_DEBUG_FILE);
- }
- PYPY_DEBUG_STOP("stm-done");
-#endif
-
- free(d);
-}
-
-static void begin_transaction(jmp_buf* buf)
-{
- struct tx_descriptor *d = thread_descriptor;
- /* you need to call descriptor_init() before calling
- stm_perform_transaction() */
- assert(d != NULL);
- d->setjmp_buf = buf;
- d->start_time = (/*d->last_known_global_timestamp*/ global_timestamp) & ~1;
- active_thread_descriptor = d;
-}
-
-static long commit_transaction(void)
-{
- struct tx_descriptor *d = active_thread_descriptor;
- assert(d != NULL);
-
- // if I don't have writes, I'm committed
- if (!redolog_any_entry(&d->redolog))
- {
- if (is_inevitable(d))
- {
- unsigned long ts = get_global_timestamp(d);
- assert(ts & 1);
- set_global_timestamp(d, ts - 1);
- mutex_unlock();
- }
- d->num_commits++;
- common_cleanup(d);
- active_thread_descriptor = NULL;
- return d->start_time;
- }
-
- // bring that variable over to this CPU core (optimization, maybe)
- global_timestamp;
-
- // acquire locks
- acquireLocks(d);
-
- if (is_inevitable(d))
- {
- commitInevitableTransaction(d);
- }
- else
- {
- while (1)
- {
- unsigned long expected = get_global_timestamp(d);
- if (expected & 1)
- {
- // wait until it is done. hopefully we can then proceed
- // without conflicts.
- wait_end_inevitability(d);
- continue;
- }
- if (change_global_timestamp(d, expected, expected + 2))
- {
- d->end_time = expected + 2;
- break;
- }
- }
-
- // validate (but skip validation if nobody else committed)
- if (d->end_time != (d->start_time + 2))
- validate(d);
-
- // run the redo log, and release the locks
- tx_redo(d);
- }
-
- // remember that this was a commit
- d->num_commits++;
-
- // reset all lists
- common_cleanup(d);
- active_thread_descriptor = NULL;
- return d->end_time;
-}
-
-void* stm_perform_transaction(void*(*callback)(void*, long), void *arg,
- void *save_and_restore)
-{
- void *result;
- jmp_buf _jmpbuf;
- volatile long v_counter = 0;
- long counter;
- void *volatile saved_value;
- assert(active_thread_descriptor == NULL);
- if (save_and_restore)
- saved_value = *(void**)save_and_restore;
- setjmp(_jmpbuf);
- if (save_and_restore)
- *(void**)save_and_restore = saved_value;
- begin_transaction(&_jmpbuf);
- counter = v_counter;
- v_counter = counter + 1;
- result = callback(arg, counter);
- commit_transaction();
- return result;
-}
-
-void stm_enter_transactional_mode(void)
-{
- struct tx_descriptor *d = active_thread_descriptor;
- assert(d != NULL);
- assert(is_inevitable(d));
- /* we only need a subset of a full commit */
- acquireLocks(d);
- commitInevitableTransaction(d);
- common_cleanup(d);
- active_thread_descriptor = NULL;
-}
-
-void stm_leave_transactional_mode(void)
-{
- struct tx_descriptor *d = thread_descriptor;
- assert(active_thread_descriptor == NULL);
-
- mutex_lock();
- d->setjmp_buf = NULL;
- d->start_time = get_global_timestamp(d);
- assert(!(d->start_time & 1));
- set_global_timestamp(d, d->start_time | 1);
-
- assert(is_inevitable(d));
- active_thread_descriptor = d;
-}
-
-void stm_try_inevitable(STM_CCHARP1(why))
-{
- /* when a transaction is inevitable, its start_time is equal to
- global_timestamp and global_timestamp cannot be incremented
- by another thread. We set the lowest bit in global_timestamp
- to 1. */
- struct tx_descriptor *d = active_thread_descriptor;
- if (d == NULL)
- return;
-
- if (is_inevitable(d)) /* also when the transaction is inactive */
- {
- return; /* I am already inevitable */
- }
-
-#ifdef RPY_STM_DEBUG_PRINT
- PYPY_DEBUG_START("stm-inevitable");
- if (PYPY_HAVE_DEBUG_PRINTS)
- {
- fprintf(PYPY_DEBUG_FILE, "%s\n", why);
- }
-#endif
-
- while (1)
- {
- unsigned long curtime = get_global_timestamp(d);
- if (d->start_time != (curtime & ~1))
- { /* scale forward */
- validate_fast(d, 2);
- d->start_time = curtime & ~1;
- }
- mutex_lock();
- if (curtime & 1) /* there is, or was, already an inevitable thread */
- {
- /* should we spinloop here, or abort (and likely come back
- in try_inevitable() very soon)? unclear. For now
- let's try to spinloop, after the waiting done by
- acquiring the mutex */
- mutex_unlock();
- tx_spinloop(6);
- continue;
- }
- if (change_global_timestamp(d, curtime, curtime + 1))
- break;
- mutex_unlock();
- }
- d->setjmp_buf = NULL; /* inevitable from now on */
-#ifdef RPY_STM_DEBUG_PRINT
- PYPY_DEBUG_STOP("stm-inevitable");
-#endif
-}
-
-void stm_abort_and_retry(void)
-{
- tx_abort(7); /* manual abort */
-}
-
-long stm_debug_get_state(void)
-{
- struct tx_descriptor *d = thread_descriptor;
- if (d == NULL)
- return -2;
- if (active_thread_descriptor == NULL)
- {
- assert(d->my_lock_word != 0);
- return 0;
- }
- assert(d == active_thread_descriptor);
- if (d->my_lock_word == 0)
- return -1;
- if (!is_inevitable(d))
- return 1;
- else
- return 2;
-}
-
-long stm_thread_id(void)
-{
- struct tx_descriptor *d = thread_descriptor;
- if (d == NULL)
- return 0; /* no thread_descriptor yet, assume it's the main thread */
- return d->my_lock_word;
-}
-
-
-void stm_set_tls(void *newtls, long in_main_thread)
-{
- struct tx_descriptor *d = descriptor_init(in_main_thread);
- d->rpython_tls_object = newtls;
-}
-
-void *stm_get_tls(void)
-{
- return thread_descriptor->rpython_tls_object;
-}
-
-void stm_del_tls(void)
-{
- descriptor_done();
-}
-
-void *stm_tldict_lookup(void *key)
-{
- struct tx_descriptor *d = thread_descriptor;
- wlog_t* found;
- REDOLOG_FIND(d->redolog, key, found, goto not_found);
- return found->val;
-
- not_found:
- return NULL;
-}
-
-void stm_tldict_add(void *key, void *value)
-{
- struct tx_descriptor *d = active_thread_descriptor;
- assert(d != NULL);
- redolog_insert(&d->redolog, key, value);
-}
-
-void stm_tldict_enum(void(*callback)(void*, void*, void*))
-{
- struct tx_descriptor *d = thread_descriptor;
- wlog_t *item;
- void *tls = stm_get_tls();
-
- REDOLOG_LOOP_FORWARD(d->redolog, item)
- {
- callback(tls, item->addr, item->val);
- } REDOLOG_LOOP_END;
-}
-
-void stm_setup_size_getter(long(*getsize_fn)(void*))
-{
- rpython_get_size = getsize_fn;
-}
-
-long stm_in_transaction(void)
-{
- struct tx_descriptor *d = active_thread_descriptor;
- return d != NULL;
-}
-
-void _stm_activate_transaction(long activate)
-{
- assert(thread_descriptor != NULL);
- if (activate)
- {
- active_thread_descriptor = thread_descriptor;
- }
- else
- {
- active_thread_descriptor = NULL;
- }
-}
-
#endif /* PYPY_NOT_MAIN_FILE */
diff --git a/pypy/translator/stm/src_stm/et.h b/pypy/translator/stm/src_stm/et.h
--- a/pypy/translator/stm/src_stm/et.h
+++ b/pypy/translator/stm/src_stm/et.h
@@ -12,6 +12,11 @@
#include "src/commondefs.h"
+long stm_in_transaction(void);
+void stm_run_all_transactions(void*(*)(void*, long), void*, long);
+
+
+
void stm_setup_size_getter(long(*)(void*));
void stm_set_tls(void *, long);
@@ -40,8 +45,6 @@
void* stm_perform_transaction(void*(*)(void*, long), void*, void*);
-void stm_enter_transactional_mode(void);
-void stm_leave_transactional_mode(void);
void stm_try_inevitable(STM_CCHARP1(why));
void stm_abort_and_retry(void);
long stm_debug_get_state(void); /* -1: descriptor_init() was not called
@@ -50,7 +53,6 @@
2: in an inevitable transaction */
long stm_thread_id(void); /* returns a unique thread id,
or 0 if descriptor_init() was not called */
-long stm_in_transaction(void);
void _stm_activate_transaction(long);
void stm_copy_transactional_to_raw(void *src, void *dst, long size);
diff --git a/pypy/translator/stm/src_stm/rpyintf.c b/pypy/translator/stm/src_stm/rpyintf.c
new file mode 100644
--- /dev/null
+++ b/pypy/translator/stm/src_stm/rpyintf.c
@@ -0,0 +1,139 @@
+/* -*- c-basic-offset: 2 -*- */
+
+long stm_thread_id(void)
+{ /* Return the my_lock_word of the calling thread's descriptor, or 0
+ when the thread has no descriptor yet. */
+ struct tx_descriptor *d = thread_descriptor;
+ if (d == NULL)
+ return 0; /* no thread_descriptor yet, assume it's the main thread */
+ return d->my_lock_word;
+}
+
+void stm_set_tls(void *newtls, long in_main_thread)
+{ /* Obtain a descriptor from descriptor_init(in_main_thread) and
+ store 'newtls' in its rpython_tls_object field. */
+ struct tx_descriptor *d = descriptor_init(in_main_thread);
+ d->rpython_tls_object = newtls;
+}
+
+void *stm_get_tls(void)
+{ /* Return the rpython_tls_object stored in this thread's descriptor.
+ NOTE(review): thread_descriptor is dereferenced without a NULL
+ check; presumably callers guarantee that stm_set_tls() ran first
+ on this thread -- confirm. */
+ return thread_descriptor->rpython_tls_object;
+}
+
+void stm_del_tls(void)
+{ /* Thin wrapper: all the work is done by descriptor_done(). */
+ descriptor_done();
+}
+
+void *stm_tldict_lookup(void *key)
+{ /* Look up 'key' in the redolog of this thread's descriptor and
+ return the associated value, or NULL when REDOLOG_FIND takes its
+ 'goto not_found' exit. */
+ struct tx_descriptor *d = thread_descriptor;
+ wlog_t* found;
+ REDOLOG_FIND(d->redolog, key, found, goto not_found);
+ return found->val;
+
+ not_found:
+ return NULL;
+}
+
+void stm_tldict_add(void *key, void *value)
+{ /* Insert the pair (key, value) into the redolog of the active
+ descriptor. */
+ struct tx_descriptor *d = active_thread_descriptor;
+ assert(d != NULL); /* this function uses active_thread_descriptor, which must be set */
+ redolog_insert(&d->redolog, key, value);
+}
+
+void stm_tldict_enum(void(*callback)(void*, void*, void*))
+{ /* Call 'callback(tls, addr, val)' once for every entry of this
+ thread's redolog, where 'tls' is the value returned by
+ stm_get_tls(). */
+ struct tx_descriptor *d = thread_descriptor;
+ wlog_t *item;
+ void *tls = stm_get_tls();
+
+ REDOLOG_LOOP_FORWARD(d->redolog, item)
+ {
+ callback(tls, item->addr, item->val);
+ } REDOLOG_LOOP_END;
+}
+
+void stm_setup_size_getter(long(*getsize_fn)(void*))
+{ /* Record 'getsize_fn' in the rpython_get_size variable. */
+ rpython_get_size = getsize_fn;
+}
+
+long stm_in_transaction(void)
+{ /* Return 1 when the calling thread has a thread_descriptor, else 0.
+ NOTE(review): the stm_in_transaction() that this same patch removes
+ from et.c tested active_thread_descriptor instead -- confirm that
+ switching to thread_descriptor is intended. */
+ struct tx_descriptor *d = thread_descriptor;
+ return d != NULL;
+}
+
+void _stm_activate_transaction(long activate)
+{ /* Point active_thread_descriptor at this thread's descriptor when
+ 'activate' is nonzero, or reset it to NULL when 'activate' is 0. */
+ assert(thread_descriptor != NULL); /* the thread must already have a descriptor */
+ if (activate)
+ {
+ active_thread_descriptor = thread_descriptor;
+ }
+ else
+ {
+ active_thread_descriptor = NULL;
+ }
+}
+
+
+/* a helper to directly read the field '_next_transaction' on
+ RPython instances of pypy.rlib.rstm.Transaction */
+static void *next_transaction(void *
+
+
+
+/* this lock is acquired when we start running transactions, and
+ released only when we are finished. */
+static pthread_mutex_t mutex_unfinished = PTHREAD_MUTEX_INITIALIZER;
+
+/* this mutex is used to ensure non-conflicting accesses to global
+ data in run_thread(). */
+static pthread_mutex_t mutex_global = PTHREAD_MUTEX_INITIALIZER;
+
+/* global data set up by stm_run_all_transactions(). */
+typedef void *(*run_transaction_t)(void *, long);
+static run_transaction_t g_run_transaction;
+static void *g_first_transaction, *g_last_transaction;
+static int g_num_threads;
+static int g_num_waiting_threads;
+
+/* the main function running a thread */
+static void *run_thread(void *ignored)
+{ /* Entry point passed to pthread_create() by
+ stm_run_all_transactions(). In this version it only counts itself
+ in g_num_waiting_threads, under mutex_global, and always returns
+ NULL; g_run_transaction is not called here. */
+ pthread_mutex_lock(&mutex_global);
+
+ g_num_waiting_threads++;
+ if (g_num_waiting_threads == g_num_threads) /* true only in the last thread to get here */
+ pthread_mutex_unlock(&mutex_unfinished); /* NOTE(review): this mutex was locked by the thread running stm_run_all_transactions(); POSIX leaves unlocking a default mutex from another thread undefined -- confirm this is acceptable */
+
+ pthread_mutex_unlock(&mutex_global);
+ return NULL;
+}
+
+void stm_run_all_transactions(run_transaction_t run_transaction,
+ void *initial_transaction,
+ long num_threads)
+{ /* Publish the arguments in the g_* globals, start 'num_threads'
+ detached threads running run_thread(), and block until the last of
+ them has unlocked mutex_unfinished.
+ NOTE(review): with num_threads <= 0 no thread is created, so
+ nothing would unlock mutex_unfinished before the second lock
+ below -- presumably callers always pass a positive count; confirm. */
+ int i;
+ g_run_transaction = run_transaction;
+ g_first_transaction = initial_transaction;
+ g_last_transaction = initial_transaction;
+ g_num_threads = (int)num_threads;
+ g_num_waiting_threads = 0;
+
+ pthread_mutex_lock(&mutex_unfinished); /* first lock: held while the threads run */
+
+ for (i=0; i<(int)num_threads; i++)
+ {
+ pthread_t th;
+ int status = pthread_create(&th, NULL, run_thread, NULL);
+ assert(status == 0); /* NOTE(review): the only check of 'status'; it disappears when NDEBUG is defined */
+ pthread_detach(th);
+ }
+
+ pthread_mutex_lock(&mutex_unfinished); /* second lock: waits for the unlock done in run_thread() */
+ pthread_mutex_unlock(&mutex_unfinished);
+}
diff --git a/pypy/translator/stm/stmgcintf.py b/pypy/translator/stm/stmgcintf.py
--- a/pypy/translator/stm/stmgcintf.py
+++ b/pypy/translator/stm/stmgcintf.py
@@ -35,19 +35,22 @@
'8f': rffi.DOUBLE,
'4f': rffi.FLOAT}
- CALLBACK_TX = lltype.Ptr(lltype.FuncType([rffi.VOIDP, lltype.Signed],
- rffi.VOIDP))
- CALLBACK_ENUM = lltype.Ptr(lltype.FuncType([llmemory.Address] * 3,
- lltype.Void))
- GETSIZE = lltype.Ptr(lltype.FuncType([llmemory.Address], lltype.Signed))
+ RUN_TRANSACTION = lltype.Ptr(lltype.FuncType([rffi.VOIDP, lltype.Signed],
+ rffi.VOIDP))
def _freeze_(self):
return True
- setup_size_getter = smexternal('stm_setup_size_getter', [GETSIZE],
- lltype.Void)
+ in_transaction = smexternal('stm_in_transaction', [], lltype.Signed)
+ run_all_transactions = smexternal('stm_run_all_transactions',
+ [rffi.VOIDP, lltype.Signed],
+ lltype.Void)
- in_transaction = smexternal('stm_in_transaction', [], lltype.Signed)
+
+
+ # xxx review and delete xxx
+ CALLBACK_ENUM = lltype.Ptr(lltype.FuncType([llmemory.Address] * 3,
+ lltype.Void))
_activate_transaction = smexternal('_stm_activate_transaction',
[lltype.Signed], lltype.Void)
@@ -56,11 +59,6 @@
get_tls = smexternal('stm_get_tls', [], llmemory.Address)
del_tls = smexternal('stm_del_tls', [], lltype.Void)
- enter_transactional_mode = smexternal('stm_enter_transactional_mode',
- [], lltype.Void)
- leave_transactional_mode = smexternal('stm_leave_transactional_mode',
- [], lltype.Void)
-
tldict_lookup = smexternal('stm_tldict_lookup', [llmemory.Address],
llmemory.Address)
tldict_add = smexternal('stm_tldict_add', [llmemory.Address] * 2,
@@ -79,15 +77,5 @@
lltype.Void)
try_inevitable = smexternal('stm_try_inevitable', [], lltype.Void)
- perform_transaction = smexternal('stm_perform_transaction',
- [CALLBACK_TX, rffi.VOIDP,
- llmemory.Address], rffi.VOIDP)
thread_id = smexternal('stm_thread_id', [], lltype.Signed)
abort_and_retry = smexternal('stm_abort_and_retry', [], lltype.Void)
-
- _debug_get_state = smexternal('stm_debug_get_state', [], lltype.Signed)
- STATE_NOT_INITIALIZED = -2
- STATE_MAIN_THREAD = -1
- STATE_INACTIVE = 0
- STATE_ACTIVE = 1
- STATE_ACTIVE_INEVITABLE = 2
More information about the pypy-commit
mailing list