[pypy-commit] pypy stm-gc: Progress...

arigo noreply at buildbot.pypy.org
Sun Apr 22 19:28:16 CEST 2012


Author: Armin Rigo <arigo at tunes.org>
Branch: stm-gc
Changeset: r54624:0a022231bea0
Date: 2012-04-22 19:10 +0200
http://bitbucket.org/pypy/pypy/changeset/0a022231bea0/

Log:	Progress...

diff --git a/pypy/rlib/rstm.py b/pypy/rlib/rstm.py
--- a/pypy/rlib/rstm.py
+++ b/pypy/rlib/rstm.py
@@ -171,8 +171,7 @@
     # This logic is in a sub-function because we want to catch
     # the MemoryErrors that could occur.
     transaction = _cast_voidp_to_transaction(transactionptr)
-    ll_assert(transaction._next_transaction is None,
-              "_next_transaction should be cleared by C code")
+    transaction._next_transaction = None
     transaction.retry_counter = retry_counter
     new_transactions = transaction.run()
     return _link_new_transactions(new_transactions)
diff --git a/pypy/rlib/test/test_rstm.py b/pypy/rlib/test/test_rstm.py
--- a/pypy/rlib/test/test_rstm.py
+++ b/pypy/rlib/test/test_rstm.py
@@ -21,8 +21,6 @@
         self._add(initial_transaction_ptr)
         while self._pending:
             r, transactionptr = self._pending.popitem()
-            transaction = self.cast_voidp_to_transaction(transactionptr)
-            transaction._next_transaction = None
             nextptr = rstm._stm_run_transaction(transactionptr, 0)
             next = self.cast_voidp_to_transaction(nextptr)
             while next is not None:
diff --git a/pypy/translator/c/src/main.h b/pypy/translator/c/src/main.h
--- a/pypy/translator/c/src/main.h
+++ b/pypy/translator/c/src/main.h
@@ -15,6 +15,10 @@
 
 #ifndef PYPY_NOT_MAIN_FILE
 
+#ifdef RPY_STM
+#include "src_stm/et.c"
+#endif
+
 #ifndef PYPY_MAIN_FUNCTION
 #define PYPY_MAIN_FUNCTION main
 #endif
diff --git a/pypy/translator/stm/src_stm/core.c b/pypy/translator/stm/src_stm/core.c
--- a/pypy/translator/stm/src_stm/core.c
+++ b/pypy/translator/stm/src_stm/core.c
@@ -26,6 +26,11 @@
 
 /************************************************************/
 
+#define GETVERSION(o)     ((owner_version_t)((o)->h_version))
+#define GETVERSIONREF(o)  ((volatile owner_version_t *)(&(o)->h_version))
+#define SETVERSION(o, v)  (o)->h_version = (void *)(v)
+#define GETTID(o)         ((o)->h_tid)
+
 static unsigned long get_global_timestamp(struct tx_descriptor *d)
 {
   return (/*d->last_known_global_timestamp =*/ global_timestamp);
@@ -94,7 +99,7 @@
       /* unlock the orec */
       volatile orec_t* o = get_orec(globalobj);
       CFENCE;
-      o->version = newver;
+      SETVERSION(o, newver);
     } REDOLOG_LOOP_END;
 }
 
@@ -107,7 +112,7 @@
       if (item->p != -1)
         {
           volatile orec_t* o = get_orec(item->addr);
-          o->version = item->p;
+          SETVERSION(o, item->p);
         }
     } REDOLOG_LOOP_END;
 }
@@ -120,7 +125,7 @@
     {
       volatile orec_t* o = get_orec(item->addr);
       assert(item->p != -1);
-      o->version = item->p;
+      SETVERSION(o, item->p);
       item->p = -1;
     } REDOLOG_LOOP_END;
 }
@@ -137,7 +142,7 @@
       owner_version_t ovt;
 
     retry:
-      ovt = o->version;
+      ovt = GETVERSION(o);
 
       // if orec not locked, lock it
       //
