[pypy-commit] stmgc default: Introduce tx_public_descriptor to cope with threads ending

arigo noreply at buildbot.pypy.org
Sat Jun 8 21:25:56 CEST 2013


Author: Armin Rigo <arigo at tunes.org>
Branch: 
Changeset: r79:91fdce635f8f
Date: 2013-06-08 21:25 +0200
http://bitbucket.org/pypy/stmgc/changeset/91fdce635f8f/

Log:	Introduce tx_public_descriptor to cope with threads ending

diff --git a/c4/doc-objects.txt b/c4/doc-objects.txt
--- a/c4/doc-objects.txt
+++ b/c4/doc-objects.txt
@@ -83,8 +83,7 @@
 - the PRN (private revision number): odd, negative, changes for every
   transaction that commits
 
-- list active_backup_copies =
-  [(private converted from protected, backup copy)]
+- list active_backup_copies = [(private, backup copy)]
 
 - dict public_to_private = {public obj: private copy}
 
diff --git a/c4/et.c b/c4/et.c
--- a/c4/et.c
+++ b/c4/et.c
@@ -82,6 +82,13 @@
 
 static void steal(gcptr P)
 {
+  struct tx_public_descriptor *foreign_pd;
+  revision_t target_descriptor_index;
+  revision_t v = ACCESS_ONCE(P->h_revision);
+  if ((v & 3) != 2)
+    return;
+  target_descriptor_index = *(revision_t *)(v & ~(HANDLE_BLOCK_SIZE-1));
+  //foreign_pd = ACCESS_ONCE(stm_descriptor_array[target_descriptor_index]);
   abort();
 }
 
@@ -167,7 +174,7 @@
  old_to_young:;
   revision_t target_descriptor_index;
   target_descriptor_index = *(revision_t *)(v & ~(HANDLE_BLOCK_SIZE-1));
-  if (target_descriptor_index == d->descriptor_index)
+  if (target_descriptor_index == d->public_descriptor_index)
     {
       P = (gcptr)(*(revision_t *)(v - 2));
       assert(!(P->h_tid & GCFLAG_PUBLIC));
@@ -195,7 +202,7 @@
       /* stealing */
       fprintf(stderr, "read_barrier: %p -> stealing %p...", G, (gcptr)v);
       steal(P);
-      abort();
+      goto retry;
     }
 }
 
@@ -331,7 +338,7 @@
       memcpy(B + 1, P + 1, size - sizeof(*B));
     }
   assert(B->h_tid & GCFLAG_BACKUP_COPY);
-  g2l_insert(&d->private_to_backup, P, B);
+  gcptrlist_insert2(&d->public_descriptor->active_backup_copies, P, B);
   P->h_revision = stm_private_rev_num;
   return P;
 }
@@ -394,10 +401,13 @@
 
 gcptr stm_get_backup_copy(gcptr P)
 {
-  struct tx_descriptor *d = thread_descriptor;
-  wlog_t *entry;
-  G2L_FIND(d->private_to_backup, P, entry, return NULL);
-  return entry->val;
+  struct tx_public_descriptor *pd = thread_descriptor->public_descriptor;
+  long i, size = pd->active_backup_copies.size;
+  gcptr *items = pd->active_backup_copies.items;
+  for (i = 0; i < size; i += 2)
+    if (items[i] == P)
+      return items[i + 1];
+  return NULL;
 }
 
 gcptr stm_get_read_obj(long index)
@@ -568,7 +578,7 @@
   }
 
   gcptrlist_clear(&d->list_of_read_objects);
-  g2l_clear(&d->private_to_backup);
+  gcptrlist_clear(&d->public_descriptor->active_backup_copies);
   abort();//stmgc_abort_transaction(d);
 
   fprintf(stderr,
@@ -578,7 +588,7 @@
           "!!!!!!!!!!!!!!!!!!!!!  [%lx] abort %d\n"
           "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
           "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
-          "\n", (long)d->descriptor_index, num);
+          "\n", (long)d->public_descriptor_index, num);
   if (num != ABRT_MANUAL && d->max_aborts >= 0 && !d->max_aborts--)
     {
       fprintf(stderr, "unexpected abort!\n");
@@ -632,7 +642,7 @@
     d->start_real_time.tv_nsec = -1;
   }
   assert(d->list_of_read_objects.size == 0);
