[pypy-commit] stmgc default: implement stm_stop_all_other_threads() and

Raemi noreply at buildbot.pypy.org
Mon Nov 4 13:19:35 CET 2013


Author: Remi Meier <remi.meier at gmail.com>
Branch: 
Changeset: r543:79aa5685d286
Date: 2013-11-04 13:15 +0100
http://bitbucket.org/pypy/stmgc/changeset/79aa5685d286/

Log:	implement stm_stop_all_other_threads() and
	stm_partial_commit_and_resume_other_threads()
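
The intended pattern, as exercised by the demo change below (push_roots()
and pop_roots() are demo_random.c helpers; patch_code() is a hypothetical
stand-in for whatever non-transactional update the caller wants to perform):

    push_roots();                  /* save gc roots: both calls may collect */
    stm_stop_all_other_threads();  /* become inevitable, park all other threads */
    patch_code();                  /* hypothetical: no other thread runs here */
    stm_partial_commit_and_resume_other_threads();
    pop_roots();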

diff --git a/c4/demo_random.c b/c4/demo_random.c
--- a/c4/demo_random.c
+++ b/c4/demo_random.c
@@ -4,6 +4,7 @@
 #include <pthread.h>
 #include <semaphore.h>
 #include <time.h>
+#include <unistd.h>
 
 #include "stmgc.h"
 #include "stmimpl.h"
@@ -442,7 +443,7 @@
 gcptr rare_events(gcptr p, gcptr _r, gcptr _sr)
 {
     check_public_ints();
-    int k = get_rand(100);
+    int k = get_rand(200);
     if (k < 10) {
         push_roots();
         stm_push_root(p);
@@ -464,7 +465,24 @@
         pop_public_int();
         p = NULL;
     }
-    else if (k < 61 && DO_MAJOR_COLLECTS) {
+    else if (k < 61) {
+        push_roots();
+        stm_push_root(p);
+
+        stm_stop_all_other_threads();
+
+        p = stm_pop_root();
+        p = write_barrier(p);
+        stm_push_root(p);
+
+        sleep(0);
+
+        stm_partial_commit_and_resume_other_threads();
+
+        p = stm_pop_root();
+        pop_roots();
+    }
+    else if (k < 62 && DO_MAJOR_COLLECTS) {
         fprintf(stdout, "major collect\n");
         push_roots();
         stmgcpage_possibly_major_collect(1);
diff --git a/c4/et.c b/c4/et.c
--- a/c4/et.c
+++ b/c4/et.c
@@ -1071,11 +1071,12 @@
   return d->atomic;
 }
 
-static void init_transaction(struct tx_descriptor *d)
+static void init_transaction(struct tx_descriptor *d, int already_locked)
 {
   assert(d->atomic == 0);
   assert(*d->active_ref == 0);
-  stm_start_sharedlock();
+  if (!already_locked)
+    stm_start_sharedlock();
   assert(*d->active_ref == 0);
 
   if (clock_gettime(CLOCK_MONOTONIC, &d->start_real_time) < 0) {
@@ -1096,7 +1097,7 @@
 void stm_begin_transaction(void *buf, void (*longjmp_callback)(void *))
 {
   struct tx_descriptor *d = thread_descriptor;
-  init_transaction(d);
+  init_transaction(d, 0);
   *d->active_ref = 1;
   d->setjmp_buf = buf;
   d->longjmp_callback = longjmp_callback;
@@ -1425,13 +1426,14 @@
   dprintf(("private_from_protected: clear (abort)\n"));
 }
 
-void CommitTransaction(void)
+void CommitTransaction(int stay_inevitable)
 {   /* must save roots around this call */
   revision_t cur_time;
   struct tx_descriptor *d = thread_descriptor;
   assert(*d->active_ref >= 1);
   assert(d->atomic == 0);
-  dprintf(("CommitTransaction(%p)\n", d));
+  dprintf(("CommitTransaction(%d): %p\n", stay_inevitable, d));
+
   spinlock_acquire(d->public_descriptor->collection_lock, 'C');  /*committing*/
   if (d->public_descriptor->stolen_objects.size != 0)
     stm_normalize_stolen_objects(d);
@@ -1445,7 +1447,11 @@
         {
           stm_fatalerror("global_cur_time modified even though we are inev\n");
         }
-      inev_mutex_release();
+
+      /* when staying inevitable, we simply keep the mutex */
+      if (!stay_inevitable) {
+        inev_mutex_release();
+      }
     }
   else
     {
@@ -1503,7 +1509,8 @@
   spinlock_release(d->public_descriptor->collection_lock);
   d->num_commits++;
   *d->active_ref = 0;
-  stm_stop_sharedlock();
+  if (!stay_inevitable)
+    stm_stop_sharedlock();
 
   /* clear the list of callbacks that would have been called
      on abort */
@@ -1568,13 +1575,25 @@
   make_inevitable(d);    /* cannot abort any more */
 }
 
-void BeginInevitableTransaction(void)
+void BeginInevitableTransaction(int already_inevitable)
 {   /* must save roots around this call */
   struct tx_descriptor *d = thread_descriptor;
   revision_t cur_time;
 
-  init_transaction(d);
-  cur_time = acquire_inev_mutex_and_mark_global_cur_time(d);
+  init_transaction(d, already_inevitable);
+
+  if (already_inevitable) {
+    cur_time = ACCESS_ONCE(global_cur_time);
+    assert((cur_time & 1) == 0);
+    if (!bool_cas(&global_cur_time, cur_time, cur_time + 1)) {
+      stm_fatalerror("there was a commit between a partial inevitable "
+                     "commit and the continuation of the transaction\n");
+    }
+  }
+  else {
+    cur_time = acquire_inev_mutex_and_mark_global_cur_time(d);
+  }
+
   d->start_time = cur_time;
   make_inevitable(d);
 }
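
The bool_cas in the already_inevitable branch relies on the parity
convention visible in the asserts: an even global_cur_time means no
inevitable transaction currently holds the mark, and marking sets the
low bit. A condensed sketch of that invariant (an illustration, not
code from the source):

    revision_t t = ACCESS_ONCE(global_cur_time);
    assert((t & 1) == 0);                  /* no inevitable mark is held */
    if (!bool_cas(&global_cur_time, t, t + 1)) {
        /* someone committed in between; since this path only runs in
           single-threaded mode, that would be a bug, hence fatal */
        stm_fatalerror("unexpected concurrent commit\n");
    }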
diff --git a/c4/et.h b/c4/et.h
--- a/c4/et.h
+++ b/c4/et.h
@@ -123,7 +123,8 @@
 #define ABRT_VALIDATE_INEV        5
 #define ABRT_COLLECT_MINOR        6
 #define ABRT_COLLECT_MAJOR        7
-#define ABORT_REASONS         8
+#define ABRT_OTHER_THREADS        8
+#define ABORT_REASONS         9
 #define ABORT_NAMES      { "MANUAL",            \
                            "COMMIT",            \
                            "STOLEN_MODIFIED",   \
@@ -132,6 +133,7 @@
                            "VALIDATE_INEV",     \
                            "COLLECT_MINOR",     \
                            "COLLECT_MAJOR",     \
+                           "OTHER_THREADS",     \
                          }
 
 #define SPLP_ABORT                0
@@ -207,8 +209,8 @@
 /************************************************************/
 
 
-void BeginInevitableTransaction(void);  /* must save roots around this call */
-void CommitTransaction(void);           /* must save roots around this call */
+void BeginInevitableTransaction(int);  /* must save roots around this call */
+void CommitTransaction(int);           /* must save roots around this call */
 void BecomeInevitable(const char *why); /* must save roots around this call */
 void AbortTransaction(int);
 void AbortTransactionAfterCollect(struct tx_descriptor *, int);
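
As this hunk shows, ABORT_REASONS and ABORT_NAMES must be kept in sync by
hand. A compile-time check along these lines could catch a mismatch (a
sketch, assuming C11 _Static_assert is acceptable in this codebase):

    static const char *abort_names[] = ABORT_NAMES;
    _Static_assert(sizeof(abort_names) / sizeof(abort_names[0])
                       == ABORT_REASONS,
                   "ABORT_NAMES and ABORT_REASONS disagree");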
diff --git a/c4/gcpage.c b/c4/gcpage.c
--- a/c4/gcpage.c
+++ b/c4/gcpage.c
@@ -1029,8 +1029,14 @@
     if (ACCESS_ONCE(countdown_next_major_coll) > 0)
         return;
 
-    stm_start_single_thread();
-
+    /* If we already run in single-threaded mode and we are the
+       single thread, we must not try to enter the mode again.
+       This can happen after entering the mode manually by calling
+       stm_stop_all_other_threads(). */
+    int single_threaded = in_single_thread == thread_descriptor;
+    if (!single_threaded)
+        stm_start_single_thread();
+
     /* If several threads were blocked on the previous line, the first
        one to proceed sees 0 in 'countdown_next_major_coll'.  It's the
        thread that will do the major collection.  Afterwards the other
@@ -1039,7 +1045,8 @@
     if (countdown_next_major_coll == 0)
         major_collect();
 
-    stm_stop_single_thread();
+    if (!single_threaded)
+        stm_stop_single_thread();
 
     AbortNowIfDelayed();
 }
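
The guard is more than an optimization: stm_start_single_thread() acquires
the "write lock" (per its comment), and rwlock_shared in stmsync.c is a
pthread rwlock, which must not be write-locked again by the thread that
already holds it -- doing so deadlocks or is undefined. A minimal
illustration of the failure mode being avoided (a sketch, not code from
the source):

    pthread_rwlock_wrlock(&rwlock_shared);  /* held: we are the single thread */
    pthread_rwlock_wrlock(&rwlock_shared);  /* deadlocks or returns EDEADLK */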
diff --git a/c4/stmgc.h b/c4/stmgc.h
--- a/c4/stmgc.h
+++ b/c4/stmgc.h
@@ -195,6 +195,14 @@
 /* only user currently is stm_allocate_public_integer_address() */
 void stm_register_integer_address(intptr_t);
 
+/* Enter single-threaded mode.  Used e.g. when patching assembler
+   code that must not be executed by another thread while it is
+   being patched.  More generally, this allows atomically updating
+   non-transactional memory.
+   These calls may collect! */
+void stm_stop_all_other_threads(void);
+void stm_partial_commit_and_resume_other_threads(void);
+
 /* macro functionality */
 
 extern __thread gcptr *stm_shadowstack;
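
Following the header comment, a hedged example of using the pair to update
non-transactional memory atomically with respect to every thread (the
global and the function around it are illustrative, not part of the API):

    static unsigned char *code_ptr;     /* hypothetical patch target */

    void patch_byte(unsigned char new_op)
    {   /* must save roots around this call */
        stm_stop_all_other_threads();   /* may collect! */
        *code_ptr = new_op;             /* no other thread can execute this */
        stm_partial_commit_and_resume_other_threads();
    }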
diff --git a/c4/stmsync.c b/c4/stmsync.c
--- a/c4/stmsync.c
+++ b/c4/stmsync.c
@@ -95,7 +95,7 @@
         init_shadowstack();
         stmgcpage_release_global_lock();
     }
-    BeginInevitableTransaction();
+    BeginInevitableTransaction(0);
     return token;
 }
 
@@ -105,7 +105,7 @@
     if (token == 1)
         stmgc_minor_collect();   /* force everything out of the nursery */
 
-    CommitTransaction();
+    CommitTransaction(0);
 
     if (token == 1) {
         stmgcpage_acquire_global_lock();
@@ -140,7 +140,7 @@
     stm_push_root(END_MARKER_OFF);
 
     if (!thread_descriptor->atomic)
-        CommitTransaction();
+        CommitTransaction(0);
 
 #ifdef _GC_ON_CPYTHON
     volatile PyThreadState *v_ts = PyGILState_GetThisThreadState();
@@ -192,7 +192,7 @@
         assert(stm_shadowstack == v_saved_value + 2);
 
         if (!d->atomic)
-            CommitTransaction();
+            CommitTransaction(0);
 
         counter = 0;
     }
@@ -204,7 +204,7 @@
         }
     }
     else {
-        BeginInevitableTransaction();
+        BeginInevitableTransaction(0);
     }
 
     gcptr x = stm_pop_root();   /* pop the END_MARKER */
@@ -221,7 +221,7 @@
         stm_possible_safe_point();
     }
     else {
-        CommitTransaction();
+        CommitTransaction(0);
 
         unsigned long limit = d->reads_size_limit_nonatomic;
         if (limit != 0 && limit < (stm_regular_length_limit >> 1))
@@ -246,7 +246,7 @@
 {   /* must save roots around this call */
     struct tx_descriptor *d = thread_descriptor;
     if (!d->atomic)
-        CommitTransaction();
+        CommitTransaction(0);
     else
         BecomeInevitable("stm_commit_transaction but atomic");
 }
@@ -255,7 +255,7 @@
 {   /* must save roots around this call */
     struct tx_descriptor *d = thread_descriptor;
     if (!d->atomic)
-        BeginInevitableTransaction();
+        BeginInevitableTransaction(0);
 }
 
 void stm_become_inevitable(const char *reason)
@@ -278,7 +278,7 @@
 static pthread_rwlock_t rwlock_shared =
     PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP;
 
-static struct tx_descriptor *in_single_thread = NULL;  /* for debugging */
+struct tx_descriptor *in_single_thread = NULL;
 
 void stm_start_sharedlock(void)
 {
@@ -318,8 +318,45 @@
                        "pthread_rwlock_unlock failure\n");
 }
 
+
+void stm_stop_all_other_threads(void)
+{                               /* push gc roots! */
+    struct tx_descriptor *d;
+
+    BecomeInevitable("stop_all_other_threads");
+    stm_start_single_thread();
+
+    for (d = stm_tx_head; d; d = d->tx_next) {
+        if (*d->active_ref == 1)   /* excludes ourselves: our active_ref is 2 */
+            AbortTransactionAfterCollect(d, ABRT_OTHER_THREADS);
+    }
+}
+
+
+void stm_partial_commit_and_resume_other_threads(void)
+{                               /* push gc roots! */
+    struct tx_descriptor *d = thread_descriptor;
+    assert(*d->active_ref == 2);
+    int atomic = d->atomic;
+
+    /* Give up atomicity during the commit.  This is safe because we
+       keep the inevitable status and are thereby guaranteed to
+       commit before all others. */
+    stm_atomic(-atomic);
+
+    /* Commit and start new inevitable transaction while never
+       giving up the inevitable status. */
+    CommitTransaction(1);       /* 1=stay_inevitable! */
+    BeginInevitableTransaction(1);
+
+    /* restore atomic-count */
+    stm_atomic(atomic);
+
+    stm_stop_single_thread();
+}
+
 void stm_start_single_thread(void)
-{
+{                               /* push gc roots! */
     /* Called by the GC, just after a minor collection, when we need to do
        a major collection.  When it returns, it acquired the "write lock"
        which prevents any other thread from running in a transaction.
@@ -336,7 +373,7 @@
 }
 
 void stm_stop_single_thread(void)
-{
+{                               /* push gc roots! */
     /* Warning, may block waiting for rwlock_in_transaction while another
        thread runs a major GC */
     assert(in_single_thread == thread_descriptor);
diff --git a/c4/stmsync.h b/c4/stmsync.h
--- a/c4/stmsync.h
+++ b/c4/stmsync.h
@@ -6,11 +6,14 @@
 void stm_start_sharedlock(void);
 void stm_stop_sharedlock(void);
 
+void stm_stop_all_other_threads(void);
+void stm_partial_commit_and_resume_other_threads(void);
 void stm_start_single_thread(void);
 void stm_stop_single_thread(void);
 
 void stm_possible_safe_point(void);
 
+extern struct tx_descriptor *in_single_thread;
 extern struct GcPtrList stm_prebuilt_gcroots;
 void stm_add_prebuilt_root(gcptr);
 void stm_clear_between_tests(void);