@@ -145,7 +150,7 @@
       // reads.  Since most writes are also reads, we'll just abort under this
       // condition.  This can introduce false conflicts
       if (!IS_LOCKED_OR_NEWER(ovt, d->start_time)) {
-        if (!bool_cas(&o->version, ovt, d->my_lock_word))
+        if (!bool_cas(GETVERSIONREF(o), ovt, d->my_lock_word))
           goto retry;
         // save old version to item->p.  Now we hold the lock.
         item->p = ovt;
@@ -219,7 +224,7 @@
   for (i=0; i<d->reads.size; i++)
     {
     retry:
-      ovt = d->reads.items[i]->version;
+      ovt = GETVERSION(d->reads.items[i]);
       if (IS_LOCKED_OR_NEWER(ovt, d->start_time))
         {
           // If locked, we wait until it becomes unlocked.  The chances are
@@ -249,7 +254,7 @@
   assert(!is_inevitable(d));
   for (i=0; i<d->reads.size; i++)
     {
-      ovt = d->reads.items[i]->version;      // read this orec
+      ovt = GETVERSION(d->reads.items[i]);      // read this orec
       if (IS_LOCKED_OR_NEWER(ovt, d->start_time))
         {
           if (!IS_LOCKED(ovt))
@@ -361,7 +366,7 @@
   else {                                                                \
  retry:                                                                 \
   /* read the orec BEFORE we read anything else */                      \
-  ovt = o->version;                                                     \
+  ovt = GETVERSION(o);                                                  \
   CFENCE;                                                               \
                                                                         \
   /* this tx doesn't hold any locks, so if the lock for this addr is */ \
@@ -384,7 +389,7 @@
                                                                         \
   /* postvalidate AFTER reading addr: */                                \
   CFENCE;                                                               \
-  if (__builtin_expect(o->version != ovt, 0))                           \
+  if (__builtin_expect(GETVERSION(o) != ovt, 0))                        \
     goto retry;       /* oups, try again */                             \
                                                                         \
   oreclist_insert(&d->reads, (orec_t*)o);                               \
@@ -404,13 +409,13 @@
   if (d == NULL)                                                        \
     return *(TYPE *)(((char *)addr) + offset);                          \
                                                                         \
-  if ((o->tid & GCFLAG_WAS_COPIED) != 0)                                \
+  if ((GETTID(o) & GCFLAG_WAS_COPIED) != 0)                             \
     {                                                                   \
       /* Look up in the thread-local dictionary. */                     \
       wlog_t *found;                                                    \
       REDOLOG_FIND(d->redolog, addr, found, goto not_found);            \
       orec_t *localobj = (orec_t *)found->val;                          \
-      assert((localobj->tid & GCFLAG_GLOBAL) == 0);                     \
+      assert((GETTID(localobj) & GCFLAG_GLOBAL) == 0);                  \
       return *(TYPE *)(((char *)localobj) + offset);                    \
                                                                         \
     not_found:;                                                         \
@@ -644,3 +649,72 @@
 {
   tx_abort(7);     /* manual abort */
 }
+
+/************************************************************/
+
+long stm_thread_id(void)
+{
+  struct tx_descriptor *d = thread_descriptor;
+  if (d == NULL)
+    return 0;    /* no thread_descriptor: it's the main thread */
+  return d->my_lock_word;
+}
+
+static __thread void *rpython_tls_object;
+
+void stm_set_tls(void *newtls, long in_main_thread)
+{
+  descriptor_init(in_main_thread);
+  rpython_tls_object = newtls;
+}
+
+void *stm_get_tls(void)
+{
+  return rpython_tls_object;
+}
+
+void stm_del_tls(void)
+{
+  descriptor_done();
+}
+
+void *stm_tldict_lookup(void *key)
+{
+  struct tx_descriptor *d = thread_descriptor;
+  wlog_t* found;
+  REDOLOG_FIND(d->redolog, key, found, goto not_found);
+  return found->val;
+
+ not_found:
+  return NULL;
+}
+
+void stm_tldict_add(void *key, void *value)
+{
+  struct tx_descriptor *d = thread_descriptor;
+  assert(d != NULL);
+  redolog_insert(&d->redolog, key, value);
+}
+
+void stm_tldict_enum(void)
+{
+  struct tx_descriptor *d = thread_descriptor;
+  wlog_t *item;
+  void *tls = stm_get_tls();
+
+  REDOLOG_LOOP_FORWARD(d->redolog, item)
+    {
+      pypy_g__stm_enum_callback(tls, item->addr, item->val);
+    } REDOLOG_LOOP_END;
+}
+
+long stm_in_transaction(void)
+{
+  struct tx_descriptor *d = thread_descriptor;
+  return d != NULL;
+}
+
+#undef GETVERSION
+#undef GETVERSIONREF
+#undef SETVERSION
+#undef GETTID
diff --git a/pypy/translator/stm/src_stm/et.c b/pypy/translator/stm/src_stm/et.c
--- a/pypy/translator/stm/src_stm/et.c
+++ b/pypy/translator/stm/src_stm/et.c
@@ -1,7 +1,5 @@
 /* -*- c-basic-offset: 2 -*- */
 
-#ifndef PYPY_NOT_MAIN_FILE
-
 /* XXX assumes that time never wraps around (in a 'long'), which may be
  * correct on 64-bit machines but not on 32-bit machines if the process
  * runs for long enough.
@@ -40,10 +38,7 @@
 /* This is the same as the object header structure HDR
  * declared in stmgc.py */
 
-typedef struct {
-  long tid;
-  long version;
-} orec_t;
+typedef struct pypy_header0 orec_t;
 
 /************************************************************/
 
@@ -62,5 +57,3 @@
 #include "src_stm/rpyintf.c"
 
 /************************************************************/
-
-#endif  /* PYPY_NOT_MAIN_FILE */
diff --git a/pypy/translator/stm/src_stm/et.h b/pypy/translator/stm/src_stm/et.h
--- a/pypy/translator/stm/src_stm/et.h
+++ b/pypy/translator/stm/src_stm/et.h
@@ -25,7 +25,11 @@
 void stm_tldict_add(void *, void *);
 void stm_tldict_enum(void);
 
-/* these functions are declared by generated C code from the GC */
+/* these functions are declared by generated C code from pypy.rlib.rstm
+   and from the GC (see llop.nop(...)) */
+extern void pypy_g__stm_thread_starting(void);
+extern void pypy_g__stm_thread_stopping(void);
+extern void *_stm_run_transaction(void *, long);
 extern long pypy_g__stm_getsize(void *);
 extern void pypy_g__stm_enum_callback(void *, void *, void *);
 
diff --git a/pypy/translator/stm/src_stm/fifo.c b/pypy/translator/stm/src_stm/fifo.c
new file mode 100644
--- /dev/null
+++ b/pypy/translator/stm/src_stm/fifo.c
@@ -0,0 +1,70 @@
+/* -*- c-basic-offset: 2 -*- */
+
+
+/* xxx Direct access to this field.  Relies on genc always producing the
+   same names, but that should be ok. */
+#define NEXT(item)  (((struct pypy_pypy_rlib_rstm_Transaction0 *)(item)) \
+                     ->t_inst__next_transaction)
+
+
+typedef struct {
+    void *first;
+    void *last;
+} stm_fifo_t;
+
+
+static void fifo_init(stm_fifo_t *fifo)
+{
+  fifo->first = NULL;
+  fifo->last = NULL;
+}
+
+static void *fifo_next(void *item)
+{
+  return NEXT(item);
+}
+
+static void fifo_append(stm_fifo_t *fifo, void *newitem)
+{
+  NEXT(newitem) = NULL;
+  if (fifo->last == NULL)
+    fifo->first = newitem;
+  else
+    NEXT(fifo->last) = newitem;
+  fifo->last = newitem;
+}
+
+static bool_t fifo_is_empty(stm_fifo_t *fifo)
+{
+  assert((fifo->first == NULL) == (fifo->last == NULL));
+  return (fifo->first == NULL);
+}
+
+/* static bool_t fifo_is_of_length_1(stm_fifo_t *fifo) */
+/* { */
+/*   return (fifo->first != NULL && fifo->first == fifo->last); */
+/* } */
+
+static void *fifo_popleft(stm_fifo_t *fifo)
+{
+  void *item = fifo->first;
+  fifo->first = NEXT(item);
+  if (fifo->first == NULL)
+    fifo->last = NULL;
+  return item;
+}
+
+static void fifo_extend(stm_fifo_t *fifo, void *newitems)
+{
+  if (fifo->last == NULL)
+    fifo->first = newitems;
+  else
+    NEXT(fifo->last) = newitems;
+
+  while (NEXT(newitems) != NULL)
+    newitems = NEXT(newitems);
+
+  fifo->last = newitems;
+}
+
+#undef NEXT
diff --git a/pypy/translator/stm/src_stm/rpyintf.c b/pypy/translator/stm/src_stm/rpyintf.c
--- a/pypy/translator/stm/src_stm/rpyintf.c
+++ b/pypy/translator/stm/src_stm/rpyintf.c
@@ -1,68 +1,7 @@
 /* -*- c-basic-offset: 2 -*- */
 
-long stm_thread_id(void)
-{
-  struct tx_descriptor *d = thread_descriptor;
-  if (d == NULL)
-    return 0;    /* no thread_descriptor: it's the main thread */
-  return d->my_lock_word;
-}
+#include "src_stm/fifo.c"
 
-static __thread void *rpython_tls_object;
-
-void stm_set_tls(void *newtls, long in_main_thread)
-{
-  descriptor_init(in_main_thread);
-  rpython_tls_object = newtls;
-}
-
-void *stm_get_tls(void)
-{
-  return rpython_tls_object;
-}
-
-void stm_del_tls(void)
-{
-  descriptor_done();
-}
-
-void *stm_tldict_lookup(void *key)
-{
-  struct tx_descriptor *d = thread_descriptor;
-  wlog_t* found;
-  REDOLOG_FIND(d->redolog, key, found, goto not_found);
-  return found->val;
-
- not_found:
-  return NULL;
-}
-
-void stm_tldict_add(void *key, void *value)
-{
-  struct tx_descriptor *d = thread_descriptor;
-  assert(d != NULL);
-  redolog_insert(&d->redolog, key, value);
-}
-
-void stm_tldict_enum(void)
-{
-  struct tx_descriptor *d = thread_descriptor;
-  wlog_t *item;
-  void *tls = stm_get_tls();
-
-  REDOLOG_LOOP_FORWARD(d->redolog, item)
-    {
-      pypy_g__stm_enum_callback(tls, item->addr, item->val);
-    } REDOLOG_LOOP_END;
-}
-
-long stm_in_transaction(void)
-{
-  struct tx_descriptor *d = thread_descriptor;
-  return d != NULL;
-}
-
-/************************************************************/
 
 /* this lock is acquired when we start running transactions, and
    released only when we are finished. */
@@ -72,19 +11,107 @@
    data in run_thread(). */
 static pthread_mutex_t mutex_global = PTHREAD_MUTEX_INITIALIZER;
 
+/* this lock is acquired if and only if there are no tasks pending,
+   i.e. the fifo 'stm_g_pending' is empty and both its 'first' and
+   'last' pointers are NULL. */
+static pthread_mutex_t mutex_no_tasks_pending = PTHREAD_MUTEX_INITIALIZER;
+
 /* some global data put there by run_all_transactions(). */
-static void *g_first_transaction, *g_last_transaction;
-static int g_num_threads, g_num_waiting_threads;
+static stm_fifo_t stm_g_pending;
+static int stm_g_num_threads, stm_g_num_waiting_threads, stm_g_finished;
+
+
+static void* perform_transaction(void *transaction)
+{
+  void *new_transaction_list;
+  jmp_buf _jmpbuf;
+  long counter;
+  volatile long v_counter = 0;
+
+  setjmp(_jmpbuf);
+
+  begin_transaction(&_jmpbuf);
+
+  counter = v_counter;
+  v_counter = counter + 1;
+
+  new_transaction_list = pypy_g__stm_run_transaction(transaction, counter);
+
+  commit_transaction();
+
+  return new_transaction_list;
+}
+
+static void add_list(void *new_transaction_list)
+{
+  bool_t was_empty;
+
+  if (new_transaction_list == NULL)
+    return;
+
+  was_empty = fifo_is_empty(&stm_g_pending);
+  fifo_extend(&stm_g_pending, new_transaction_list);
+  if (was_empty)
+    pthread_mutex_unlock(&mutex_no_tasks_pending);
+}
+
 
 /* the main function running a thread */
 static void *run_thread(void *ignored)
 {
   pthread_mutex_lock(&mutex_global);
+  pypy_g__stm_thread_starting();
 
-  g_num_waiting_threads++;
-  if (g_num_waiting_threads == g_num_threads)
+  while (1)
+    {
+      if (fifo_is_empty(&stm_g_pending))
+        {
+          stm_g_num_waiting_threads += 1;
+          if (stm_g_num_waiting_threads == stm_g_num_threads)
+            {
+              stm_g_finished = 1;
+              pthread_mutex_unlock(&mutex_no_tasks_pending);
+            }
+          pthread_mutex_unlock(&mutex_global);
+
+          pthread_mutex_lock(&mutex_no_tasks_pending);
+          pthread_mutex_unlock(&mutex_no_tasks_pending);
+
+          pthread_mutex_lock(&mutex_global);
+          stm_g_num_waiting_threads -= 1;
+          if (stm_g_finished)
+            break;
+        }
+      else
+        {
+          void *new_transaction_list;
+          void *transaction = fifo_popleft(&stm_g_pending);
+          if (fifo_is_empty(&stm_g_pending))
+            pthread_mutex_lock(&mutex_no_tasks_pending);
+          pthread_mutex_unlock(&mutex_global);
+
+          while (1)
+            {
+              new_transaction_list = perform_transaction(transaction);
+              
+              /* for now, always break out of this loop,
+                 unless 'new_transaction_list' contains precisely one item */
+              if (new_transaction_list == NULL)
+                break;
+              if (fifo_next(new_transaction_list) != NULL)
+                break;
+
+              transaction = new_transaction_list;   /* single element */
+            }
+
+          pthread_mutex_lock(&mutex_global);
+          add_list(new_transaction_list);
+        }
+    }
+
+  pypy_g__stm_thread_stopping();
+  if (stm_g_num_waiting_threads == 0)   /* only the last thread to leave */
     pthread_mutex_unlock(&mutex_unfinished);
-
   pthread_mutex_unlock(&mutex_global);
   return NULL;
 }
@@ -93,10 +120,11 @@
                               long num_threads)
 {
   int i;
-  g_first_transaction = initial_transaction;
-  g_last_transaction = initial_transaction;
-  g_num_threads = (int)num_threads;
-  g_num_waiting_threads = 0;
+  fifo_init(&stm_g_pending);
+  fifo_append(&stm_g_pending, initial_transaction);
+  stm_g_num_threads = (int)num_threads;
+  stm_g_num_waiting_threads = 0;
+  stm_g_finished = 0;
 
   pthread_mutex_lock(&mutex_unfinished);
 
@@ -104,7 +132,12 @@
     {
       pthread_t th;
       int status = pthread_create(&th, NULL, run_thread, NULL);
-      assert(status == 0);
+      if (status != 0)
+        {
+          /* XXX turn into a nice exception */
+          fprintf(stderr, "fatal error: cannot create threads\n");
+          exit(1);
+        }
       pthread_detach(th);
     }
 
diff --git a/pypy/translator/stm/stmgcintf.py b/pypy/translator/stm/stmgcintf.py
--- a/pypy/translator/stm/stmgcintf.py
+++ b/pypy/translator/stm/stmgcintf.py
@@ -10,7 +10,7 @@
 
 eci = ExternalCompilationInfo(
     include_dirs = [cdir, cdir2],
-    includes = ['src_stm/et.h', 'src_stm/et.c'],
+    includes = ['src_stm/et.h'],
     pre_include_bits = ['#define PYPY_LONG_BIT %d' % LONG_BIT,
                         '#define RPY_STM 1'],
     separate_module_sources = ['\n'],    # hack for test_rffi_stm
diff --git a/pypy/translator/stm/test/targetdemo.py b/pypy/translator/stm/test/targetdemo.py
--- a/pypy/translator/stm/test/targetdemo.py
+++ b/pypy/translator/stm/test/targetdemo.py
@@ -66,7 +66,7 @@
         ll_assert(res is self.arg, "ERROR: bogus pointer equality")
         raw1 = rffi.cast(rffi.CCHARP, self.retry_counter)
         raw2 = rffi.cast(rffi.CCHARP, -1)
-        ll_assert(raw1 == raw2, "ERROR: retry_counter == -1")
+        ll_assert(raw1 != raw2, "ERROR: retry_counter == -1")
 
 class MakeChain(rstm.Transaction):
     def __init__(self, anchor, value):


More information about the pypy-commit mailing list