[pypy-commit] pypy stm-thread-2: Implement abortinfo and lastabortinfo. This should give a way

arigo noreply at buildbot.pypy.org
Fri Feb 22 10:12:43 CET 2013


Author: Armin Rigo <arigo at tunes.org>
Branch: stm-thread-2
Changeset: r61587:63749585b64c
Date: 2013-02-21 22:15 +0100
http://bitbucket.org/pypy/pypy/changeset/63749585b64c/

Log:	Implement abortinfo and lastabortinfo. This should give a way for
	the RPython program to register its position, and in case of abort,
	to read the position of the last aborted transaction.

diff --git a/rpython/rlib/rstm.py b/rpython/rlib/rstm.py
--- a/rpython/rlib/rstm.py
+++ b/rpython/rlib/rstm.py
@@ -4,10 +4,11 @@
 from rpython.rlib.objectmodel import keepalive_until_here, specialize
 from rpython.rlib.objectmodel import we_are_translated
 from rpython.rlib.rposix import get_errno, set_errno
-from rpython.rtyper.lltypesystem import lltype, llmemory, rffi, rclass
+from rpython.rtyper.lltypesystem import lltype, llmemory, rffi, rclass, rstr
 from rpython.rtyper.lltypesystem.lloperation import llop
 from rpython.rtyper.annlowlevel import cast_instance_to_base_ptr, llhelper
 from rpython.rtyper.annlowlevel import cast_base_ptr_to_instance
+from rpython.rtyper.extregistry import ExtRegistryEntry
 
 
 def is_inevitable():
@@ -32,6 +33,22 @@
 def is_atomic():
     return stmgcintf.StmOperations.get_atomic()
 
+def abort_info_push(instance, fieldnames):
+    "Special-cased below."
+
+def abort_info_pop(count):
+    stmgcintf.StmOperations.abort_info_pop(count)
+
+def inspect_abort_info():
+    p = stmgcintf.StmOperations.inspect_abort_info()
+    if p:
+        return rffi.charp2str(p)
+    else:
+        return None
+
+def abort_and_retry():
+    stmgcintf.StmOperations.abort_and_retry()
+
 def before_external_call():
     if not is_atomic():
         e = get_errno()
@@ -106,6 +123,45 @@
 
 # ____________________________________________________________
 
+class AbortInfoPush(ExtRegistryEntry):
+    _about_ = abort_info_push
+
+    def compute_result_annotation(self, s_instance, s_fieldnames):
+        from rpython.annotator.model import SomeInstance
+        assert isinstance(s_instance, SomeInstance)
+        assert s_fieldnames.is_constant()
+        assert isinstance(s_fieldnames.const, tuple)  # tuple of names
+
+    def specialize_call(self, hop):
+        fieldnames = hop.args_s[1].const
+        lst = [len(fieldnames)]
+        v_instance = hop.inputarg(hop.args_r[0], arg=0)
+        STRUCT = v_instance.concretetype.TO
+        for fieldname in fieldnames:
+            fieldname = 'inst_' + fieldname
+            TYPE = getattr(STRUCT, fieldname) #xxx check also in parent structs
+            if TYPE == lltype.Signed:
+                kind = 0
+            elif TYPE == lltype.Unsigned:
+                kind = 1
+            elif TYPE == lltype.Ptr(rstr.STR):
+                kind = 2
+            else:
+                raise NotImplementedError(
+                    "abort_info_push(%s, %r): field of type %r"
+                    % (STRUCT.__name__, fieldname, TYPE))
+            lst.append(kind)
+            lst.append(llmemory.offsetof(STRUCT, fieldname))
+        ARRAY = rffi.CArray(lltype.Signed)
+        array = lltype.malloc(ARRAY, len(lst), flavor='raw', immortal=True)
+        for i in range(len(lst)):
+            array[i] = lst[i]
+        hop.exception_cannot_occur()
+        c_array = hop.inputconst(lltype.Ptr(ARRAY), array)
+        hop.genop('stm_abort_info_push', [v_instance, c_array])
+
+# ____________________________________________________________
+
 class ThreadLocalReference(object):
     _COUNT = 0
 
