[pypy-commit] pypy stm-thread: In-progress
arigo
noreply at buildbot.pypy.org
Sat May 5 17:21:55 CEST 2012
Author: Armin Rigo <arigo at tunes.org>
Branch: stm-thread
Changeset: r54900:ef0813096a29
Date: 2012-05-05 17:21 +0200
http://bitbucket.org/pypy/pypy/changeset/ef0813096a29/
Log: In-progress
diff --git a/pypy/rlib/objectmodel.py b/pypy/rlib/objectmodel.py
--- a/pypy/rlib/objectmodel.py
+++ b/pypy/rlib/objectmodel.py
@@ -476,7 +476,8 @@
def hlinvoke(repr, llcallable, *args):
raise TypeError, "hlinvoke is meant to be rtyped and not called direclty"
-def invoke_around_extcall(before, after):
+def invoke_around_extcall(before, after,
+ enter_callback=None, leave_callback=None):
"""Call before() before any external function call, and after() after.
At the moment only one pair before()/after() can be registered at a time.
"""
@@ -490,6 +491,13 @@
from pypy.rpython.annlowlevel import llhelper
llhelper(rffi.AroundFnPtr, before)
llhelper(rffi.AroundFnPtr, after)
+ # do the same thing about enter/leave_callback
+ if enter_callback is not None:
+ rffi.aroundstate.enter_callback = enter_callback
+ llhelper(rffi.EnterCallbackFnPtr, enter_callback)
+ if leave_callback is not None:
+ rffi.aroundstate.leave_callback = leave_callback
+ llhelper(rffi.LeaveCallbackFnPtr, leave_callback)
def is_in_callback():
from pypy.rpython.lltypesystem import rffi
diff --git a/pypy/rlib/rstm.py b/pypy/rlib/rstm.py
--- a/pypy/rlib/rstm.py
+++ b/pypy/rlib/rstm.py
@@ -1,4 +1,5 @@
from pypy.translator.stm import stmgcintf
+from pypy.rlib.debug import ll_assert
def before_external_call():
@@ -11,6 +12,20 @@
after_external_call._gctransformer_hint_cannot_collect_ = True
after_external_call._dont_reach_me_in_del_ = True
+def enter_callback_call():
+ new_thread = stmgcintf.StmOperations.descriptor_init()
+ stmgcintf.StmOperations.begin_inevitable_transaction()
+ return new_thread
+enter_callback_call._gctransformer_hint_cannot_collect_ = True
+enter_callback_call._dont_reach_me_in_del_ = True
+
+def leave_callback_call(token):
+ stmgcintf.StmOperations.commit_transaction()
+ if token == 1:
+ stmgcintf.StmOperations.descriptor_done()
+leave_callback_call._gctransformer_hint_cannot_collect_ = True
+leave_callback_call._dont_reach_me_in_del_ = True
+
def do_yield_thread():
stmgcintf.StmOperations.do_yield_thread()
do_yield_thread._gctransformer_hint_close_stack_ = True
diff --git a/pypy/rpython/llinterp.py b/pypy/rpython/llinterp.py
--- a/pypy/rpython/llinterp.py
+++ b/pypy/rpython/llinterp.py
@@ -964,8 +964,6 @@
op_stm_writebarrier = _stm_not_implemented
op_stm_normalize_global = _stm_not_implemented
op_stm_become_inevitable = _stm_not_implemented
- op_stm_thread_starting = _stm_not_implemented
- op_stm_thread_stopping = _stm_not_implemented
# operations on pyobjects!
for opname in lloperation.opimpls.keys():
diff --git a/pypy/rpython/lltypesystem/opimpl.py b/pypy/rpython/lltypesystem/opimpl.py
--- a/pypy/rpython/lltypesystem/opimpl.py
+++ b/pypy/rpython/lltypesystem/opimpl.py
@@ -629,12 +629,6 @@
def op_stm_stop_transaction():
pass
-def op_stm_enter_transactional_mode():
- pass
-
-def op_stm_leave_transactional_mode():
- pass
-
def op_nop(x):
pass
diff --git a/pypy/rpython/lltypesystem/rffi.py b/pypy/rpython/lltypesystem/rffi.py
--- a/pypy/rpython/lltypesystem/rffi.py
+++ b/pypy/rpython/lltypesystem/rffi.py
@@ -281,10 +281,12 @@
source = py.code.Source(r"""
def wrapper(%s): # no *args - no GIL for mallocing the tuple
llop.gc_stack_bottom(lltype.Void) # marker for trackgcroot.py
+ token = 0
if aroundstate is not None:
- after = aroundstate.after
- if after:
- after()
+ if aroundstate.enter_callback is not None:
+ token = aroundstate.enter_callback()
+ elif aroundstate.after is not None:
+ aroundstate.after()
# from now on we hold the GIL
stackcounter.stacks_counter += 1
try:
@@ -299,9 +301,10 @@
result = errorcode
stackcounter.stacks_counter -= 1
if aroundstate is not None:
- before = aroundstate.before
- if before:
- before()
+ if aroundstate.leave_callback is not None:
+ aroundstate.leave_callback(token)
+ elif aroundstate.before is not None:
+ aroundstate.before()
# here we don't hold the GIL any more. As in the wrapper() produced
# by llexternal, it is essential that no exception checking occurs
# after the call to before().
@@ -317,11 +320,15 @@
_make_wrapper_for._annspecialcase_ = 'specialize:memo'
AroundFnPtr = lltype.Ptr(lltype.FuncType([], lltype.Void))
+EnterCallbackFnPtr = lltype.Ptr(lltype.FuncType([], lltype.Signed))
+LeaveCallbackFnPtr = lltype.Ptr(lltype.FuncType([lltype.Signed], lltype.Void))
class AroundState:
_alloc_flavor_ = "raw"
def _freeze_(self):
self.before = None # or a regular RPython function
self.after = None # or a regular RPython function
+ self.enter_callback = None
+ self.leave_callback = None
return False
aroundstate = AroundState()
aroundstate._freeze_()
diff --git a/pypy/rpython/memory/gc/stmgc.py b/pypy/rpython/memory/gc/stmgc.py
--- a/pypy/rpython/memory/gc/stmgc.py
+++ b/pypy/rpython/memory/gc/stmgc.py
@@ -124,7 +124,6 @@
GCFLAG_VISITED = first_gcflag << 2
GCFLAG_HAS_SHADOW = first_gcflag << 3
GCFLAG_FIXED_HASH = first_gcflag << 4
-GCFLAG_PREBUILT = first_gcflag << 5
def always_inline(fn):
@@ -202,16 +201,30 @@
#
self.sharedarea.setup()
#
+ self.stm_operations.descriptor_init()
+ self.stm_operations.begin_inevitable_transaction()
self.setup_thread()
def setup_thread(self):
+ """Build the StmGCTLS object and start a transaction at the level
+ of the GC. The C-level transaction should already be started."""
+ ll_assert(self.stm_operations.in_transaction(),
+ "setup_thread: not in a transaction")
from pypy.rpython.memory.gc.stmtls import StmGCTLS
- StmGCTLS(self).start_transaction()
+ stmtls = StmGCTLS(self)
+ stmtls.start_transaction()
def teardown_thread(self):
- self.stm_operations.try_inevitable()
+ """Stop the current transaction, commit it at the level of
+ C code, and tear down the StmGCTLS object. For symmetry, this
+ ensures that the level of C has another (empty) transaction
+ started."""
+ ll_assert(bool(self.stm_operations.in_transaction()),
+ "teardown_thread: not in a transaction")
stmtls = self.get_tls()
stmtls.stop_transaction()
+ self.stm_operations.commit_transaction()
+ self.stm_operations.begin_inevitable_transaction()
stmtls.delete()
@always_inline
@@ -287,7 +300,7 @@
hdr.tid = self.combine(typeid16, flags)
def init_gc_object_immortal(self, addr, typeid16, flags=0):
- flags |= GCFLAG_GLOBAL | GCFLAG_PREBUILT
+ flags |= GCFLAG_GLOBAL
self.init_gc_object(addr, typeid16, flags)
# ----------
diff --git a/pypy/rpython/memory/gc/stmtls.py b/pypy/rpython/memory/gc/stmtls.py
--- a/pypy/rpython/memory/gc/stmtls.py
+++ b/pypy/rpython/memory/gc/stmtls.py
@@ -11,7 +11,6 @@
from pypy.rpython.memory.gc.stmgc import always_inline, dont_inline
from pypy.rpython.memory.gc.stmgc import GCFLAG_GLOBAL, GCFLAG_VISITED
from pypy.rpython.memory.gc.stmgc import GCFLAG_WAS_COPIED, GCFLAG_HAS_SHADOW
-from pypy.rpython.memory.gc.stmgc import GCFLAG_PREBUILT
class StmGCTLS(object):
diff --git a/pypy/rpython/memory/gctransform/stmframework.py b/pypy/rpython/memory/gctransform/stmframework.py
--- a/pypy/rpython/memory/gctransform/stmframework.py
+++ b/pypy/rpython/memory/gctransform/stmframework.py
@@ -12,33 +12,13 @@
class StmFrameworkGCTransformer(FrameworkGCTransformer):
def _declare_functions(self, GCClass, getfn, s_gc, *args):
- gc = self.gcdata.gc
- #
- def gc_thread_start():
- self.root_walker.allocate_shadow_stack()
- gc.setup_thread()
- #
- def gc_thread_die():
- gc.teardown_thread()
- self.root_walker.free_shadow_stack()
- #
- #def start_transaction(gc):
- # self.root_walker.start_transaction()
- # gc.start_transaction()
- #
super(StmFrameworkGCTransformer, self)._declare_functions(
GCClass, getfn, s_gc, *args)
- self.thread_start_ptr = getfn(
- gc_thread_start,
- [], annmodel.s_None)
- self.thread_die_ptr = getfn(
- gc_thread_die,
- [], annmodel.s_None)
self.stm_writebarrier_ptr = getfn(
- gc.stm_writebarrier,
+ self.gcdata.gc.stm_writebarrier,
[annmodel.SomeAddress()], annmodel.SomeAddress())
self.stm_normalize_global_ptr = getfn(
- gc.stm_normalize_global,
+ self.gcdata.gc.stm_normalize_global,
[annmodel.SomeAddress()], annmodel.SomeAddress())
def build_root_walker(self):
@@ -55,24 +35,6 @@
resulttype=llmemory.Address)
hop.genop('adr_add', [v_gcdata_adr, c_ofs], resultvar=op.result)
- def gct_stm_thread_starting(self, hop):
- hop.genop("direct_call", [self.thread_starting_ptr, self.c_const_gc])
-
- def gct_stm_thread_stopping(self, hop):
- hop.genop("direct_call", [self.thread_stopping_ptr, self.c_const_gc])
-
- def gct_stm_enter_transactional_mode(self, hop):
- livevars = self.push_roots(hop)
- hop.genop("direct_call", [self.stm_enter_transactional_mode_ptr,
- self.c_const_gc])
- self.pop_roots(hop, livevars)
-
- def gct_stm_leave_transactional_mode(self, hop):
- livevars = self.push_roots(hop)
- hop.genop("direct_call", [self.stm_leave_transactional_mode_ptr,
- self.c_const_gc])
- self.pop_roots(hop, livevars)
-
def gct_stm_writebarrier(self, hop):
op = hop.spaceop
v_adr = hop.genop('cast_ptr_to_adr',
@@ -149,9 +111,22 @@
self.root_stack_depth = rsd
def need_thread_support(self, gctransformer, getfn):
- # we always have thread support, and it is handled
- # in _declare_functions() already
- pass
+ gc = gctransformer.gcdata.gc
+ #
+ def gc_thread_start():
+ self.allocate_shadow_stack()
+ gc.setup_thread()
+ #
+ def gc_thread_die():
+ gc.teardown_thread()
+ self.free_shadow_stack()
+ #
+ self.thread_start_ptr = getfn(
+ gc_thread_start,
+ [], annmodel.s_None)
+ self.thread_die_ptr = getfn(
+ gc_thread_die,
+ [], annmodel.s_None)
def setup_root_walker(self):
self.allocate_shadow_stack()
diff --git a/pypy/translator/stm/src_stm/core.c b/pypy/translator/stm/src_stm/core.c
--- a/pypy/translator/stm/src_stm/core.c
+++ b/pypy/translator/stm/src_stm/core.c
@@ -12,6 +12,7 @@
/*unsigned long last_known_global_timestamp;*/
owner_version_t my_lock_word;
struct OrecList reads;
+ int active; /* 0 = inactive, 1 = regular, 2 = inevitable */
unsigned num_commits;
unsigned num_aborts[ABORT_REASONS];
unsigned num_spinloops[SPINLOOP_REASONS];
@@ -61,6 +62,7 @@
unsigned int c;
int i;
struct tx_descriptor *d = thread_descriptor;
+ assert(d->active);
d->num_spinloops[num]++;
//printf("tx_spinloop(%d)\n", num);
@@ -80,7 +82,10 @@
static _Bool is_inevitable(struct tx_descriptor *d)
{
- return d->setjmp_buf == NULL;
+ /* Assert that we are running a transaction.
+ Returns True if this transaction is inevitable. */
+ assert(d->active == 1 + !d->setjmp_buf);
+ return d->active == 2;
}
/*** run the redo log to commit a transaction, and release the locks */
@@ -180,6 +185,7 @@
{
d->reads.size = 0;
redolog_clear(&d->redolog);
+ d->active = 0;
}
static void tx_cleanup(struct tx_descriptor *d)
@@ -201,7 +207,7 @@
static void tx_abort(int reason)
{
struct tx_descriptor *d = thread_descriptor;
- assert(!is_inevitable(d));
+ assert(d->active == 1);
d->num_aborts[reason]++;
#ifdef RPY_STM_DEBUG_PRINT
PYPY_DEBUG_START("stm-abort");
@@ -220,7 +226,7 @@
{
int i;
owner_version_t ovt;
- assert(!is_inevitable(d));
+ assert(d->active == 1);
for (i=0; i<d->reads.size; i++)
{
retry:
@@ -251,7 +257,7 @@
{
int i;
owner_version_t ovt;
- assert(!is_inevitable(d));
+ assert(d->active == 1);
for (i=0; i<d->reads.size; i++)
{
ovt = GETVERSION(d->reads.items[i]); // read this orec
@@ -443,10 +449,9 @@
STM_DO_READ(memcpy(dst, src, size));
}
-static void descriptor_init(void)
+long stm_descriptor_init(void)
{
- assert(thread_descriptor == NULL);
- if (1)
+ if (thread_descriptor == NULL)
{
struct tx_descriptor *d = malloc(sizeof(struct tx_descriptor));
memset(d, 0, sizeof(struct tx_descriptor));
@@ -469,13 +474,17 @@
(long)pthread_self(), (long)d->my_lock_word);
PYPY_DEBUG_STOP("stm-init");
#endif
+ return 1;
}
+ else
+ return 0; /* already initialized */
}
-static void descriptor_done(void)
+void stm_descriptor_done(void)
{
struct tx_descriptor *d = thread_descriptor;
assert(d != NULL);
+ assert(d->active == 0);
thread_descriptor = NULL;
@@ -516,6 +525,8 @@
static void begin_transaction(jmp_buf* buf)
{
struct tx_descriptor *d = thread_descriptor;
+ assert(d->active == 0);
+ d->active = 1;
d->setjmp_buf = buf;
d->start_time = (/*d->last_known_global_timestamp*/ global_timestamp) & ~1;
}
@@ -525,6 +536,8 @@
/* Equivalent to begin_transaction(); stm_try_inevitable();
except more efficient */
struct tx_descriptor *d = thread_descriptor;
+ assert(d->active == 0);
+ d->active = 2;
d->setjmp_buf = NULL;
while (1)
@@ -612,8 +625,10 @@
by another thread. We set the lowest bit in global_timestamp
to 1. */
struct tx_descriptor *d = thread_descriptor;
- if (is_inevitable(d))
- return; /* I am already inevitable */
+ if (d == NULL || d->active != 1)
+ return; /* I am already inevitable, or not in a transaction at all
+ (XXX statically we should know when we're outside
+ a transaction) */
#ifdef RPY_STM_DEBUG_PRINT
PYPY_DEBUG_START("stm-inevitable");
@@ -670,7 +685,6 @@
void stm_set_tls(void *newtls)
{
- descriptor_init();
rpython_tls_object = newtls;
}
@@ -681,7 +695,7 @@
void stm_del_tls(void)
{
- descriptor_done();
+ rpython_tls_object = NULL;
}
void *stm_tldict_lookup(void *key)
@@ -714,6 +728,18 @@
} REDOLOG_LOOP_END;
}
+long stm_in_transaction(void)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ return d->active;
+}
+
+long stm_is_inevitable(void)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ return is_inevitable(d);
+}
+
#undef GETVERSION
#undef GETVERSIONREF
#undef SETVERSION
diff --git a/pypy/translator/stm/src_stm/et.h b/pypy/translator/stm/src_stm/et.h
--- a/pypy/translator/stm/src_stm/et.h
+++ b/pypy/translator/stm/src_stm/et.h
@@ -21,9 +21,15 @@
void stm_tldict_add(void *, void *);
void stm_tldict_enum(void);
+long stm_descriptor_init(void);
+void stm_descriptor_done(void);
+
void stm_begin_inevitable_transaction(void);
void stm_commit_transaction(void);
+long stm_in_transaction(void);
+long stm_is_inevitable(void);
+
/* these functions are declared by generated C code from pypy.rlib.rstm
and from the GC (see llop.nop(...)) */
extern void pypy_g__stm_thread_starting(void);
diff --git a/pypy/translator/stm/stmgcintf.py b/pypy/translator/stm/stmgcintf.py
--- a/pypy/translator/stm/stmgcintf.py
+++ b/pypy/translator/stm/stmgcintf.py
@@ -48,6 +48,8 @@
# C part of the implementation of the pypy.rlib.rstm module
in_transaction = smexternal('stm_in_transaction', [], lltype.Signed)
is_inevitable = smexternal('stm_is_inevitable', [], lltype.Signed)
+ descriptor_init = smexternal('stm_descriptor_init', [], lltype.Signed)
+ descriptor_done = smexternal('stm_descriptor_done', [], lltype.Void)
begin_inevitable_transaction = smexternal(
'stm_begin_inevitable_transaction', [], lltype.Void)
commit_transaction = smexternal(
diff --git a/pypy/translator/stm/test/targetdemo2.py b/pypy/translator/stm/test/targetdemo2.py
--- a/pypy/translator/stm/test/targetdemo2.py
+++ b/pypy/translator/stm/test/targetdemo2.py
@@ -127,7 +127,8 @@
def setup_threads():
#space.threadlocals.setup_threads(space)
bootstrapper.setup()
- invoke_around_extcall(rstm.before_external_call, rstm.after_external_call)
+ invoke_around_extcall(rstm.before_external_call, rstm.after_external_call,
+ rstm.enter_callback_call, rstm.leave_callback_call)
def start_thread(args):
bootstrapper.acquire(args)
diff --git a/pypy/translator/stm/transform.py b/pypy/translator/stm/transform.py
--- a/pypy/translator/stm/transform.py
+++ b/pypy/translator/stm/transform.py
@@ -330,3 +330,6 @@
for link in block.exits:
link.args = [renames.get(v, v) for v in link.args]
block.operations = newoperations
+
+# XXX must repeat stm_writebarrier after doing something that can
+# go to the next transaction
More information about the pypy-commit
mailing list