[pypy-commit] pypy stmgc-c4: import stmgc with stop_all_other_threads() and partial commit for inevitable transactions

Raemi noreply at buildbot.pypy.org
Mon Nov 4 13:20:32 CET 2013


Author: Remi Meier <remi.meier at gmail.com>
Branch: stmgc-c4
Changeset: r67829:7384b3c8c0fc
Date: 2013-11-04 13:16 +0100
http://bitbucket.org/pypy/pypy/changeset/7384b3c8c0fc/

Log:	import stmgc with stop_all_other_threads() and partial commit for
	inevitable transactions

diff --git a/rpython/translator/stm/src_stm/et.c b/rpython/translator/stm/src_stm/et.c
--- a/rpython/translator/stm/src_stm/et.c
+++ b/rpython/translator/stm/src_stm/et.c
@@ -1072,11 +1072,12 @@
   return d->atomic;
 }
 
-static void init_transaction(struct tx_descriptor *d)
+static void init_transaction(struct tx_descriptor *d, int already_locked)
 {
   assert(d->atomic == 0);
   assert(*d->active_ref == 0);
-  stm_start_sharedlock();
+  if (!already_locked)
+    stm_start_sharedlock();
   assert(*d->active_ref == 0);
 
   if (clock_gettime(CLOCK_MONOTONIC, &d->start_real_time) < 0) {
@@ -1097,7 +1098,7 @@
 void stm_begin_transaction(void *buf, void (*longjmp_callback)(void *))
 {
   struct tx_descriptor *d = thread_descriptor;
-  init_transaction(d);
+  init_transaction(d, 0);
   *d->active_ref = 1;
   d->setjmp_buf = buf;
   d->longjmp_callback = longjmp_callback;
@@ -1426,13 +1427,14 @@
   dprintf(("private_from_protected: clear (abort)\n"));
 }
 
-void CommitTransaction(void)
+void CommitTransaction(int stay_inevitable)
 {   /* must save roots around this call */
   revision_t cur_time;
   struct tx_descriptor *d = thread_descriptor;
   assert(*d->active_ref >= 1);
   assert(d->atomic == 0);
-  dprintf(("CommitTransaction(%p)\n", d));
+  dprintf(("CommitTransaction(%d): %p\n", stay_inevitable, d));
+
   spinlock_acquire(d->public_descriptor->collection_lock, 'C');  /*committing*/
   if (d->public_descriptor->stolen_objects.size != 0)
     stm_normalize_stolen_objects(d);
@@ -1446,7 +1448,11 @@
         {
           stm_fatalerror("global_cur_time modified even though we are inev\n");
         }
-      inev_mutex_release();
+
+      if (!stay_inevitable) {
+        /* release the mutex; if staying inevitable, we simply keep it. */
+        inev_mutex_release();
+      }
     }
   else
     {
@@ -1504,7 +1510,8 @@
   spinlock_release(d->public_descriptor->collection_lock);
   d->num_commits++;
   *d->active_ref = 0;
-  stm_stop_sharedlock();
+  if (!stay_inevitable)
+    stm_stop_sharedlock();
 
   /* clear the list of callbacks that would have been called
      on abort */
@@ -1569,13 +1576,25 @@
   make_inevitable(d);    /* cannot abort any more */
 }
 
-void BeginInevitableTransaction(void)
+void BeginInevitableTransaction(int already_inevitable)
 {   /* must save roots around this call */
   struct tx_descriptor *d = thread_descriptor;
   revision_t cur_time;
 
-  init_transaction(d);
-  cur_time = acquire_inev_mutex_and_mark_global_cur_time(d);
+  init_transaction(d, already_inevitable);
+  
+  if (already_inevitable) {
+    cur_time = ACCESS_ONCE(global_cur_time);
+    assert((cur_time & 1) == 0);
+    if (!bool_cas(&global_cur_time, cur_time, cur_time + 1)) {
+      stm_fatalerror("there was a commit between a partial inevitable "
+                     "commit and the continuation of the transaction\n");
+    }
+  }
+  else {
+    cur_time = acquire_inev_mutex_and_mark_global_cur_time(d);
+  }
+
   d->start_time = cur_time;
   make_inevitable(d);
 }
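
For context, the two new integer arguments form a pair.  Below is a minimal
sketch of the intended handoff, assuming (as the code above suggests) that with
stay_inevitable=1 the shared lock and the inevitable mutex are kept across the
commit, so no other thread can commit in between and the bool_cas on
global_cur_time in BeginInevitableTransaction(1) cannot legitimately fail:

    /* Illustration only, not part of the changeset: commit while staying
       inevitable, then continue as a fresh inevitable transaction without
       ever releasing the inevitable mutex or the shared lock. */
    static void partial_commit_sketch(void)
    {   /* must save roots around this call */
        CommitTransaction(1);           /* commit, keep sharedlock + inev mutex */
        BeginInevitableTransaction(1);  /* only re-marks global_cur_time */
    }
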
diff --git a/rpython/translator/stm/src_stm/et.h b/rpython/translator/stm/src_stm/et.h
--- a/rpython/translator/stm/src_stm/et.h
+++ b/rpython/translator/stm/src_stm/et.h
@@ -124,7 +124,8 @@
 #define ABRT_VALIDATE_INEV        5
 #define ABRT_COLLECT_MINOR        6
 #define ABRT_COLLECT_MAJOR        7
-#define ABORT_REASONS         8
+#define ABRT_OTHER_THREADS        8
+#define ABORT_REASONS         9
 #define ABORT_NAMES      { "MANUAL",            \
                            "COMMIT",            \
                            "STOLEN_MODIFIED",   \
@@ -133,6 +134,7 @@
                            "VALIDATE_INEV",     \
                            "COLLECT_MINOR",     \
                            "COLLECT_MAJOR",     \
+                           "OTHER_THREADS",     \
                          }
 
 #define SPLP_ABORT                0
@@ -208,8 +210,8 @@
 /************************************************************/
 
 
-void BeginInevitableTransaction(void);  /* must save roots around this call */
-void CommitTransaction(void);           /* must save roots around this call */
+void BeginInevitableTransaction(int);  /* must save roots around this call */
+void CommitTransaction(int);           /* must save roots around this call */
 void BecomeInevitable(const char *why); /* must save roots around this call */
 void AbortTransaction(int);
 void AbortTransactionAfterCollect(struct tx_descriptor *, int);
diff --git a/rpython/translator/stm/src_stm/gcpage.c b/rpython/translator/stm/src_stm/gcpage.c
--- a/rpython/translator/stm/src_stm/gcpage.c
+++ b/rpython/translator/stm/src_stm/gcpage.c
@@ -1030,8 +1030,14 @@
     if (ACCESS_ONCE(countdown_next_major_coll) > 0)
         return;
 
-    stm_start_single_thread();
-
+    /* If we are already running in single-threaded mode (i.e. we are
+       the single thread), we must not try to enter it again.  This
+       can happen after entering the mode manually with
+       stm_stop_all_other_threads(). */
+    int single_threaded = (in_single_thread == thread_descriptor);
+    if (!single_threaded)
+        stm_start_single_thread();
+
     /* If several threads were blocked on the previous line, the first
        one to proceed sees 0 in 'countdown_next_major_coll'.  It's the
        thread that will do the major collection.  Afterwards the other
@@ -1040,7 +1046,8 @@
     if (countdown_next_major_coll == 0)
         major_collect();
 
-    stm_stop_single_thread();
+    if (!single_threaded)
+        stm_stop_single_thread();
 
     AbortNowIfDelayed();
 }
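
A note on the guard above: stm_start_single_thread() takes the writer side of
what appears (from the initializer in stmsync.c below) to be a non-recursive
rwlock, so re-entering it from the thread that already owns single-threaded
mode would presumably deadlock or fail; hence the in_single_thread check.
A condensed sketch of the idiom, using the same names as the diff:

    /* Only enter/leave single-threaded mode if this thread does not
       already own it (e.g. via an earlier stm_stop_all_other_threads()). */
    int single_threaded = (in_single_thread == thread_descriptor);
    if (!single_threaded)
        stm_start_single_thread();
    /* ... run the major collection ... */
    if (!single_threaded)
        stm_stop_single_thread();
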
diff --git a/rpython/translator/stm/src_stm/revision b/rpython/translator/stm/src_stm/revision
--- a/rpython/translator/stm/src_stm/revision
+++ b/rpython/translator/stm/src_stm/revision
@@ -1,1 +1,1 @@
-89a1de501060
+79aa5685d286
diff --git a/rpython/translator/stm/src_stm/stmgc.h b/rpython/translator/stm/src_stm/stmgc.h
--- a/rpython/translator/stm/src_stm/stmgc.h
+++ b/rpython/translator/stm/src_stm/stmgc.h
@@ -196,6 +196,14 @@
 /* only user currently is stm_allocate_public_integer_address() */
 void stm_register_integer_address(intptr_t);
 
+/* Enter single-threaded mode.  Used e.g. when patching assembler
+   code that must not be executed by another thread while it is
+   being patched.  More generally, this allows atomically updating
+   non-transactional memory.
+   These calls may collect! */
+void stm_stop_all_other_threads(void);
+void stm_partial_commit_and_resume_other_threads(void);
+
 /* macro functionality */
 
 extern __thread gcptr *stm_shadowstack;
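
To make the intended use concrete, here is a minimal sketch of a caller along
the lines of the comment above; patch_jump_target() is a hypothetical helper
standing in for whatever actually rewrites the machine code, not something
defined in this changeset:

    /* Hypothetical caller, e.g. a JIT backend patching machine code.
       Both stm_* calls may collect, so gc roots must be pushed first. */
    static void patch_jump_target(char *addr, char *new_target);    /* assumed */

    static void patch_code_single_threaded(char *addr, char *new_target)
    {
        stm_stop_all_other_threads();        /* become inevitable, stop the rest */
        patch_jump_target(addr, new_target); /* update non-transactional memory */
        stm_partial_commit_and_resume_other_threads();  /* commit, stay inevitable */
    }
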
diff --git a/rpython/translator/stm/src_stm/stmsync.c b/rpython/translator/stm/src_stm/stmsync.c
--- a/rpython/translator/stm/src_stm/stmsync.c
+++ b/rpython/translator/stm/src_stm/stmsync.c
@@ -96,7 +96,7 @@
         init_shadowstack();
         stmgcpage_release_global_lock();
     }
-    BeginInevitableTransaction();
+    BeginInevitableTransaction(0);
     return token;
 }
 
@@ -106,7 +106,7 @@
     if (token == 1)
         stmgc_minor_collect();   /* force everything out of the nursery */
 
-    CommitTransaction();
+    CommitTransaction(0);
 
     if (token == 1) {
         stmgcpage_acquire_global_lock();
@@ -141,7 +141,7 @@
     stm_push_root(END_MARKER_OFF);
 
     if (!thread_descriptor->atomic)
-        CommitTransaction();
+        CommitTransaction(0);
 
 #ifdef _GC_ON_CPYTHON
     volatile PyThreadState *v_ts = PyGILState_GetThisThreadState();
@@ -193,7 +193,7 @@
         assert(stm_shadowstack == v_saved_value + 2);
 
         if (!d->atomic)
-            CommitTransaction();
+            CommitTransaction(0);
 
         counter = 0;
     }
@@ -205,7 +205,7 @@
         }
     }
     else {
-        BeginInevitableTransaction();
+        BeginInevitableTransaction(0);
     }
 
     gcptr x = stm_pop_root();   /* pop the END_MARKER */
@@ -222,7 +222,7 @@
         stm_possible_safe_point();
     }
     else {
-        CommitTransaction();
+        CommitTransaction(0);
 
         unsigned long limit = d->reads_size_limit_nonatomic;
         if (limit != 0 && limit < (stm_regular_length_limit >> 1))
@@ -247,7 +247,7 @@
 {   /* must save roots around this call */
     struct tx_descriptor *d = thread_descriptor;
     if (!d->atomic)
-        CommitTransaction();
+        CommitTransaction(0);
     else
         BecomeInevitable("stm_commit_transaction but atomic");
 }
@@ -256,7 +256,7 @@
 {   /* must save roots around this call */
     struct tx_descriptor *d = thread_descriptor;
     if (!d->atomic)
-        BeginInevitableTransaction();
+        BeginInevitableTransaction(0);
 }
 
 void stm_become_inevitable(const char *reason)
@@ -279,7 +279,7 @@
 static pthread_rwlock_t rwlock_shared =
     PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP;
 
-static struct tx_descriptor *in_single_thread = NULL;  /* for debugging */
+struct tx_descriptor *in_single_thread = NULL;
 
 void stm_start_sharedlock(void)
 {
@@ -319,8 +319,45 @@
                        "pthread_rwlock_unlock failure\n");
 }
 
+
+void stm_stop_all_other_threads(void)
+{                               /* push gc roots! */
+    struct tx_descriptor *d;
+
+    BecomeInevitable("stop_all_other_threads");
+    stm_start_single_thread();
+    
+    for (d = stm_tx_head; d; d = d->tx_next) {
+        if (*d->active_ref == 1)   /* d != thread_descriptor is implied: we are inevitable (active_ref == 2) */
+            AbortTransactionAfterCollect(d, ABRT_OTHER_THREADS);
+    }
+}
+
+
+void stm_partial_commit_and_resume_other_threads(void)
+{                               /* push gc roots! */
+    struct tx_descriptor *d = thread_descriptor;
+    assert(*d->active_ref == 2);
+    int atomic = d->atomic;
+
+    /* Give up atomicity during the commit.  This still works because
+       we keep the inevitable status and are thereby guaranteed to
+       commit before all others. */
+    stm_atomic(-atomic);
+
+    /* Commit and start a new inevitable transaction while never
+       giving up the inevitable status. */
+    CommitTransaction(1);       /* 1=stay_inevitable! */
+    BeginInevitableTransaction(1);
+
+    /* restore atomic-count */
+    stm_atomic(atomic);
+
+    stm_stop_single_thread();
+}
+
 void stm_start_single_thread(void)
-{
+{                               /* push gc roots! */
     /* Called by the GC, just after a minor collection, when we need to do
        a major collection.  When it returns, it acquired the "write lock"
        which prevents any other thread from running in a transaction.
@@ -337,7 +374,7 @@
 }
 
 void stm_stop_single_thread(void)
-{
+{                               /* push gc roots! */
     /* Warning, may block waiting for rwlock_in_transaction while another
        thread runs a major GC */
     assert(in_single_thread == thread_descriptor);
diff --git a/rpython/translator/stm/src_stm/stmsync.h b/rpython/translator/stm/src_stm/stmsync.h
--- a/rpython/translator/stm/src_stm/stmsync.h
+++ b/rpython/translator/stm/src_stm/stmsync.h
@@ -7,11 +7,14 @@
 void stm_start_sharedlock(void);
 void stm_stop_sharedlock(void);
 
+void stm_stop_all_other_threads(void);
+void stm_partial_commit_and_resume_other_threads(void);
 void stm_start_single_thread(void);
 void stm_stop_single_thread(void);
 
 void stm_possible_safe_point(void);
 
+extern struct tx_descriptor *in_single_thread;
 extern struct GcPtrList stm_prebuilt_gcroots;
 void stm_add_prebuilt_root(gcptr);
 void stm_clear_between_tests(void);