diff --git a/rpython/rtyper/lltypesystem/lloperation.py b/rpython/rtyper/lltypesystem/lloperation.py
--- a/rpython/rtyper/lltypesystem/lloperation.py
+++ b/rpython/rtyper/lltypesystem/lloperation.py
@@ -431,9 +431,12 @@
     #'stm_jit_invoke_code':    LLOp(canmallocgc=True),
     'stm_threadlocalref_get': LLOp(sideeffects=False),
     'stm_threadlocalref_set': LLOp(),
-    'stm_threadlocalref_llset': LLOp(),
+    'stm_threadlocalref_llset':   LLOp(),
     'stm_threadlocalref_llcount': LLOp(sideeffects=False),
     'stm_threadlocalref_lladdr':  LLOp(sideeffects=False),
+    'stm_abort_info_push':    LLOp(),
+    'stm_extraref_llcount':   LLOp(sideeffects=False),
+    'stm_extraref_lladdr':    LLOp(sideeffects=False),
 
     # __________ address operations __________
 
diff --git a/rpython/rtyper/memory/gc/stmtls.py b/rpython/rtyper/memory/gc/stmtls.py
--- a/rpython/rtyper/memory/gc/stmtls.py
+++ b/rpython/rtyper/memory/gc/stmtls.py
@@ -203,8 +203,9 @@
         # Find the roots that are living in raw structures.
         self.collect_from_raw_structures()
         #
-        # Find the roots in the THREADLOCALREF structure.
-        self.collect_from_threadlocalref()
+        # Find the roots in the THREADLOCALREF structure, and
+        # the other extra roots hold by C code
+        self.collect_from_threadlocalref_and_misc()
         #
         # Also find the roots that are the local copy of global objects.
         self.collect_roots_from_tldict()
@@ -358,7 +359,7 @@
         self.gc.root_walker.walk_current_nongc_roots(
             StmGCTLS._trace_drag_out1, self)
 
-    def collect_from_threadlocalref(self):
+    def collect_from_threadlocalref_and_misc(self):
         if not we_are_translated():
             return
         i = llop.stm_threadlocalref_llcount(lltype.Signed)
@@ -367,6 +368,12 @@
             root = llop.stm_threadlocalref_lladdr(llmemory.Address, i)
             if self.gc.points_to_valid_gc_object(root):
                 self._trace_drag_out(root, None)
+        i = llop.stm_extraref_llcount(lltype.Signed)
+        while i > 0:
+            i -= 1
+            root = llop.stm_extraref_lladdr(llmemory.Address, i)
+            if self.gc.points_to_valid_gc_object(root):
+                self._trace_drag_out(root, None)
 
     def trace_and_drag_out_of_nursery(self, obj):
         # This is called to fix the references inside 'obj', to ensure that
diff --git a/rpython/translator/c/funcgen.py b/rpython/translator/c/funcgen.py
--- a/rpython/translator/c/funcgen.py
+++ b/rpython/translator/c/funcgen.py
@@ -590,6 +590,9 @@
     OP_STM_BECOME_INEVITABLE = _OP_STM
     OP_STM_BARRIER = _OP_STM
     OP_STM_PTR_EQ = _OP_STM
+    OP_STM_ABORT_INFO_PUSH = _OP_STM
+    OP_STM_EXTRAREF_LLCOUNT = _OP_STM
+    OP_STM_EXTRAREF_LLADDR = _OP_STM
 
 
     def OP_PTR_NONZERO(self, op):
diff --git a/rpython/translator/stm/funcgen.py b/rpython/translator/stm/funcgen.py
--- a/rpython/translator/stm/funcgen.py
+++ b/rpython/translator/stm/funcgen.py
@@ -38,6 +38,20 @@
 ##def stm_jit_invoke_code(funcgen, op):
 ##    return funcgen.OP_DIRECT_CALL(op)
 
+def stm_abort_info_push(funcgen, op):
+    arg0 = funcgen.expr(op.args[0])
+    arg1 = funcgen.expr(op.args[1])
+    return 'stm_abort_info_push(%s, %s);' % (arg0, arg1)
+
+def stm_extraref_llcount(funcgen, op):
+    result = funcgen.expr(op.result)
+    return '%s = stm_extraref_llcount();' % (result,)
+
+def stm_extraref_lladdr(funcgen, op):
+    arg0 = funcgen.expr(op.args[0])
+    result = funcgen.expr(op.result)
+    return '%s = stm_extraref_lladdr(%s);' % (result, arg0)
+
 def _stm_nogc_init_function():
     """Called at process start-up when running with no GC."""
     StmOperations.descriptor_init()
