[pypy-commit] pypy stm-gc: Progress...
arigo
noreply at buildbot.pypy.org
Sun Apr 22 19:28:16 CEST 2012
Author: Armin Rigo <arigo at tunes.org>
Branch: stm-gc
Changeset: r54624:0a022231bea0
Date: 2012-04-22 19:10 +0200
http://bitbucket.org/pypy/pypy/changeset/0a022231bea0/
Log: Progress...
diff --git a/pypy/rlib/rstm.py b/pypy/rlib/rstm.py
--- a/pypy/rlib/rstm.py
+++ b/pypy/rlib/rstm.py
@@ -171,8 +171,7 @@
# This logic is in a sub-function because we want to catch
# the MemoryErrors that could occur.
transaction = _cast_voidp_to_transaction(transactionptr)
- ll_assert(transaction._next_transaction is None,
- "_next_transaction should be cleared by C code")
+ transaction._next_transaction = None
transaction.retry_counter = retry_counter
new_transactions = transaction.run()
return _link_new_transactions(new_transactions)
diff --git a/pypy/rlib/test/test_rstm.py b/pypy/rlib/test/test_rstm.py
--- a/pypy/rlib/test/test_rstm.py
+++ b/pypy/rlib/test/test_rstm.py
@@ -21,8 +21,6 @@
self._add(initial_transaction_ptr)
while self._pending:
r, transactionptr = self._pending.popitem()
- transaction = self.cast_voidp_to_transaction(transactionptr)
- transaction._next_transaction = None
nextptr = rstm._stm_run_transaction(transactionptr, 0)
next = self.cast_voidp_to_transaction(nextptr)
while next is not None:
diff --git a/pypy/translator/c/src/main.h b/pypy/translator/c/src/main.h
--- a/pypy/translator/c/src/main.h
+++ b/pypy/translator/c/src/main.h
@@ -15,6 +15,10 @@
#ifndef PYPY_NOT_MAIN_FILE
+#ifdef RPY_STM
+#include "src_stm/et.c"
+#endif
+
#ifndef PYPY_MAIN_FUNCTION
#define PYPY_MAIN_FUNCTION main
#endif
diff --git a/pypy/translator/stm/src_stm/core.c b/pypy/translator/stm/src_stm/core.c
--- a/pypy/translator/stm/src_stm/core.c
+++ b/pypy/translator/stm/src_stm/core.c
@@ -26,6 +26,11 @@
/************************************************************/
+#define GETVERSION(o) ((owner_version_t)((o)->h_version))
+#define GETVERSIONREF(o) ((volatile owner_version_t *)(&(o)->h_version))
+#define SETVERSION(o, v) (o)->h_version = (void *)(v)
+#define GETTID(o) ((o)->h_tid)
+
static unsigned long get_global_timestamp(struct tx_descriptor *d)
{
return (/*d->last_known_global_timestamp =*/ global_timestamp);
@@ -94,7 +99,7 @@
/* unlock the orec */
volatile orec_t* o = get_orec(globalobj);
CFENCE;
- o->version = newver;
+ SETVERSION(o, newver);
} REDOLOG_LOOP_END;
}
@@ -107,7 +112,7 @@
if (item->p != -1)
{
volatile orec_t* o = get_orec(item->addr);
- o->version = item->p;
+ SETVERSION(o, item->p);
}
} REDOLOG_LOOP_END;
}
@@ -120,7 +125,7 @@
{
volatile orec_t* o = get_orec(item->addr);
assert(item->p != -1);
- o->version = item->p;
+ SETVERSION(o, item->p);
item->p = -1;
} REDOLOG_LOOP_END;
}
@@ -137,7 +142,7 @@
owner_version_t ovt;
retry:
- ovt = o->version;
+ ovt = GETVERSION(o);
// if orec not locked, lock it
//
@@ -145,7 +150,7 @@
// reads. Since most writes are also reads, we'll just abort under this
// condition. This can introduce false conflicts
if (!IS_LOCKED_OR_NEWER(ovt, d->start_time)) {
- if (!bool_cas(&o->version, ovt, d->my_lock_word))
+ if (!bool_cas(GETVERSIONREF(o), ovt, d->my_lock_word))
goto retry;
// save old version to item->p. Now we hold the lock.
item->p = ovt;
@@ -219,7 +224,7 @@
for (i=0; i<d->reads.size; i++)
{
retry:
- ovt = d->reads.items[i]->version;
+ ovt = GETVERSION(d->reads.items[i]);
if (IS_LOCKED_OR_NEWER(ovt, d->start_time))
{
// If locked, we wait until it becomes unlocked. The chances are
@@ -249,7 +254,7 @@
assert(!is_inevitable(d));
for (i=0; i<d->reads.size; i++)
{
- ovt = d->reads.items[i]->version; // read this orec
+ ovt = GETVERSION(d->reads.items[i]); // read this orec
if (IS_LOCKED_OR_NEWER(ovt, d->start_time))
{
if (!IS_LOCKED(ovt))
@@ -361,7 +366,7 @@
else { \
retry: \
/* read the orec BEFORE we read anything else */ \
- ovt = o->version; \
+ ovt = GETVERSION(o); \
CFENCE; \
\
/* this tx doesn't hold any locks, so if the lock for this addr is */ \
@@ -384,7 +389,7 @@
\
/* postvalidate AFTER reading addr: */ \
CFENCE; \
- if (__builtin_expect(o->version != ovt, 0)) \
+ if (__builtin_expect(GETVERSION(o) != ovt, 0)) \
goto retry; /* oups, try again */ \
\
oreclist_insert(&d->reads, (orec_t*)o); \
@@ -404,13 +409,13 @@
if (d == NULL) \
return *(TYPE *)(((char *)addr) + offset); \
\
- if ((o->tid & GCFLAG_WAS_COPIED) != 0) \
+ if ((GETTID(o) & GCFLAG_WAS_COPIED) != 0) \
{ \
/* Look up in the thread-local dictionary. */ \
wlog_t *found; \
REDOLOG_FIND(d->redolog, addr, found, goto not_found); \
orec_t *localobj = (orec_t *)found->val; \
- assert((localobj->tid & GCFLAG_GLOBAL) == 0); \
+ assert((GETTID(localobj) & GCFLAG_GLOBAL) == 0); \
return *(TYPE *)(((char *)localobj) + offset); \
\
not_found:; \
@@ -644,3 +649,72 @@
{
tx_abort(7); /* manual abort */
}
+
+/************************************************************/
+
+long stm_thread_id(void)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ if (d == NULL)
+ return 0; /* no thread_descriptor: it's the main thread */
+ return d->my_lock_word;
+}
+
+static __thread void *rpython_tls_object;
+
+void stm_set_tls(void *newtls, long in_main_thread)
+{
+ descriptor_init(in_main_thread);
+ rpython_tls_object = newtls;
+}
+
+void *stm_get_tls(void)
+{
+ return rpython_tls_object;
+}
+
+void stm_del_tls(void)
+{
+ descriptor_done();
+}
+
+void *stm_tldict_lookup(void *key)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ wlog_t* found;
+ REDOLOG_FIND(d->redolog, key, found, goto not_found);
+ return found->val;
+
+ not_found:
+ return NULL;
+}
+
+void stm_tldict_add(void *key, void *value)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ assert(d != NULL);
+ redolog_insert(&d->redolog, key, value);
+}
+
+void stm_tldict_enum(void)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ wlog_t *item;
+ void *tls = stm_get_tls();
+
+ REDOLOG_LOOP_FORWARD(d->redolog, item)
+ {
+ pypy_g__stm_enum_callback(tls, item->addr, item->val);
+ } REDOLOG_LOOP_END;
+}
+
+long stm_in_transaction(void)
+{
+ struct tx_descriptor *d = thread_descriptor;
+ return d != NULL;
+}
+
+#undef GETVERSION
+#undef GETVERSIONREF
+#undef SETVERSION
+#undef GETTID
diff --git a/pypy/translator/stm/src_stm/et.c b/pypy/translator/stm/src_stm/et.c
--- a/pypy/translator/stm/src_stm/et.c
+++ b/pypy/translator/stm/src_stm/et.c
@@ -1,7 +1,5 @@
/* -*- c-basic-offset: 2 -*- */
-#ifndef PYPY_NOT_MAIN_FILE
-
/* XXX assumes that time never wraps around (in a 'long'), which may be
* correct on 64-bit machines but not on 32-bit machines if the process
* runs for long enough.
@@ -40,10 +38,7 @@
/* This is the same as the object header structure HDR
* declared in stmgc.py */
-typedef struct {
- long tid;
- long version;
-} orec_t;
+typedef struct pypy_header0 orec_t;
/************************************************************/
@@ -62,5 +57,3 @@
#include "src_stm/rpyintf.c"
/************************************************************/
-
-#endif /* PYPY_NOT_MAIN_FILE */
diff --git a/pypy/translator/stm/src_stm/et.h b/pypy/translator/stm/src_stm/et.h
--- a/pypy/translator/stm/src_stm/et.h
+++ b/pypy/translator/stm/src_stm/et.h
@@ -25,7 +25,11 @@
void stm_tldict_add(void *, void *);
void stm_tldict_enum(void);
-/* these functions are declared by generated C code from the GC */
+/* these functions are declared by generated C code from pypy.rlib.rstm
+ and from the GC (see llop.nop(...)) */
+extern void pypy_g__stm_thread_starting(void);
+extern void pypy_g__stm_thread_stopping(void);
+extern void *_stm_run_transaction(void *, long);
extern long pypy_g__stm_getsize(void *);
extern void pypy_g__stm_enum_callback(void *, void *, void *);
diff --git a/pypy/translator/stm/src_stm/fifo.c b/pypy/translator/stm/src_stm/fifo.c
new file mode 100644
--- /dev/null
+++ b/pypy/translator/stm/src_stm/fifo.c
@@ -0,0 +1,70 @@
+/* -*- c-basic-offset: 2 -*- */
+
+
+/* XXX Direct access to this field. This relies on genc always producing
+ the same names, but that should be ok. */
+#define NEXT(item) (((struct pypy_pypy_rlib_rstm_Transaction0 *)(item)) \
+ ->t_inst__next_transaction)
+
+
+typedef struct {
+ void *first;
+ void *last;
+} stm_fifo_t;
+
+
+static void fifo_init(stm_fifo_t *fifo)
+{
+ fifo->first = NULL;
+ fifo->last = NULL;
+}
+
+static void *fifo_next(void *item)
+{
+ return NEXT(item);
+}
+
+static void fifo_append(stm_fifo_t *fifo, void *newitem)
+{
+ NEXT(newitem) = NULL;
+ if (fifo->last == NULL)
+ fifo->first = newitem;
+ else
+ NEXT(fifo->last) = newitem;
+ fifo->last = newitem;
+}
+
+static bool_t fifo_is_empty(stm_fifo_t *fifo)
+{
+ assert((fifo->first == NULL) == (fifo->last == NULL));
+ return (fifo->first == NULL);
+}
+
+/* static bool_t fifo_is_of_length_1(stm_fifo_t *fifo) */
+/* { */
+/* return (fifo->first != NULL && fifo->first == fifo->last); */
+/* } */
+
+static void *fifo_popleft(stm_fifo_t *fifo)
+{
+ void *item = fifo->first;
+ fifo->first = NEXT(item);
+ if (fifo->first == NULL)
+ fifo->last = NULL;
+ return item;
+}
+
+static void fifo_extend(stm_fifo_t *fifo, void *newitems)
+{
+ if (fifo->last == NULL)
+ fifo->first = newitems;
+ else
+ NEXT(fifo->last) = newitems;
+
+ while (NEXT(newitems) != NULL)
+ newitems = NEXT(newitems);
+
+ fifo->last = newitems;
+}
+
+#undef NEXT
diff --git a/pypy/translator/stm/src_stm/rpyintf.c b/pypy/translator/stm/src_stm/rpyintf.c
--- a/pypy/translator/stm/src_stm/rpyintf.c
+++ b/pypy/translator/stm/src_stm/rpyintf.c
@@ -1,68 +1,7 @@
/* -*- c-basic-offset: 2 -*- */
-long stm_thread_id(void)
-{
- struct tx_descriptor *d = thread_descriptor;
- if (d == NULL)
- return 0; /* no thread_descriptor: it's the main thread */
- return d->my_lock_word;
-}
+#include "src_stm/fifo.c"
-static __thread void *rpython_tls_object;
-
-void stm_set_tls(void *newtls, long in_main_thread)
-{
- descriptor_init(in_main_thread);
- rpython_tls_object = newtls;
-}
-
-void *stm_get_tls(void)
-{
- return rpython_tls_object;
-}
-
-void stm_del_tls(void)
-{
- descriptor_done();
-}
-
-void *stm_tldict_lookup(void *key)
-{
- struct tx_descriptor *d = thread_descriptor;
- wlog_t* found;
- REDOLOG_FIND(d->redolog, key, found, goto not_found);
- return found->val;
-
- not_found:
- return NULL;
-}
-
-void stm_tldict_add(void *key, void *value)
-{
- struct tx_descriptor *d = thread_descriptor;
- assert(d != NULL);
- redolog_insert(&d->redolog, key, value);
-}
-
-void stm_tldict_enum(void)
-{
- struct tx_descriptor *d = thread_descriptor;
- wlog_t *item;
- void *tls = stm_get_tls();
-
- REDOLOG_LOOP_FORWARD(d->redolog, item)
- {
- pypy_g__stm_enum_callback(tls, item->addr, item->val);
- } REDOLOG_LOOP_END;
-}
-
-long stm_in_transaction(void)
-{
- struct tx_descriptor *d = thread_descriptor;
- return d != NULL;
-}
-
-/************************************************************/
/* this lock is acquired when we start running transactions, and
released only when we are finished. */
@@ -72,19 +11,107 @@
data in run_thread(). */
static pthread_mutex_t mutex_global = PTHREAD_MUTEX_INITIALIZER;
+/* this lock is held if and only if there are no tasks pending,
+ i.e. the fifo 'stm_g_pending' is empty and both its 'first' and 'last'
+ pointers are NULL. */
+static pthread_mutex_t mutex_no_tasks_pending = PTHREAD_MUTEX_INITIALIZER;
+
/* some global data put there by run_all_transactions(). */
-static void *g_first_transaction, *g_last_transaction;
-static int g_num_threads, g_num_waiting_threads;
+static stm_fifo_t stm_g_pending;
+static int stm_g_num_threads, stm_g_num_waiting_threads, stm_g_finished;
+
+
+static void* perform_transaction(void *transaction)
+{
+ void *new_transaction_list;
+ jmp_buf _jmpbuf;
+ long counter;
+ volatile long v_counter = 0;
+
+ setjmp(_jmpbuf);
+
+ begin_transaction(&_jmpbuf);
+
+ counter = v_counter;
+ v_counter = counter + 1;
+
+ new_transaction_list = pypy_g__stm_run_transaction(transaction, counter);
+
+ commit_transaction();
+
+ return new_transaction_list;
+}
+
+static void add_list(void *new_transaction_list)
+{
+ bool_t was_empty;
+
+ if (new_transaction_list == NULL)
+ return;
+
+ was_empty = fifo_is_empty(&stm_g_pending);
+ fifo_extend(&stm_g_pending, new_transaction_list);
+ if (was_empty)
+ pthread_mutex_unlock(&mutex_no_tasks_pending);
+}
+
/* the main function running a thread */
static void *run_thread(void *ignored)
{
pthread_mutex_lock(&mutex_global);
+ pypy_g__stm_thread_starting();
- g_num_waiting_threads++;
- if (g_num_waiting_threads == g_num_threads)
+ while (1)
+ {
+ if (fifo_is_empty(&stm_g_pending))
+ {
+ stm_g_num_waiting_threads += 1;
+ if (stm_g_num_waiting_threads == stm_g_num_threads)
+ {
+ stm_g_finished = 1;
+ pthread_mutex_unlock(&mutex_no_tasks_pending);
+ }
+ pthread_mutex_unlock(&mutex_global);
+
+ pthread_mutex_lock(&mutex_no_tasks_pending);
+ pthread_mutex_unlock(&mutex_no_tasks_pending);
+
+ pthread_mutex_lock(&mutex_global);
+ stm_g_num_waiting_threads -= 1;
+ if (stm_g_finished)
+ break;
+ }
+ else
+ {
+ void *new_transaction_list;
+ void *transaction = fifo_popleft(&stm_g_pending);
+ if (fifo_is_empty(&stm_g_pending))
+ pthread_mutex_lock(&mutex_no_tasks_pending);
+ pthread_mutex_unlock(&mutex_global);
+
+ while (1)
+ {
+ new_transaction_list = perform_transaction(transaction);
+
+ /* for now, always break out of this loop,
+ unless 'new_transaction_list' contains precisely one item */
+ if (new_transaction_list == NULL)
+ break;
+ if (fifo_next(new_transaction_list) != NULL)
+ break;
+
+ transaction = new_transaction_list; /* single element */
+ }
+
+ pthread_mutex_lock(&mutex_global);
+ add_list(new_transaction_list);
+ }
+ }
+
+ pypy_g__stm_thread_stopping();
+ if (stm_g_num_waiting_threads == 0) /* only the last thread to leave */
pthread_mutex_unlock(&mutex_unfinished);
-
pthread_mutex_unlock(&mutex_global);
return NULL;
}
@@ -93,10 +120,11 @@
long num_threads)
{
int i;
- g_first_transaction = initial_transaction;
- g_last_transaction = initial_transaction;
- g_num_threads = (int)num_threads;
- g_num_waiting_threads = 0;
+ fifo_init(&stm_g_pending);
+ fifo_append(&stm_g_pending, initial_transaction);
+ stm_g_num_threads = (int)num_threads;
+ stm_g_num_waiting_threads = 0;
+ stm_g_finished = 0;
pthread_mutex_lock(&mutex_unfinished);
@@ -104,7 +132,12 @@
{
pthread_t th;
int status = pthread_create(&th, NULL, run_thread, NULL);
- assert(status == 0);
+ if (status != 0)
+ {
+ /* XXX turn into a nice exception */
+ fprintf(stderr, "fatal error: cannot create threads\n");
+ exit(1);
+ }
pthread_detach(th);
}
diff --git a/pypy/translator/stm/stmgcintf.py b/pypy/translator/stm/stmgcintf.py
--- a/pypy/translator/stm/stmgcintf.py
+++ b/pypy/translator/stm/stmgcintf.py
@@ -10,7 +10,7 @@
eci = ExternalCompilationInfo(
include_dirs = [cdir, cdir2],
- includes = ['src_stm/et.h', 'src_stm/et.c'],
+ includes = ['src_stm/et.h'],
pre_include_bits = ['#define PYPY_LONG_BIT %d' % LONG_BIT,
'#define RPY_STM 1'],
separate_module_sources = ['\n'], # hack for test_rffi_stm
diff --git a/pypy/translator/stm/test/targetdemo.py b/pypy/translator/stm/test/targetdemo.py
--- a/pypy/translator/stm/test/targetdemo.py
+++ b/pypy/translator/stm/test/targetdemo.py
@@ -66,7 +66,7 @@
ll_assert(res is self.arg, "ERROR: bogus pointer equality")
raw1 = rffi.cast(rffi.CCHARP, self.retry_counter)
raw2 = rffi.cast(rffi.CCHARP, -1)
- ll_assert(raw1 == raw2, "ERROR: retry_counter == -1")
+ ll_assert(raw1 != raw2, "ERROR: retry_counter == -1")
class MakeChain(rstm.Transaction):
def __init__(self, anchor, value):
More information about the pypy-commit
mailing list