[pypy-commit] pypy stm-thread-2: Implement abortinfo and lastabortinfo. This should give a way for the RPython program to register its position.
arigo
noreply at buildbot.pypy.org
Fri Feb 22 10:12:43 CET 2013
Author: Armin Rigo <arigo at tunes.org>
Branch: stm-thread-2
Changeset: r61587:63749585b64c
Date: 2013-02-21 22:15 +0100
http://bitbucket.org/pypy/pypy/changeset/63749585b64c/
Log: Implement abortinfo and lastabortinfo. This should give a way for
the RPython program to register its position, and in case of abort,
to read the position of the last aborted transaction.
diff --git a/rpython/rlib/rstm.py b/rpython/rlib/rstm.py
--- a/rpython/rlib/rstm.py
+++ b/rpython/rlib/rstm.py
@@ -4,10 +4,11 @@
from rpython.rlib.objectmodel import keepalive_until_here, specialize
from rpython.rlib.objectmodel import we_are_translated
from rpython.rlib.rposix import get_errno, set_errno
-from rpython.rtyper.lltypesystem import lltype, llmemory, rffi, rclass
+from rpython.rtyper.lltypesystem import lltype, llmemory, rffi, rclass, rstr
from rpython.rtyper.lltypesystem.lloperation import llop
from rpython.rtyper.annlowlevel import cast_instance_to_base_ptr, llhelper
from rpython.rtyper.annlowlevel import cast_base_ptr_to_instance
+from rpython.rtyper.extregistry import ExtRegistryEntry
def is_inevitable():
@@ -32,6 +33,22 @@
def is_atomic():
return stmgcintf.StmOperations.get_atomic()
+def abort_info_push(instance, fieldnames):
+ "Special-cased below."
+
+def abort_info_pop(count):
+ stmgcintf.StmOperations.abort_info_pop(count)
+
+def inspect_abort_info():
+ p = stmgcintf.StmOperations.inspect_abort_info()
+ if p:
+ return rffi.charp2str(p)
+ else:
+ return None
+
+def abort_and_retry():
+ stmgcintf.StmOperations.abort_and_retry()
+
def before_external_call():
if not is_atomic():
e = get_errno()
@@ -106,6 +123,45 @@
# ____________________________________________________________
+class AbortInfoPush(ExtRegistryEntry):
+ _about_ = abort_info_push
+
+ def compute_result_annotation(self, s_instance, s_fieldnames):
+ from rpython.annotator.model import SomeInstance
+ assert isinstance(s_instance, SomeInstance)
+ assert s_fieldnames.is_constant()
+ assert isinstance(s_fieldnames.const, tuple) # tuple of names
+
+ def specialize_call(self, hop):
+ fieldnames = hop.args_s[1].const
+ lst = [len(fieldnames)]
+ v_instance = hop.inputarg(hop.args_r[0], arg=0)
+ STRUCT = v_instance.concretetype.TO
+ for fieldname in fieldnames:
+ fieldname = 'inst_' + fieldname
+ TYPE = getattr(STRUCT, fieldname) #xxx check also in parent structs
+ if TYPE == lltype.Signed:
+ kind = 0
+ elif TYPE == lltype.Unsigned:
+ kind = 1
+ elif TYPE == lltype.Ptr(rstr.STR):
+ kind = 2
+ else:
+ raise NotImplementedError(
+ "abort_info_push(%s, %r): field of type %r"
+ % (STRUCT.__name__, fieldname, TYPE))
+ lst.append(kind)
+ lst.append(llmemory.offsetof(STRUCT, fieldname))
+ ARRAY = rffi.CArray(lltype.Signed)
+ array = lltype.malloc(ARRAY, len(lst), flavor='raw', immortal=True)
+ for i in range(len(lst)):
+ array[i] = lst[i]
+ hop.exception_cannot_occur()
+ c_array = hop.inputconst(lltype.Ptr(ARRAY), array)
+ hop.genop('stm_abort_info_push', [v_instance, c_array])
+
+# ____________________________________________________________
+
class ThreadLocalReference(object):
_COUNT = 0
diff --git a/rpython/rtyper/lltypesystem/lloperation.py b/rpython/rtyper/lltypesystem/lloperation.py
--- a/rpython/rtyper/lltypesystem/lloperation.py
+++ b/rpython/rtyper/lltypesystem/lloperation.py
@@ -431,9 +431,12 @@
#'stm_jit_invoke_code': LLOp(canmallocgc=True),
'stm_threadlocalref_get': LLOp(sideeffects=False),
'stm_threadlocalref_set': LLOp(),
- 'stm_threadlocalref_llset': LLOp(),
+ 'stm_threadlocalref_llset': LLOp(),
'stm_threadlocalref_llcount': LLOp(sideeffects=False),
'stm_threadlocalref_lladdr': LLOp(sideeffects=False),
+ 'stm_abort_info_push': LLOp(),
+ 'stm_extraref_llcount': LLOp(sideeffects=False),
+ 'stm_extraref_lladdr': LLOp(sideeffects=False),
# __________ address operations __________
diff --git a/rpython/rtyper/memory/gc/stmtls.py b/rpython/rtyper/memory/gc/stmtls.py
--- a/rpython/rtyper/memory/gc/stmtls.py
+++ b/rpython/rtyper/memory/gc/stmtls.py
@@ -203,8 +203,9 @@
# Find the roots that are living in raw structures.
self.collect_from_raw_structures()
#
- # Find the roots in the THREADLOCALREF structure.
- self.collect_from_threadlocalref()
+ # Find the roots in the THREADLOCALREF structure, and
+ # the other extra roots held by C code
+ self.collect_from_threadlocalref_and_misc()
#
# Also find the roots that are the local copy of global objects.
self.collect_roots_from_tldict()
@@ -358,7 +359,7 @@
self.gc.root_walker.walk_current_nongc_roots(
StmGCTLS._trace_drag_out1, self)
- def collect_from_threadlocalref(self):
+ def collect_from_threadlocalref_and_misc(self):
if not we_are_translated():
return
i = llop.stm_threadlocalref_llcount(lltype.Signed)
@@ -367,6 +368,12 @@
root = llop.stm_threadlocalref_lladdr(llmemory.Address, i)
if self.gc.points_to_valid_gc_object(root):
self._trace_drag_out(root, None)
+ i = llop.stm_extraref_llcount(lltype.Signed)
+ while i > 0:
+ i -= 1
+ root = llop.stm_extraref_lladdr(llmemory.Address, i)
+ if self.gc.points_to_valid_gc_object(root):
+ self._trace_drag_out(root, None)
def trace_and_drag_out_of_nursery(self, obj):
# This is called to fix the references inside 'obj', to ensure that
diff --git a/rpython/translator/c/funcgen.py b/rpython/translator/c/funcgen.py
--- a/rpython/translator/c/funcgen.py
+++ b/rpython/translator/c/funcgen.py
@@ -590,6 +590,9 @@
OP_STM_BECOME_INEVITABLE = _OP_STM
OP_STM_BARRIER = _OP_STM
OP_STM_PTR_EQ = _OP_STM
+ OP_STM_ABORT_INFO_PUSH = _OP_STM
+ OP_STM_EXTRAREF_LLCOUNT = _OP_STM
+ OP_STM_EXTRAREF_LLADDR = _OP_STM
def OP_PTR_NONZERO(self, op):
diff --git a/rpython/translator/stm/funcgen.py b/rpython/translator/stm/funcgen.py
--- a/rpython/translator/stm/funcgen.py
+++ b/rpython/translator/stm/funcgen.py
@@ -38,6 +38,20 @@
##def stm_jit_invoke_code(funcgen, op):
## return funcgen.OP_DIRECT_CALL(op)
+def stm_abort_info_push(funcgen, op):
+ arg0 = funcgen.expr(op.args[0])
+ arg1 = funcgen.expr(op.args[1])
+ return 'stm_abort_info_push(%s, %s);' % (arg0, arg1)
+
+def stm_extraref_llcount(funcgen, op):
+ result = funcgen.expr(op.result)
+ return '%s = stm_extraref_llcount();' % (result,)
+
+def stm_extraref_lladdr(funcgen, op):
+ arg0 = funcgen.expr(op.args[0])
+ result = funcgen.expr(op.result)
+ return '%s = stm_extraref_lladdr(%s);' % (result, arg0)
+
def _stm_nogc_init_function():
"""Called at process start-up when running with no GC."""
StmOperations.descriptor_init()
diff --git a/rpython/translator/stm/src_stm/et.c b/rpython/translator/stm/src_stm/et.c
--- a/rpython/translator/stm/src_stm/et.c
+++ b/rpython/translator/stm/src_stm/et.c
@@ -55,6 +55,8 @@
struct GcPtrList gcroots;
struct G2L global_to_local;
struct GcPtrList undolog;
+ struct GcPtrList abortinfo;
+ char *lastabortinfo;
struct FXCache recent_reads_cache;
};
@@ -368,10 +370,13 @@
spinloop();
}
+size_t _stm_decode_abort_info(struct tx_descriptor *d, char *output);
+
static void AbortTransaction(int num)
{
struct tx_descriptor *d = thread_descriptor;
unsigned long limit;
+ size_t size;
assert(d->active);
assert(!is_inevitable(d));
assert(num < ABORT_REASONS);
@@ -379,6 +384,16 @@
CancelLocks(d);
+ /* decode the 'abortinfo' and produce a human-readable summary in
+ the string 'lastabortinfo' */
+ size = _stm_decode_abort_info(d, NULL);
+ free(d->lastabortinfo);
+ d->lastabortinfo = malloc(size);
+ if (d->lastabortinfo != NULL)
+ _stm_decode_abort_info(d, d->lastabortinfo);
+
+ /* run the undo log in reverse order, cancelling the values set by
+ stm_ThreadLocalRef_LLSet(). */
if (d->undolog.size > 0) {
gcptr *item = d->undolog.items;
long i;
@@ -439,6 +454,7 @@
d->count_reads = 0;
fxcache_clear(&d->recent_reads_cache);
gcptrlist_clear(&d->undolog);
+ gcptrlist_clear(&d->abortinfo);
}
void BeginTransaction(jmp_buf* buf)
diff --git a/rpython/translator/stm/src_stm/et.h b/rpython/translator/stm/src_stm/et.h
--- a/rpython/translator/stm/src_stm/et.h
+++ b/rpython/translator/stm/src_stm/et.h
@@ -122,6 +122,11 @@
void stm_perform_transaction(long(*callback)(void*, long), void *arg,
void *save_and_restore);
void stm_abort_and_retry(void);
+void stm_abort_info_push(void *, void *);
+void stm_abort_info_pop(long);
+char *stm_inspect_abort_info(void);
+long stm_extraref_llcount(void);
+gcptr stm_extraref_lladdr(long);
#ifdef USING_NO_GC_AT_ALL
# define OP_GC_ADR_OF_ROOT_STACK_TOP(r) r = NULL
diff --git a/rpython/translator/stm/src_stm/lists.c b/rpython/translator/stm/src_stm/lists.c
--- a/rpython/translator/stm/src_stm/lists.c
+++ b/rpython/translator/stm/src_stm/lists.c
@@ -254,6 +254,11 @@
gcptrlist->size = i + 2;
}
+static void gcptrlist_reduce_size(struct GcPtrList *gcptrlist, long newsize)
+{
+ gcptrlist->size = newsize;
+}
+
/************************************************************/
/* The fxcache_xx functions implement a fixed-size set of gcptr's.
diff --git a/rpython/translator/stm/src_stm/rpyintf.c b/rpython/translator/stm/src_stm/rpyintf.c
--- a/rpython/translator/stm/src_stm/rpyintf.c
+++ b/rpython/translator/stm/src_stm/rpyintf.c
@@ -196,6 +196,83 @@
AbortTransaction(4); /* manual abort */
}
+void stm_abort_info_push(void *obj, void *fieldoffsets)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ gcptrlist_insert2(&d->abortinfo, (gcptr)obj, (gcptr)fieldoffsets);
+}
+
+void stm_abort_info_pop(long count)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ long newsize = d->abortinfo.size - 2 * count;
+ gcptrlist_reduce_size(&d->abortinfo, newsize < 0 ? 0 : newsize);
+}
+
+size_t _stm_decode_abort_info(struct tx_descriptor *d, char *output)
+{
+ size_t totalsize = 0;
+ long i;
+#define WRITE(c) { totalsize++; if (output) *output++=(c); }
+ for (i=0; i<d->abortinfo.size; i+=2) {
+ char *object = (char*)d->abortinfo.items[i+0];
+ long *fieldoffsets = (long*)d->abortinfo.items[i+1];
+ long j;
+ for (j=0; j<fieldoffsets[0]; j++) {
+ long kind = fieldoffsets[1+2*j+0];
+ long offset = fieldoffsets[1+2*j+1];
+ char buffer[24];
+ char *result = buffer;
+ size_t res_size;
+ RPyString *rps;
+ switch(kind) {
+ case 0: /* signed */
+ res_size = sprintf(buffer, "%ld", *(long*)(object + offset));
+ break;
+ case 1: /* unsigned */
+ res_size = sprintf(buffer, "%lu",
+ *(unsigned long*)(object + offset));
+ break;
+ case 2: /* pointer to STR */
+ rps = *(RPyString **)(object + offset);
+ res_size = RPyString_Size(rps);
+ result = _RPyString_AsString(rps);
+ break;
+ default:
+ fprintf(stderr, "Fatal RPython error: corrupted abort log\n");
+ abort();
+ }
+ while (res_size > 0) {
+ WRITE(*result);
+ result++;
+ res_size--;
+ }
+ WRITE('\n');
+ }
+ }
+ WRITE('\0'); /* final null character */
+#undef WRITE
+ return totalsize;
+}
+
+char *stm_inspect_abort_info(void)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ return d->lastabortinfo;
+}
+
+long stm_extraref_llcount(void)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ return d->abortinfo.size / 2;
+}
+
+gcptr stm_extraref_lladdr(long index)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ return d->abortinfo.items[index * 2];
+}
+
#ifdef USING_NO_GC_AT_ALL
static __thread gcptr stm_nogc_chained_list;
void stm_nogc_start_transaction(void)
diff --git a/rpython/translator/stm/stmgcintf.py b/rpython/translator/stm/stmgcintf.py
--- a/rpython/translator/stm/stmgcintf.py
+++ b/rpython/translator/stm/stmgcintf.py
@@ -68,5 +68,10 @@
set_transaction_length = smexternal('stm_set_transaction_length',
[lltype.Signed], lltype.Void)
+ abort_info_pop = smexternal('stm_abort_info_pop',
+ [lltype.Signed], lltype.Void)
+ inspect_abort_info = smexternal('stm_inspect_abort_info',
+ [], rffi.CCHARP)
+
# for testing
- abort_and_retry = smexternal('stm_abort_and_retry', [], lltype.Void)
+ abort_and_retry = smexternal('stm_abort_and_retry', [], lltype.Void)
diff --git a/rpython/translator/stm/test/test_ztranslated.py b/rpython/translator/stm/test/test_ztranslated.py
--- a/rpython/translator/stm/test/test_ztranslated.py
+++ b/rpython/translator/stm/test/test_ztranslated.py
@@ -128,3 +128,32 @@
t, cbuilder = self.compile(main)
data = cbuilder.cmdexec('')
assert 'ok\n' in data
+
+ def test_abort_info(self):
+ from rpython.rtyper.lltypesystem.rclass import OBJECTPTR
+
+ class Foobar(object):
+ pass
+ globf = Foobar()
+
+ def check(_, retry_counter):
+ globf.xy = 100 + retry_counter
+ rstm.abort_info_push(globf, ('xy', 'yx'))
+ if retry_counter < 3:
+ rstm.abort_and_retry()
+ #
+ print rstm.inspect_abort_info()
+ #
+ rstm.abort_info_pop(2)
+ return 0
+
+ PS = lltype.Ptr(lltype.GcStruct('S', ('got_exception', OBJECTPTR)))
+ perform_transaction = rstm.make_perform_transaction(check, PS)
+
+ def main(argv):
+ globf.yx = 'hi there %d' % len(argv)
+ perform_transaction(lltype.nullptr(PS.TO))
+ return 0
+ t, cbuilder = self.compile(main)
+ data = cbuilder.cmdexec('a b')
+ assert '102\nhi there 3\n\n' in data
More information about the pypy-commit
mailing list