diff --git a/rpython/translator/stm/src_stm/et.c b/rpython/translator/stm/src_stm/et.c
--- a/rpython/translator/stm/src_stm/et.c
+++ b/rpython/translator/stm/src_stm/et.c
@@ -55,6 +55,8 @@
   struct GcPtrList gcroots;
   struct G2L global_to_local;
   struct GcPtrList undolog;
+  struct GcPtrList abortinfo;
+  char *lastabortinfo;
   struct FXCache recent_reads_cache;
 };
 
@@ -368,10 +370,13 @@
   spinloop();
 }
 
+size_t _stm_decode_abort_info(struct tx_descriptor *d, char *output);
+
 static void AbortTransaction(int num)
 {
   struct tx_descriptor *d = thread_descriptor;
   unsigned long limit;
+  size_t size;
   assert(d->active);
   assert(!is_inevitable(d));
   assert(num < ABORT_REASONS);
@@ -379,6 +384,16 @@
 
   CancelLocks(d);
 
+  /* decode the 'abortinfo' and produce a human-readable summary in
+     the string 'lastabortinfo' */
+  size = _stm_decode_abort_info(d, NULL);
+  free(d->lastabortinfo);
+  d->lastabortinfo = malloc(size);
+  if (d->lastabortinfo != NULL)
+    _stm_decode_abort_info(d, d->lastabortinfo);
+
+  /* run the undo log in reverse order, cancelling the values set by
+     stm_ThreadLocalRef_LLSet(). */
   if (d->undolog.size > 0) {
       gcptr *item = d->undolog.items;
       long i;
@@ -439,6 +454,7 @@
   d->count_reads = 0;
   fxcache_clear(&d->recent_reads_cache);
   gcptrlist_clear(&d->undolog);
+  gcptrlist_clear(&d->abortinfo);
 }
 
 void BeginTransaction(jmp_buf* buf)
diff --git a/rpython/translator/stm/src_stm/et.h b/rpython/translator/stm/src_stm/et.h
--- a/rpython/translator/stm/src_stm/et.h
+++ b/rpython/translator/stm/src_stm/et.h
@@ -122,6 +122,11 @@
 void stm_perform_transaction(long(*callback)(void*, long), void *arg,
                              void *save_and_restore);
 void stm_abort_and_retry(void);
+void stm_abort_info_push(void *, void *);
+void stm_abort_info_pop(long);
+char *stm_inspect_abort_info(void);
+long stm_extraref_llcount(void);
+gcptr stm_extraref_lladdr(long);
 
 #ifdef USING_NO_GC_AT_ALL
 # define OP_GC_ADR_OF_ROOT_STACK_TOP(r)   r = NULL
diff --git a/rpython/translator/stm/src_stm/lists.c b/rpython/translator/stm/src_stm/lists.c
--- a/rpython/translator/stm/src_stm/lists.c
+++ b/rpython/translator/stm/src_stm/lists.c
@@ -254,6 +254,11 @@
   gcptrlist->size = i + 2;
 }
 
+static void gcptrlist_reduce_size(struct GcPtrList *gcptrlist, long newsize)
+{
+  gcptrlist->size = newsize;
+}
+
 /************************************************************/
 
 /* The fxcache_xx functions implement a fixed-size set of gcptr's.
diff --git a/rpython/translator/stm/src_stm/rpyintf.c b/rpython/translator/stm/src_stm/rpyintf.c
--- a/rpython/translator/stm/src_stm/rpyintf.c
+++ b/rpython/translator/stm/src_stm/rpyintf.c
@@ -196,6 +196,83 @@
   AbortTransaction(4);    /* manual abort */
 }
 
