[pypy-commit] pypy stm-thread: In-progress

arigo noreply at buildbot.pypy.org
Sat May 5 17:21:55 CEST 2012


Author: Armin Rigo <arigo at tunes.org>
Branch: stm-thread
Changeset: r54900:ef0813096a29
Date: 2012-05-05 17:21 +0200
http://bitbucket.org/pypy/pypy/changeset/ef0813096a29/

Log:	In-progress

diff --git a/pypy/rlib/objectmodel.py b/pypy/rlib/objectmodel.py
--- a/pypy/rlib/objectmodel.py
+++ b/pypy/rlib/objectmodel.py
@@ -476,7 +476,8 @@
 def hlinvoke(repr, llcallable, *args):
     raise TypeError, "hlinvoke is meant to be rtyped and not called direclty"
 
-def invoke_around_extcall(before, after):
+def invoke_around_extcall(before, after,
+                          enter_callback=None, leave_callback=None):
     """Call before() before any external function call, and after() after.
     At the moment only one pair before()/after() can be registered at a time.
     """
@@ -490,6 +491,13 @@
     from pypy.rpython.annlowlevel import llhelper
     llhelper(rffi.AroundFnPtr, before)
     llhelper(rffi.AroundFnPtr, after)
+    # do the same thing about enter/leave_callback
+    if enter_callback is not None:
+        rffi.aroundstate.enter_callback = enter_callback
+        llhelper(rffi.EnterCallbackFnPtr, enter_callback)
+    if leave_callback is not None:
+        rffi.aroundstate.leave_callback = leave_callback
+        llhelper(rffi.LeaveCallbackFnPtr, leave_callback)
 
 def is_in_callback():
     from pypy.rpython.lltypesystem import rffi
diff --git a/pypy/rlib/rstm.py b/pypy/rlib/rstm.py
--- a/pypy/rlib/rstm.py
+++ b/pypy/rlib/rstm.py
@@ -1,4 +1,5 @@
 from pypy.translator.stm import stmgcintf
+from pypy.rlib.debug import ll_assert
 
 
 def before_external_call():
@@ -11,6 +12,20 @@
 after_external_call._gctransformer_hint_cannot_collect_ = True
 after_external_call._dont_reach_me_in_del_ = True
 
+def enter_callback_call():
+    new_thread = stmgcintf.StmOperations.descriptor_init()
+    stmgcintf.StmOperations.begin_inevitable_transaction()
+    return new_thread
+enter_callback_call._gctransformer_hint_cannot_collect_ = True
+enter_callback_call._dont_reach_me_in_del_ = True
+
+def leave_callback_call(token):
+    stmgcintf.StmOperations.commit_transaction()
+    if token == 1:
+        stmgcintf.StmOperations.descriptor_done()
+leave_callback_call._gctransformer_hint_cannot_collect_ = True
+leave_callback_call._dont_reach_me_in_del_ = True
+
 def do_yield_thread():
     stmgcintf.StmOperations.do_yield_thread()
 do_yield_thread._gctransformer_hint_close_stack_ = True
diff --git a/pypy/rpython/llinterp.py b/pypy/rpython/llinterp.py
--- a/pypy/rpython/llinterp.py
+++ b/pypy/rpython/llinterp.py
@@ -964,8 +964,6 @@
     op_stm_writebarrier = _stm_not_implemented
     op_stm_normalize_global = _stm_not_implemented
     op_stm_become_inevitable = _stm_not_implemented
-    op_stm_thread_starting = _stm_not_implemented
-    op_stm_thread_stopping = _stm_not_implemented
 
     # operations on pyobjects!
     for opname in lloperation.opimpls.keys():
diff --git a/pypy/rpython/lltypesystem/opimpl.py b/pypy/rpython/lltypesystem/opimpl.py
--- a/pypy/rpython/lltypesystem/opimpl.py
+++ b/pypy/rpython/lltypesystem/opimpl.py
@@ -629,12 +629,6 @@
 def op_stm_stop_transaction():
     pass
 
-def op_stm_enter_transactional_mode():
-    pass
-
-def op_stm_leave_transactional_mode():
-    pass
-
 def op_nop(x):
     pass
 