-  assert(!g2l_any_entry(&d->private_to_backup));
+  assert(d->public_descriptor->active_backup_copies.size == 0);
   assert(!g2l_any_entry(&d->public_to_private));
 
   d->count_reads = 1;
@@ -774,7 +784,7 @@
       handle_block = (revision_t *)
         ((((intptr_t)handle_block) + HANDLE_BLOCK_SIZE-1)
          & ~(HANDLE_BLOCK_SIZE-1));
-      handle_block[0] = d->descriptor_index;
+      handle_block[0] = d->public_descriptor_index;
       handle_block[1] = v;
 
       revision_t w = ((revision_t)(handle_block + 1)) + 2;
@@ -826,18 +836,19 @@
 void TurnPrivateWithBackupToProtected(struct tx_descriptor *d,
                                       revision_t cur_time)
 {
-  wlog_t *item;
-  G2L_LOOP_FORWARD(d->private_to_backup, item)
+  long i, size = d->public_descriptor->active_backup_copies.size;
+  gcptr *items = d->public_descriptor->active_backup_copies.items;
+
+  for (i = 0; i < size; i += 2)
     {
-      gcptr P = item->addr;
-      gcptr B = item->val;
+      gcptr P = items[i];
+      gcptr B = items[i + 1];
       assert(P->h_revision == stm_private_rev_num);
       assert(B->h_tid & GCFLAG_BACKUP_COPY);
       B->h_revision = cur_time;
       P->h_revision = (revision_t)B;
-
-    } G2L_LOOP_END;
-  g2l_clear(&d->private_to_backup);
+    };
+  gcptrlist_clear(&d->public_descriptor->active_backup_copies);
 }
 
 void CommitTransaction(void)
@@ -846,7 +857,7 @@
   struct tx_descriptor *d = thread_descriptor;
   assert(d->active >= 1);
 
-  spinlock_acquire(d->collection_lock, 'C');  /* committing */
+  spinlock_acquire(d->public_descriptor->collection_lock, 'C');  /*committing*/
   AcquireLocks(d);
 
   if (is_inevitable(d))
@@ -868,10 +879,10 @@
           if (cur_time & 1)
             {                    // there is another inevitable transaction
               CancelLocks(d);
-              spinlock_release(d->collection_lock);
+              spinlock_release(d->public_descriptor->collection_lock);
               inev_mutex_acquire();   // wait until released
               inev_mutex_release();
-              spinlock_acquire(d->collection_lock, 'C');
+              spinlock_acquire(d->public_descriptor->collection_lock, 'C');
               AcquireLocks(d);
               continue;
             }
@@ -907,7 +918,7 @@
 
   UpdateChainHeads(d, cur_time, localrev);
 
-  spinlock_release(d->collection_lock);
+  spinlock_release(d->public_descriptor->collection_lock);
   d->num_commits++;
   d->active = 0;
   stm_stop_sharedlock();
@@ -948,7 +959,8 @@
                 (XXX statically we should know when we're outside
                 a transaction) */
 
-  fprintf(stderr, "[%lx] inevitable: %s\n", (long)d->descriptor_index, why);
+  fprintf(stderr, "[%lx] inevitable: %s\n",
+          (long)d->public_descriptor_index, why);
 
   cur_time = acquire_inev_mutex_and_mark_global_cur_time();
   if (d->start_time != cur_time)
@@ -1072,8 +1084,8 @@
 
 /************************************************************/
 
-struct tx_descriptor *stm_descriptor_array[MAX_THREADS] = {0};
-static revision_t descriptor_array_next = 0;
+struct tx_public_descriptor *stm_descriptor_array[MAX_THREADS] = {0};
+static revision_t descriptor_array_free_list = 0;
 static revision_t descriptor_array_lock = 0;
 
 int DescriptorInit(void)