+void stm_abort_info_push(void *obj, void *fieldoffsets)
+{
+  struct tx_descriptor *d = thread_descriptor;
+  gcptrlist_insert2(&d->abortinfo, (gcptr)obj, (gcptr)fieldoffsets);
+}
+
+void stm_abort_info_pop(long count)
+{
+  struct tx_descriptor *d = thread_descriptor;
+  long newsize = d->abortinfo.size - 2 * count;
+  gcptrlist_reduce_size(&d->abortinfo, newsize < 0 ? 0 : newsize);
+}
+
+size_t _stm_decode_abort_info(struct tx_descriptor *d, char *output)
+{
+    size_t totalsize = 0;
+    long i;
+#define WRITE(c)   { totalsize++; if (output) *output++=(c); }
+    for (i=0; i<d->abortinfo.size; i+=2) {
+        char *object       = (char*)d->abortinfo.items[i+0];
+        long *fieldoffsets = (long*)d->abortinfo.items[i+1];
+        long j;
+        for (j=0; j<fieldoffsets[0]; j++) {
+            long kind   = fieldoffsets[1+2*j+0];
+            long offset = fieldoffsets[1+2*j+1];
+            char buffer[24];
+            char *result = buffer;
+            size_t res_size;
+            RPyString *rps;
+            switch(kind) {
+            case 0:    /* signed */
+                res_size = sprintf(buffer, "%ld", *(long*)(object + offset));
+                break;
+            case 1:    /* unsigned */
+                res_size = sprintf(buffer, "%lu",
+                                   *(unsigned long*)(object + offset));
+                break;
+            case 2:    /* pointer to STR */
+                rps = *(RPyString **)(object + offset);
+                res_size = RPyString_Size(rps);
+                result = _RPyString_AsString(rps);
+                break;
+            default:
+                fprintf(stderr, "Fatal RPython error: corrupted abort log\n");
+                abort();
+            }
+            while (res_size > 0) {
+                WRITE(*result);
+                result++;
+                res_size--;
+            }
+            WRITE('\n');
+        }
+    }
+    WRITE('\0');   /* final null character */
+#undef WRITE
+    return totalsize;
+}
+
+char *stm_inspect_abort_info(void)
+{
+    struct tx_descriptor *d = thread_descriptor;
+    return d->lastabortinfo;
+}
+
+long stm_extraref_llcount(void)
+{
+    struct tx_descriptor *d = thread_descriptor;
+    return d->abortinfo.size / 2;
+}
+
+gcptr stm_extraref_lladdr(long index)
+{
+    struct tx_descriptor *d = thread_descriptor;
+    return d->abortinfo.items[index * 2];
+}
+
 #ifdef USING_NO_GC_AT_ALL
 static __thread gcptr stm_nogc_chained_list;
 void stm_nogc_start_transaction(void)
diff --git a/rpython/translator/stm/stmgcintf.py b/rpython/translator/stm/stmgcintf.py
--- a/rpython/translator/stm/stmgcintf.py
+++ b/rpython/translator/stm/stmgcintf.py
@@ -68,5 +68,10 @@
     set_transaction_length = smexternal('stm_set_transaction_length',
                                         [lltype.Signed], lltype.Void)
 
+    abort_info_pop = smexternal('stm_abort_info_pop',
+                                [lltype.Signed], lltype.Void)
+    inspect_abort_info = smexternal('stm_inspect_abort_info',
+                                    [], rffi.CCHARP)
+
     # for testing
-    abort_and_retry  = smexternal('stm_abort_and_retry', [], lltype.Void)
+    abort_and_retry = smexternal('stm_abort_and_retry', [], lltype.Void)
diff --git a/rpython/translator/stm/test/test_ztranslated.py b/rpython/translator/stm/test/test_ztranslated.py
--- a/rpython/translator/stm/test/test_ztranslated.py
+++ b/rpython/translator/stm/test/test_ztranslated.py
@@ -128,3 +128,32 @@
         t, cbuilder = self.compile(main)
         data = cbuilder.cmdexec('')
         assert 'ok\n' in data
+
+    def test_abort_info(self):
+        from rpython.rtyper.lltypesystem.rclass import OBJECTPTR
+
+        class Foobar(object):
+            pass
+        globf = Foobar()
+
+        def check(_, retry_counter):
+            globf.xy = 100 + retry_counter
+            rstm.abort_info_push(globf, ('xy', 'yx'))
+            if retry_counter < 3:
+                rstm.abort_and_retry()
+            #
+            print rstm.inspect_abort_info()
+            #
+            rstm.abort_info_pop(2)
+            return 0
+
+        PS = lltype.Ptr(lltype.GcStruct('S', ('got_exception', OBJECTPTR)))
+        perform_transaction = rstm.make_perform_transaction(check, PS)
+
+        def main(argv):
+            globf.yx = 'hi there %d' % len(argv)
+            perform_transaction(lltype.nullptr(PS.TO))
+            return 0
+        t, cbuilder = self.compile(main)
+        data = cbuilder.cmdexec('a b')
+        assert '102\nhi there 3\n\n' in data


More information about the pypy-commit mailing list