[pypy-commit] stmgc c7: in-progress

Author: Armin Rigo <arigo at tunes.org>
Branch: c7
Changeset: r602:4561d50017f8
Date: 2014-01-14 10:45 +0100
http://bitbucket.org/pypy/stmgc/changeset/4561d50017f8/

Log:	in-progress

diff --git a/c7/core.c b/c7/core.c
--- a/c7/core.c
+++ b/c7/core.c
@@ -23,7 +23,6 @@
 #  define HAVE_FULL_EXCHANGE_INSN
 #endif
 
-
 typedef TLPREFIX char localchar_t;
 typedef TLPREFIX struct alloc_for_size_s alloc_for_size_t;
 typedef TLPREFIX struct _thread_local2_s _thread_local2_t;
@@ -38,6 +37,7 @@
 struct _thread_local2_s {
     struct _thread_local1_s _tl1;
     int thread_num;
+    bool running_transaction;
     char *thread_base;
     struct stm_list_s *modified_objects;
     struct stm_list_s *new_object_ranges;
@@ -49,15 +49,9 @@
 
 
 static char *object_pages;
-static char *undo_log_pages;
-static char *undo_log_current;
-static int num_threads_started, leader_thread_num;
+static int num_threads_started;
 static uintptr_t index_page_never_used;
-static int next_write_version;
-static int undo_lock;
-static struct stm_list_s *global_history;
-static uint16_t gh_write_version_first;
-static uint16_t gh_write_version_last;
+static struct stm_list_s *volatile pending_updates;
 static uint8_t flag_page_private[NB_PAGES];   /* xxx_PAGE constants above */
 
 
@@ -146,6 +140,9 @@
     void *localpg = object_pages + localpgoff * 4096UL;
     void *otherpg = object_pages + otherpgoff * 4096UL;
 
+    // XXX should not use pgoff2, but instead the next unused page in
+    // thread 2, so that after major GCs the next dirty pages are the
+    // same as the old ones
     int res = remap_file_pages(localpg, 4096, 0, pgoff2, 0);
     if (res < 0) {
         perror("remap_file_pages");
@@ -174,87 +171,58 @@
 
 enum detect_conflicts_e { CANNOT_CONFLICT, CAN_CONFLICT };
 
-/* XXX this can be done by acquiring the undo_lock for much less time,
-   but it needs to be carefully synchronized with _stm_write_slowpath().
-   For now it must be called with the undo_lock acquired. */
 static void update_to_current_version(enum detect_conflicts_e check_conflict)
 {
-    /* Loop over objects in 'global_history': if they have been
+    /* Loop over objects in 'pending_updates': if they have been
        read by the current transaction, the current transaction must
-       abort; then copy them out of the leader's object space ---
-       which may have been modified by the leader's uncommitted
-       transaction; this case will be fixed afterwards.
+       abort; then copy them out of the other thread's object space,
+       which is not modified so far (the other thread just committed
+       and will wait until we are done here before it starts the
+       next transaction).
     */
     bool conflict_found_or_dont_check = (check_conflict == CANNOT_CONFLICT);
     char *local_base = _STM_TL2->thread_base;
     char *remote_base = get_thread_base(1 - _STM_TL2->thread_num);
-    struct stm_list_s *gh, *gh_next;
+    struct stm_list_s *pu = pending_updates;
 
-    assert(leader_thread_num != _STM_TL2->thread_num);
+    assert(pu != _STM_TL2->modified_objects);
 
-    for (gh = global_history; gh != NULL; gh = gh_next) {
+    STM_LIST_FOREACH(pu, ({
 
-        STM_LIST_FOREACH(gh, ({
+        if (!conflict_found_or_dont_check)
+            conflict_found_or_dont_check = _stm_was_read(item);
 
-            if (!conflict_found_or_dont_check)
-                conflict_found_or_dont_check = _stm_was_read(item);
-
-            char *dst = REAL_ADDRESS(local_base, item);
-            char *src = REAL_ADDRESS(remote_base, item);
-            char *src_rebased = src - (uintptr_t)local_base;
-            size_t size = stm_object_size_rounded_up((object_t *)src_rebased);
-
-            memcpy(dst + sizeof(char *),
-                   src + sizeof(char *),
-                   size - sizeof(char *));
-        }));
-
-        gh_next = gh->nextlist;
-        stm_list_free(gh);
-    }
-    global_history = NULL;
-    gh_write_version_first = 0xffff;
-    gh_write_version_last = 0;
-
-    /* Finally, loop over objects modified by the leader,
-       and copy them out of the undo log.
-    */
-    char *undo = undo_log_pages;
-    char *undo_end = undo_log_current;
-
-    while (undo < undo_end) {
-
-        char *src = undo;
-        char *dst = *(char **)src;
+        char *dst = REAL_ADDRESS(local_base, item);
+        char *src = REAL_ADDRESS(remote_base, item);
         char *src_rebased = src - (uintptr_t)local_base;
-
-        *(char **)src = *(char **)dst; /* fix the first word of the object in
-                                         the undo log, for stm_object_size() */
         size_t size = stm_object_size_rounded_up((object_t *)src_rebased);
 
         memcpy(dst + sizeof(char *),
                src + sizeof(char *),
                size - sizeof(char *));
+    }));
 
-        undo += size;
-    }
-    undo_log_current = undo_log_pages;   /* make empty again */
+    write_fence();
+    pending_updates = NULL;
 
     if (conflict_found_or_dont_check && check_conflict == CAN_CONFLICT) {
-        release_lock(&undo_lock);
         stm_abort_transaction();
     }
 }
 
 static void maybe_update(enum detect_conflicts_e check_conflict)
 {
-    if (leader_thread_num != _STM_TL2->thread_num && global_history != NULL) {
-        acquire_lock(&undo_lock);
+    if (pending_updates != NULL) {
         update_to_current_version(check_conflict);
-        release_lock(&undo_lock);
     }
 }
 
+static void wait_until_updated(void)
+{
+    while (pending_updates == _STM_TL2->modified_objects)
+        spin_loop();
+}
+
 
 void _stm_write_slowpath(object_t *obj)
 {
@@ -263,43 +231,10 @@
     _stm_privatize(((uintptr_t)obj) / 4096);
 
     stm_read(obj);
+    obj->write_version = _STM_TL1->transaction_write_version;
 
     _STM_TL2->modified_objects = stm_list_append(
         _STM_TL2->modified_objects, obj);
-
-    uint16_t wv = obj->write_version;
-    obj->write_version = _STM_TL1->transaction_write_version;
-
-    /* We only need to store a copy of the current version of the object if:
-       - we are the leader;
-       - the object is present in the global_history.
-       The second condition is approximated by the following range check.
-       Storing a few more objects than strictly needed is not really a problem.
-    */
-    /* XXX this can be done without acquiring the undo_lock at all,
-       but we need more care in update_to_current_version(). */
-
-    /* XXX can we avoid writing an unbounded number of copies of the
-       same object in case we run a lot of transactions while the other
-       thread is busy?  Unlikely case but in theory annoying.  Should
-       we anyway bound the undo log's size to much less than NB_PAGES,
-       and if full here, sleep?  Should the bound also count the size
-       taken by the global_history lists? */
-    if (ACQUIRE_LOCK_IF(&undo_lock,
-            wv <= gh_write_version_last && wv >= gh_write_version_first
-                && leader_thread_num == _STM_TL2->thread_num)) {
-        /* record in the undo log a copy of the content of the object */
-        size_t size = stm_object_size_rounded_up(obj);
-        char *source = real_address((uintptr_t)obj);
-        char *undo = undo_log_current;
-        *((object_t **)undo) = obj;
-        memcpy(undo + sizeof(object_t *),
-               source + sizeof(object_t *),
-               size - sizeof(object_t *));
-        /*write_fence();*/
-        undo_log_current = undo + size;
-        release_lock(&undo_lock);
-    }
 }
 
 
@@ -384,7 +319,7 @@
 }
 
 
-#define TOTAL_MEMORY          (NB_PAGES * 4096UL * (NB_THREADS + 1))
+#define TOTAL_MEMORY          (NB_PAGES * 4096UL * NB_THREADS)
 #define READMARKER_END        ((NB_PAGES * 4096UL) >> 4)
 #define FIRST_OBJECT_PAGE     ((READMARKER_END + 4095) / 4096UL)
 #define READMARKER_START      ((FIRST_OBJECT_PAGE * 4096UL) >> 4)
@@ -443,22 +378,12 @@
         }
     }
 
-    undo_log_pages = get_thread_base(NB_THREADS);
-    mprotect(undo_log_pages, 4096, PROT_NONE);
-    mprotect(undo_log_pages + (NB_PAGES - 1) * 4096UL, 4096, PROT_NONE);
-    undo_log_pages += 4096;
-    undo_log_current = undo_log_pages;
-
     num_threads_started = 0;
     index_page_never_used = FIRST_OBJECT_PAGE;
-    next_write_version = 1;
-    leader_thread_num = 0;
-    global_history = NULL;
-    gh_write_version_first = 0xffff;
-    gh_write_version_last = 0;
+    pending_updates = NULL;
 }
 
-#define INVALID_GS_VALUE  0xDDDDDDDDDDDDDDDDUL
+#define INVALID_GS_VALUE  0x6D6D6D6D
 
 static void set_gs_register(uint64_t value)
 {
@@ -478,10 +403,13 @@
     assert(_STM_TL2->thread_base == thread_base);
 
     _STM_TL2->modified_objects = stm_list_create();
+    assert(!_STM_TL2->running_transaction);
 }
 
 void _stm_teardown_thread(void)
 {
+    assert(!_STM_TL2->running_transaction);
+    wait_until_updated();
     stm_list_free(_STM_TL2->modified_objects);
     _STM_TL2->modified_objects = NULL;
 
@@ -492,8 +420,6 @@
 {
     munmap(object_pages, TOTAL_MEMORY);
     object_pages = NULL;
-    undo_log_pages = NULL;
-    undo_log_current = NULL;
 }
 
 
@@ -519,39 +445,45 @@
         perror("madvise");
         abort();
     }
-    _STM_TL1->transaction_read_version = 0;
+    _STM_TL1->transaction_read_version = 1;
 }
 
 void stm_major_collection(void)
 {
+    assert(_STM_TL2->running_transaction);
     abort();
 }
 
-void stm_start_transaction(jmp_buf *jmpbufptr)
+void stm_start_transaction(jmpbufptr_t *jmpbufptr)
 {
-    if (_STM_TL1->transaction_read_version == 0xff)
+    assert(!_STM_TL2->running_transaction);
+
+    uint8_t old_rv = _STM_TL1->transaction_read_version;
+    _STM_TL1->transaction_read_version = old_rv + 1;
+    if (UNLIKELY(old_rv == 0xff))
         reset_transaction_read_version();
-    _STM_TL1->transaction_read_version++;
-    _STM_TL1->jmpbufptr = NULL;
 
-    while (1) {
-        int wv = __sync_fetch_and_add(&next_write_version, 1);
-        if (LIKELY(wv <= 0xffff)) {
-            _STM_TL1->transaction_write_version = wv;
-            break;
-        }
+    int old_wv = _STM_TL1->transaction_write_version;
+    _STM_TL1->transaction_write_version = old_wv + 1;
+    if (UNLIKELY(old_wv == 0xffff)) {
         /* We run out of 16-bit numbers before we do the next major
            collection, which resets it.  XXX This case seems unlikely
            for now, but check if it could become a bottleneck at some
            point. */
         stm_major_collection();
     }
-    assert(stm_list_is_empty(_STM_TL2->modified_objects));
+
+    wait_until_updated();
+    stm_list_clear(_STM_TL2->modified_objects);
     assert(stm_list_is_empty(_STM_TL2->new_object_ranges));
 
+    /* check that there is no stm_abort() in the following maybe_update() */
+    _STM_TL1->jmpbufptr = NULL;
+
     maybe_update(CANNOT_CONFLICT);    /* no read object: cannot conflict */
 
     _STM_TL1->jmpbufptr = jmpbufptr;
+    _STM_TL2->running_transaction = 1;
 }
 
 static void update_new_objects_in_other_threads(uintptr_t pagenum,
@@ -567,10 +499,13 @@
     char *src = REAL_ADDRESS(_STM_TL2->thread_base,           local_src);
 
     memcpy(dst, src, size);
+    ...;
 }
 
 void stm_stop_transaction(void)
 {
+    assert(_STM_TL2->running_transaction);
+
     write_fence();   /* see later in this function for why */
 
     acquire_lock(&undo_lock);
@@ -596,8 +531,7 @@
     _STM_TL2->modified_objects = stm_list_create();
 
     uint16_t wv = _STM_TL1->transaction_write_version;
-    if (wv < gh_write_version_last)  gh_write_version_last  = wv;
-    if (wv > gh_write_version_first) gh_write_version_first = wv;
+    if (gh_write_version_first < wv) gh_write_version_first = wv;
 
     /* walk the new_object_ranges and manually copy the new objects
        to the other thread's pages in the (hopefully rare) case that
@@ -664,11 +598,14 @@
         }
     }
 
+    _STM_TL2->running_transaction = 0;
     release_lock(&undo_lock);
 }
 
 void stm_abort_transaction(void)
 {
+    assert(_STM_TL2->running_transaction);
+    // XXX copy back the modified objects!!
     long j;
     for (j = 2; j < LARGE_OBJECT_WORDS; j++) {
         alloc_for_size_t *alloc = &_STM_TL2->alloc[j];
@@ -678,6 +615,7 @@
     stm_list_clear(_STM_TL2->new_object_ranges);
     stm_list_clear(_STM_TL2->modified_objects);
     assert(_STM_TL1->jmpbufptr != NULL);
-    assert(_STM_TL1->jmpbufptr != (jmp_buf *)-1);   /* for tests only */
-    longjmp(*_STM_TL1->jmpbufptr, 1);
+    assert(_STM_TL1->jmpbufptr != (jmpbufptr_t *)-1);   /* for tests only */
+    _STM_TL2->running_transaction = 0;
+    __builtin_longjmp(*_STM_TL1->jmpbufptr, 1);
 }
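
The core.c changes above replace the undo log and 'global_history' with a single handoff pointer. Here is a much-simplified, standalone sketch of that pattern: the committing thread publishes its modified-objects list through 'pending_updates', the other thread copies the listed objects out of the committer's area and clears the pointer, and the committer does not start its next transaction until that has happened (the role of wait_until_updated()). The sketch uses C11 atomics and plain arrays instead of stmgc's thread-local segments, write_fence() and spin_loop(), so apart from the 'pending_updates' idea every name and detail below is invented for illustration.

/* Simplified handoff sketch (C11 atomics; compile with -pthread).
   Only the 'pending_updates' idea is taken from core.c above; the
   object layout and thread setup are invented for the example. */
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdio.h>

#define N_OBJS 4

typedef struct { int value; } object_t;

static object_t objects[2][N_OBJS];   /* one private copy per thread */
static object_t *modified[N_OBJS];    /* the committer's modified-object list */
static size_t n_modified;

/* NULL means "nothing to propagate"; otherwise it points at the committing
   thread's list, which the other thread must copy out and then clear. */
static _Atomic(object_t **) pending_updates;

static void committer(void)
{
    /* thread 0 commits changes to two of its objects */
    objects[0][1].value = 42;
    objects[0][3].value = 84;
    modified[0] = &objects[0][1];
    modified[1] = &objects[0][3];
    n_modified = 2;

    /* publish the list; the release store plays the role of write_fence() */
    atomic_store_explicit(&pending_updates, modified, memory_order_release);

    /* wait_until_updated() analogue: don't start the next transaction
       (and reuse 'modified') until the other thread has consumed it */
    while (atomic_load_explicit(&pending_updates, memory_order_acquire) != NULL)
        ;   /* spin_loop() */
}

static void *other_thread(void *arg)
{
    (void)arg;
    object_t **pu;

    /* reduced maybe_update(): spin until something has been published */
    while ((pu = atomic_load_explicit(&pending_updates,
                                      memory_order_acquire)) == NULL)
        ;   /* spin_loop() */

    /* update_to_current_version() analogue: copy each published object
       from the committer's copy into our own */
    size_t i;
    for (i = 0; i < n_modified; i++) {
        size_t idx = (size_t)(pu[i] - objects[0]);
        objects[1][idx] = objects[0][idx];
    }

    /* signal the committer that the handoff is finished */
    atomic_store_explicit(&pending_updates, NULL, memory_order_release);
    return NULL;
}

int main(void)
{
    pthread_t t;
    pthread_create(&t, NULL, other_thread, NULL);
    committer();
    pthread_join(t, NULL);
    printf("thread 1 now sees %d and %d\n",
           objects[1][1].value, objects[1][3].value);
    return 0;
}

Note that in core.c the consumer does not spin: maybe_update() simply processes 'pending_updates' if it is non-NULL, and only the committer waits, in its next stm_start_transaction(), via wait_until_updated(). The spin in other_thread() above is just to keep the example self-contained.
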
diff --git a/c7/core.h b/c7/core.h
--- a/c7/core.h
+++ b/c7/core.h
@@ -3,7 +3,6 @@
 
 #include <stdint.h>
 #include <stdbool.h>
-#include <setjmp.h>
 
 
 #define TLPREFIX __attribute__((address_space(256)))
@@ -31,8 +30,7 @@
 */
 
 struct object_s {
-    uint16_t write_version;       /* reserved for the STM library */
-    /*uint8_t stm_flags;*/
+    uint8_t stm_flags;            /* reserved for the STM library */
     uint32_t header;              /* for the user program -- only write in
                                      newly allocated objects */
 };
@@ -41,8 +39,10 @@
     uint8_t rm;
 };
 
+typedef intptr_t jmpbufptr_t[5];  /* for use with __builtin_setjmp() */
+
 struct _thread_local1_s {
-    jmp_buf *jmpbufptr;
+    jmpbufptr_t *jmpbufptr;
     uint8_t transaction_read_version;
     uint16_t transaction_write_version;
 };
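
The core.h change swaps jmp_buf for a five-word buffer intended for GCC/clang's __builtin_setjmp()/__builtin_longjmp(), which save less state than the libc pair. A minimal sketch of how such a buffer can drive the abort path follows; the helper names are invented for the example and are not part of the stmgc API, and the only requirement taken from the builtins' documentation is that __builtin_longjmp() must be passed the constant 1.

/* Minimal abort-path sketch using the builtins; the helper names are
   invented and are not part of the stmgc API. */
#include <stdint.h>
#include <stdio.h>

typedef intptr_t jmpbufptr_t[5];      /* as in core.h, for __builtin_setjmp() */

static jmpbufptr_t *current_jmpbuf;   /* stand-in for _STM_TL1->jmpbufptr */

static void hypothetical_abort(void)
{
    /* __builtin_longjmp() must be passed the constant 1 */
    __builtin_longjmp(*current_jmpbuf, 1);
}

static void hypothetical_run_transaction(void (*body)(void))
{
    jmpbufptr_t here;
    if (__builtin_setjmp(here) == 0) {
        current_jmpbuf = &here;       /* what stm_start_transaction() records */
        body();                       /* may call hypothetical_abort() */
        printf("committed\n");
    }
    else {
        printf("aborted, a real runner would retry here\n");
    }
}

static void body_that_aborts(void)
{
    hypothetical_abort();
}

int main(void)
{
    hypothetical_run_transaction(body_that_aborts);
    return 0;
}

In the diff, stm_abort_transaction() plays the role of hypothetical_abort(), and _STM_TL1->jmpbufptr is set by stm_start_transaction(), with NULL used as a "must not abort here" marker and (jmpbufptr_t *)-1 reserved for the tests.
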
diff --git a/c7/list.h b/c7/list.h
--- a/c7/list.h
+++ b/c7/list.h
@@ -8,7 +8,7 @@
     uintptr_t count;
     union {
         uintptr_t last_allocated;       /* always odd */
-        struct stm_list_s *nextlist;    /* always even */
+        //struct stm_list_s *nextlist;    /* always even */
     };
     object_t *items[];
 };
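
One more detail from stm_start_transaction() above that may be worth isolating: the read version is an 8-bit counter and the write version a 16-bit counter, and each overflow is handled by a slow path (reset_transaction_read_version() and stm_major_collection() respectively). A tiny sketch of that bump-and-check pattern, with printing stubs standing in for the real slow paths:

/* Version-counter sketch; the two slow paths are printing stubs only. */
#include <stdint.h>
#include <stdio.h>

static uint8_t transaction_read_version;
static uint16_t transaction_write_version;

static void reset_read_markers(void)   /* stub for reset_transaction_read_version() */
{
    printf("read markers reset\n");
    transaction_read_version = 1;
}

static void major_collection(void)     /* stub for stm_major_collection() */
{
    printf("major collection would reset write versions here\n");
    transaction_write_version = 1;
}

static void start_transaction(void)
{
    uint8_t old_rv = transaction_read_version;
    transaction_read_version = old_rv + 1;
    if (old_rv == 0xff)                /* the 8-bit counter wrapped */
        reset_read_markers();

    uint16_t old_wv = transaction_write_version;
    transaction_write_version = old_wv + 1;
    if (old_wv == 0xffff)              /* the 16-bit counter wrapped */
        major_collection();
}

int main(void)
{
    long i;
    for (i = 0; i < 300; i++)          /* enough to wrap the 8-bit counter once */
        start_transaction();
    printf("rv=%u wv=%u\n", (unsigned)transaction_read_version,
           (unsigned)transaction_write_version);
    return 0;
}

In core.c the read-version reset also discards the read-marker pages (the madvise() call above), and the write-version overflow is expected to be absorbed by a major collection, as the XXX comment in the diff notes.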