@@ -1092,22 +1104,30 @@
       memset(d, 0, sizeof(struct tx_descriptor));
       spinlock_acquire(descriptor_array_lock, 1);
 
-      i = descriptor_array_next;
-      while (stm_descriptor_array[i] != NULL)
-        {
-          i++;
-          if (i == MAX_THREADS)
-            i = 0;
-          if (i == descriptor_array_next)
-            {
+      struct tx_public_descriptor *pd;
+      i = descriptor_array_free_list;
+      pd = stm_descriptor_array[i];
+      if (pd != NULL) {
+          /* we are reusing 'pd' */
+          descriptor_array_free_list = pd->free_list_next;
+          assert(descriptor_array_free_list >= 0);
+      }
+      else {
+          /* no item in the free list */
+          descriptor_array_free_list = i + 1;
+          if (descriptor_array_free_list == MAX_THREADS) {
               fprintf(stderr, "error: too many threads at the same time "
                               "in this process");
               abort();
-            }
-        }
-      descriptor_array_next = i;
-      stm_descriptor_array[i] = d;
-      d->descriptor_index = i;
+          }
+          pd = stm_malloc(sizeof(struct tx_public_descriptor));
+          memset(pd, 0, sizeof(struct tx_public_descriptor));
+          stm_descriptor_array[i] = pd;
+      }
+      pd->free_list_next = -1;
+
+      d->public_descriptor = pd;
+      d->public_descriptor_index = i;
       d->my_lock = LOCKED + 2 * i;
       assert(d->my_lock & 1);
       assert(d->my_lock >= LOCKED);
@@ -1117,7 +1137,7 @@
       thread_descriptor = d;
 
       fprintf(stderr, "[%lx] pthread %lx starting\n",
-              (long)d->descriptor_index, (long)pthread_self());
+              (long)d->public_descriptor_index, (long)pthread_self());
 
       spinlock_release(descriptor_array_lock);
       return 1;