diff --git a/pypy/rpython/lltypesystem/rffi.py b/pypy/rpython/lltypesystem/rffi.py
--- a/pypy/rpython/lltypesystem/rffi.py
+++ b/pypy/rpython/lltypesystem/rffi.py
@@ -281,10 +281,12 @@
     source = py.code.Source(r"""
         def wrapper(%s):    # no *args - no GIL for mallocing the tuple
             llop.gc_stack_bottom(lltype.Void)   # marker for trackgcroot.py
+            token = 0
             if aroundstate is not None:
-                after = aroundstate.after
-                if after:
-                    after()
+                if aroundstate.enter_callback is not None:
+                    token = aroundstate.enter_callback()
+                elif aroundstate.after is not None:
+                    aroundstate.after()
             # from now on we hold the GIL
             stackcounter.stacks_counter += 1
             try:
@@ -299,9 +301,10 @@
                 result = errorcode
             stackcounter.stacks_counter -= 1
             if aroundstate is not None:
-                before = aroundstate.before
-                if before:
-                    before()
+                if aroundstate.leave_callback is not None:
+                    aroundstate.leave_callback(token)
+                elif aroundstate.before is not None:
+                    aroundstate.before()
             # here we don't hold the GIL any more. As in the wrapper() produced
             # by llexternal, it is essential that no exception checking occurs
             # after the call to before().
@@ -317,11 +320,15 @@
 _make_wrapper_for._annspecialcase_ = 'specialize:memo'
 
 AroundFnPtr = lltype.Ptr(lltype.FuncType([], lltype.Void))
+EnterCallbackFnPtr = lltype.Ptr(lltype.FuncType([], lltype.Signed))
+LeaveCallbackFnPtr = lltype.Ptr(lltype.FuncType([lltype.Signed], lltype.Void))
 class AroundState:
     _alloc_flavor_ = "raw"
     def _freeze_(self):
         self.before = None    # or a regular RPython function
         self.after = None     # or a regular RPython function
+        self.enter_callback = None
+        self.leave_callback = None
         return False
 aroundstate = AroundState()
 aroundstate._freeze_()
diff --git a/pypy/rpython/memory/gc/stmgc.py b/pypy/rpython/memory/gc/stmgc.py
--- a/pypy/rpython/memory/gc/stmgc.py
+++ b/pypy/rpython/memory/gc/stmgc.py
@@ -124,7 +124,6 @@
 GCFLAG_VISITED    = first_gcflag << 2
 GCFLAG_HAS_SHADOW = first_gcflag << 3
 GCFLAG_FIXED_HASH = first_gcflag << 4
-GCFLAG_PREBUILT   = first_gcflag << 5
 
 
 def always_inline(fn):
@@ -202,16 +201,30 @@
         #
         self.sharedarea.setup()
         #
+        self.stm_operations.descriptor_init()
+        self.stm_operations.begin_inevitable_transaction()
         self.setup_thread()
 
     def setup_thread(self):
+        """Build the StmGCTLS object and start a transaction at the level
+        of the GC.  The C-level transaction should already be started."""
+        ll_assert(self.stm_operations.in_transaction(),
+                  "setup_thread: not in a transaction")
         from pypy.rpython.memory.gc.stmtls import StmGCTLS
-        StmGCTLS(self).start_transaction()
+        stmtls = StmGCTLS(self)
+        stmtls.start_transaction()
 
     def teardown_thread(self):
-        self.stm_operations.try_inevitable()
+        """Stop the current transaction, commit it at the level of
+        C code, and tear down the StmGCTLS object.  For symmetry, this
+        ensures that the level of C has another (empty) transaction
+        started."""
+        ll_assert(bool(self.stm_operations.in_transaction()),
+                  "teardown_thread: not in a transaction")
         stmtls = self.get_tls()
         stmtls.stop_transaction()
+        self.stm_operations.commit_transaction()
+        self.stm_operations.begin_inevitable_transaction()
         stmtls.delete()
 
     @always_inline
@@ -287,7 +300,7 @@
         hdr.tid = self.combine(typeid16, flags)
 
     def init_gc_object_immortal(self, addr, typeid16, flags=0):
-        flags |= GCFLAG_GLOBAL | GCFLAG_PREBUILT
+        flags |= GCFLAG_GLOBAL
         self.init_gc_object(addr, typeid16, flags)
 
     # ----------
diff --git a/pypy/rpython/memory/gc/stmtls.py b/pypy/rpython/memory/gc/stmtls.py
--- a/pypy/rpython/memory/gc/stmtls.py
+++ b/pypy/rpython/memory/gc/stmtls.py
@@ -11,7 +11,6 @@
 from pypy.rpython.memory.gc.stmgc import always_inline, dont_inline
 from pypy.rpython.memory.gc.stmgc import GCFLAG_GLOBAL, GCFLAG_VISITED
 from pypy.rpython.memory.gc.stmgc import GCFLAG_WAS_COPIED, GCFLAG_HAS_SHADOW
-from pypy.rpython.memory.gc.stmgc import GCFLAG_PREBUILT
 
 
 class StmGCTLS(object):
diff --git a/pypy/rpython/memory/gctransform/stmframework.py b/pypy/rpython/memory/gctransform/stmframework.py
--- a/pypy/rpython/memory/gctransform/stmframework.py
+++ b/pypy/rpython/memory/gctransform/stmframework.py
@@ -12,33 +12,13 @@
 class StmFrameworkGCTransformer(FrameworkGCTransformer):
 
     def _declare_functions(self, GCClass, getfn, s_gc, *args):
-        gc = self.gcdata.gc
-        #
-        def gc_thread_start():
-            self.root_walker.allocate_shadow_stack()
-            gc.setup_thread()
-        #
-        def gc_thread_die():
-            gc.teardown_thread()
-            self.root_walker.free_shadow_stack()
-        #
-        #def start_transaction(gc):
-        #    self.root_walker.start_transaction()
-        #    gc.start_transaction()
-        #
         super(StmFrameworkGCTransformer, self)._declare_functions(
             GCClass, getfn, s_gc, *args)
-        self.thread_start_ptr = getfn(
-            gc_thread_start,
-            [], annmodel.s_None)
-        self.thread_die_ptr = getfn(
-            gc_thread_die,
-            [], annmodel.s_None)
         self.stm_writebarrier_ptr = getfn(
-            gc.stm_writebarrier,
+            self.gcdata.gc.stm_writebarrier,
             [annmodel.SomeAddress()], annmodel.SomeAddress())
         self.stm_normalize_global_ptr = getfn(
-            gc.stm_normalize_global,
+            self.gcdata.gc.stm_normalize_global,
             [annmodel.SomeAddress()], annmodel.SomeAddress())
 
     def build_root_walker(self):
@@ -55,24 +35,6 @@
                                  resulttype=llmemory.Address)
         hop.genop('adr_add', [v_gcdata_adr, c_ofs], resultvar=op.result)
 
-    def gct_stm_thread_starting(self, hop):
-        hop.genop("direct_call", [self.thread_starting_ptr, self.c_const_gc])
-
-    def gct_stm_thread_stopping(self, hop):
-        hop.genop("direct_call", [self.thread_stopping_ptr, self.c_const_gc])
-
-    def gct_stm_enter_transactional_mode(self, hop):
-        livevars = self.push_roots(hop)
-        hop.genop("direct_call", [self.stm_enter_transactional_mode_ptr,
-                                  self.c_const_gc])
-        self.pop_roots(hop, livevars)
-
-    def gct_stm_leave_transactional_mode(self, hop):
-        livevars = self.push_roots(hop)
-        hop.genop("direct_call", [self.stm_leave_transactional_mode_ptr,
-                                  self.c_const_gc])
-        self.pop_roots(hop, livevars)
-
     def gct_stm_writebarrier(self, hop):
         op = hop.spaceop
         v_adr = hop.genop('cast_ptr_to_adr',
@@ -149,9 +111,22 @@
             self.root_stack_depth = rsd
 
     def need_thread_support(self, gctransformer, getfn):
-        # we always have thread support, and it is handled
-        # in _declare_functions() already
-        pass
+        gc = gctransformer.gcdata.gc
+        #
+        def gc_thread_start():
+            self.allocate_shadow_stack()
+            gc.setup_thread()
+        #
+        def gc_thread_die():
+            gc.teardown_thread()
+            self.free_shadow_stack()
+        #
+        self.thread_start_ptr = getfn(
+            gc_thread_start,
+            [], annmodel.s_None)
+        self.thread_die_ptr = getfn(
+            gc_thread_die,
+            [], annmodel.s_None)
 
     def setup_root_walker(self):
         self.allocate_shadow_stack()
diff --git a/pypy/translator/stm/src_stm/core.c b/pypy/translator/stm/src_stm/core.c
--- a/pypy/translator/stm/src_stm/core.c
+++ b/pypy/translator/stm/src_stm/core.c
@@ -12,6 +12,7 @@
   /*unsigned long last_known_global_timestamp;*/
   owner_version_t my_lock_word;
   struct OrecList reads;
+  int active;    /* 0 = inactive, 1 = regular, 2 = inevitable */
   unsigned num_commits;
   unsigned num_aborts[ABORT_REASONS];
   unsigned num_spinloops[SPINLOOP_REASONS];
@@ -61,6 +62,7 @@
   unsigned int c;
   int i;
   struct tx_descriptor *d = thread_descriptor;
+  assert(d->active);
   d->num_spinloops[num]++;
 
   //printf("tx_spinloop(%d)\n", num);
@@ -80,7 +82,10 @@
 
 static _Bool is_inevitable(struct tx_descriptor *d)
 {
-  return d->setjmp_buf == NULL;
+  /* Assert that we are running a transaction.
+     Returns True if this transaction is inevitable. */
+  assert(d->active == 1 + !d->setjmp_buf);
+  return d->active == 2;
 }
 
 /*** run the redo log to commit a transaction, and release the locks */
@@ -180,6 +185,7 @@
 {
   d->reads.size = 0;
   redolog_clear(&d->redolog);
+  d->active = 0;
 }
 
 static void tx_cleanup(struct tx_descriptor *d)
@@ -201,7 +207,7 @@
 static void tx_abort(int reason)
 {
   struct tx_descriptor *d = thread_descriptor;
-  assert(!is_inevitable(d));
+  assert(d->active == 1);
   d->num_aborts[reason]++;
 #ifdef RPY_STM_DEBUG_PRINT
   PYPY_DEBUG_START("stm-abort");
@@ -220,7 +226,7 @@
 {
   int i;
   owner_version_t ovt;
-  assert(!is_inevitable(d));
+  assert(d->active == 1);
   for (i=0; i<d->reads.size; i++)
     {
     retry:
@@ -251,7 +257,7 @@
 {
   int i;
   owner_version_t ovt;
-  assert(!is_inevitable(d));
+  assert(d->active == 1);
   for (i=0; i<d->reads.size; i++)
     {
       ovt = GETVERSION(d->reads.items[i]);      // read this orec
@@ -443,10 +449,9 @@
   STM_DO_READ(memcpy(dst, src, size));
 }
 
-static void descriptor_init(void)
+long stm_descriptor_init(void)
 {
-  assert(thread_descriptor == NULL);
-  if (1)
+  if (thread_descriptor == NULL)
     {
       struct tx_descriptor *d = malloc(sizeof(struct tx_descriptor));
       memset(d, 0, sizeof(struct tx_descriptor));
@@ -469,13 +474,17 @@
                 (long)pthread_self(), (long)d->my_lock_word);
       PYPY_DEBUG_STOP("stm-init");
 #endif
+      return 1;
     }
+  else
+    return 0;   /* already initialized */
 }
 
-static void descriptor_done(void)
+void stm_descriptor_done(void)
 {
   struct tx_descriptor *d = thread_descriptor;
   assert(d != NULL);
+  assert(d->active == 0);
 
   thread_descriptor = NULL;
 
@@ -516,6 +525,8 @@
 static void begin_transaction(jmp_buf* buf)
 {
   struct tx_descriptor *d = thread_descriptor;
+  assert(d->active == 0);
+  d->active = 1;
   d->setjmp_buf = buf;
   d->start_time = (/*d->last_known_global_timestamp*/ global_timestamp) & ~1;
 }
@@ -525,6 +536,8 @@
   /* Equivalent to begin_transaction(); stm_try_inevitable();
      except more efficient */
   struct tx_descriptor *d = thread_descriptor;
+  assert(d->active == 0);
+  d->active = 2;
   d->setjmp_buf = NULL;
 
   while (1)
@@ -612,8 +625,10 @@
      by another thread.  We set the lowest bit in global_timestamp
      to 1. */
   struct tx_descriptor *d = thread_descriptor;
-  if (is_inevitable(d))
-    return;  /* I am already inevitable */
+  if (d == NULL || d->active != 1)
+    return;  /* I am already inevitable, or not in a transaction at all
+                (XXX statically we should know when we're outside
+                a transaction) */
 
 #ifdef RPY_STM_DEBUG_PRINT
   PYPY_DEBUG_START("stm-inevitable");
@@ -670,7 +685,6 @@
 
 void stm_set_tls(void *newtls)
 {
-  descriptor_init();
   rpython_tls_object = newtls;
 }
 
@@ -681,7 +695,7 @@
 
 void stm_del_tls(void)
 {
-  descriptor_done();
+  rpython_tls_object = NULL;
 }
 
 void *stm_tldict_lookup(void *key)
@@ -714,6 +728,18 @@
     } REDOLOG_LOOP_END;
 }
 
+long stm_in_transaction(void)
+{
+  struct tx_descriptor *d = thread_descriptor;
+  return d->active;
+}
+
+long stm_is_inevitable(void)
+{
+  struct tx_descriptor *d = thread_descriptor;
+  return is_inevitable(d);
+}
+
 #undef GETVERSION
 #undef GETVERSIONREF
 #undef SETVERSION
diff --git a/pypy/translator/stm/src_stm/et.h b/pypy/translator/stm/src_stm/et.h
--- a/pypy/translator/stm/src_stm/et.h
+++ b/pypy/translator/stm/src_stm/et.h
@@ -21,9 +21,15 @@
 void stm_tldict_add(void *, void *);
 void stm_tldict_enum(void);
 
+long stm_descriptor_init(void);
+void stm_descriptor_done(void);
+
 void stm_begin_inevitable_transaction(void);
 void stm_commit_transaction(void);
 
+long stm_in_transaction(void);
+long stm_is_inevitable(void);
+
 /* these functions are declared by generated C code from pypy.rlib.rstm
    and from the GC (see llop.nop(...)) */
 extern void pypy_g__stm_thread_starting(void);
diff --git a/pypy/translator/stm/stmgcintf.py b/pypy/translator/stm/stmgcintf.py
--- a/pypy/translator/stm/stmgcintf.py
+++ b/pypy/translator/stm/stmgcintf.py
@@ -48,6 +48,8 @@
     # C part of the implementation of the pypy.rlib.rstm module
     in_transaction = smexternal('stm_in_transaction', [], lltype.Signed)
     is_inevitable = smexternal('stm_is_inevitable', [], lltype.Signed)
+    descriptor_init = smexternal('stm_descriptor_init', [], lltype.Signed)
+    descriptor_done = smexternal('stm_descriptor_done', [], lltype.Void)
     begin_inevitable_transaction = smexternal(
         'stm_begin_inevitable_transaction', [], lltype.Void)
     commit_transaction = smexternal(
diff --git a/pypy/translator/stm/test/targetdemo2.py b/pypy/translator/stm/test/targetdemo2.py
--- a/pypy/translator/stm/test/targetdemo2.py
+++ b/pypy/translator/stm/test/targetdemo2.py
@@ -127,7 +127,8 @@
 def setup_threads():
     #space.threadlocals.setup_threads(space)
     bootstrapper.setup()
-    invoke_around_extcall(rstm.before_external_call, rstm.after_external_call)
+    invoke_around_extcall(rstm.before_external_call, rstm.after_external_call,
+                          rstm.enter_callback_call, rstm.leave_callback_call)
 
 def start_thread(args):
     bootstrapper.acquire(args)
diff --git a/pypy/translator/stm/transform.py b/pypy/translator/stm/transform.py
--- a/pypy/translator/stm/transform.py
+++ b/pypy/translator/stm/transform.py
@@ -330,3 +330,6 @@
             for link in block.exits:
                 link.args = [renames.get(v, v) for v in link.args]
         block.operations = newoperations
+
+# XXX must repeat stm_writebarrier after doing something that can
+# go to the next transaction


More information about the pypy-commit mailing list