@@ -1128,15 +1148,25 @@
 
 void DescriptorDone(void)
 {
+    revision_t i;
     struct tx_descriptor *d = thread_descriptor;
     assert(d != NULL);
     assert(d->active == 0);
 
-    stm_descriptor_array[d->descriptor_index] = NULL;
+    d->public_descriptor->collection_lock = 0;    /* unlock */
+
+    spinlock_acquire(descriptor_array_lock, 1);
+    i = d->public_descriptor_index;
+    assert(stm_descriptor_array[i] == d->public_descriptor);
+    d->public_descriptor->free_list_next = descriptor_array_free_list;
+    descriptor_array_free_list = i;
+    spinlock_release(descriptor_array_lock);
+
     thread_descriptor = NULL;
 
     g2l_delete(&d->public_to_private);
-    g2l_delete(&d->private_to_backup);
+    assert(d->public_descriptor->active_backup_copies.size == 0);
+    gcptrlist_delete(&d->public_descriptor->active_backup_copies);
     gcptrlist_delete(&d->list_of_read_objects);
     gcptrlist_delete(&d->abortinfo);
     free(d->longest_abort_info);
@@ -1145,7 +1175,6 @@
 #endif
 
     int num_aborts = 0, num_spinloops = 0;
-    int i;
     char line[256], *p = line;
 
     for (i=0; i<ABORT_REASONS; i++)
@@ -1154,7 +1183,7 @@
         num_spinloops += d->num_spinloops[i];
 
     p += sprintf(p, "[%lx] finishing: %d commits, %d aborts ",
-                 (long)d->descriptor_index,
+                 (long)d->public_descriptor_index,
                  d->num_commits,
                  num_aborts);
 
diff --git a/c4/et.h b/c4/et.h
--- a/c4/et.h
+++ b/c4/et.h
@@ -101,12 +101,25 @@
 #define SPLP_LOCKED_COMMIT        3
 #define SPINLOOP_REASONS      4
 
+/* this struct contains thread-local data that may be occasionally
+ * accessed by a foreign thread and that must stay around after the
+ * thread shuts down.  It is reused the next time a thread starts. */
+struct tx_public_descriptor {
+  revision_t collection_lock;
+  struct GcPtrList stolen_objects;
+  struct GcPtrList active_backup_copies;
+  revision_t free_list_next;
+  /* xxx gcpage data here */
+};
+
+/* this struct contains all thread-local data that is never accessed
+ * by a foreign thread */
 struct tx_descriptor {
+  struct tx_public_descriptor *public_descriptor;
+  revision_t public_descriptor_index;
   jmp_buf *setjmp_buf;
   revision_t start_time;
-  revision_t descriptor_index;
   revision_t my_lock;
-  revision_t collection_lock;
   gcptr *shadowstack;
   gcptr **shadowstack_end_ref;
 
@@ -123,7 +136,6 @@
   unsigned int num_spinloops[SPINLOOP_REASONS];
   struct GcPtrList list_of_read_objects;
   struct GcPtrList abortinfo;
-  struct G2L private_to_backup;
   struct G2L public_to_private;
   char *longest_abort_info;
   long long longest_abort_info_time;
@@ -133,43 +145,10 @@
 
 extern __thread struct tx_descriptor *thread_descriptor;
 extern __thread revision_t stm_private_rev_num;
-extern struct tx_descriptor *stm_descriptor_array[];
+extern struct tx_public_descriptor *stm_descriptor_array[];
 
 /************************************************************/
 
-//#define STM_BARRIER_P2R(P)
-//    (__builtin_expect((((gcptr)(P))->h_tid & GCFLAG_GLOBAL) == 0, 1) ?
-//     (P) : (typeof(P))stm_DirectReadBarrier(P))
-
-//#define STM_BARRIER_G2R(G)
-//    (assert(((gcptr)(G))->h_tid & GCFLAG_GLOBAL),
-//     (typeof(G))stm_DirectReadBarrier(G))
-
-//#define STM_BARRIER_O2R(O)
-//    (__builtin_expect((((gcptr)(O))->h_tid & GCFLAG_POSSIBLY_OUTDATED) == 0,
-//                      1) ?
-//     (O) : (typeof(O))stm_RepeatReadBarrier(O))
-
-//#define STM_READ_BARRIER_P_FROM_R(P, R_container, offset)
-//    (__builtin_expect((((gcptr)(P))->h_tid & GCFLAG_GLOBAL) == 0, 1) ?
-//     (P) : (typeof(P))stm_DirectReadBarrierFromR((P),
-//                                              (R_container),
-//                                              offset))
-
-//#define STM_BARRIER_P2W(P)
-//    (__builtin_expect((((gcptr)(P))->h_tid & GCFLAG_NOT_WRITTEN) == 0, 1) ?
-//     (P) : (typeof(P))stm_WriteBarrier(P))
-
-//#define STM_BARRIER_G2W(G)
-//    (assert(((gcptr)(G))->h_tid & GCFLAG_GLOBAL),
-//     (typeof(G))stm_WriteBarrier(G))
-
-//#define STM_BARRIER_R2W(R)
-//    (__builtin_expect((((gcptr)(R))->h_tid & GCFLAG_NOT_WRITTEN) == 0, 1) ?
-//     (R) : (typeof(R))stm_WriteBarrierFromReady(R))
-
-//#define STM_BARRIER_O2W(R)  STM_BARRIER_R2W(R)   /* same logic works here */
-
 
 void BeginTransaction(jmp_buf *);
 void BeginInevitableTransaction(void);  /* must save roots around this call */
diff --git a/c4/test/support.py b/c4/test/support.py
--- a/c4/test/support.py
+++ b/c4/test/support.py
@@ -208,7 +208,7 @@
 
     revision_t get_descriptor_index(void)
     {
-        return thread_descriptor->descriptor_index;
+        return thread_descriptor->public_descriptor_index;
     }
 
     /*gcptr *addr_of_thread_local(void)


More information about the pypy-commit mailing list