[pypy-commit] stmgc c7: in-progress
arigo
noreply at buildbot.pypy.org
Tue Jan 14 10:45:43 CET 2014
Author: Armin Rigo <arigo at tunes.org>
Branch: c7
Changeset: r602:4561d50017f8
Date: 2014-01-14 10:45 +0100
http://bitbucket.org/pypy/stmgc/changeset/4561d50017f8/
Log: in-progress
diff --git a/c7/core.c b/c7/core.c
--- a/c7/core.c
+++ b/c7/core.c
@@ -23,7 +23,6 @@
# define HAVE_FULL_EXCHANGE_INSN
#endif
-
typedef TLPREFIX char localchar_t;
typedef TLPREFIX struct alloc_for_size_s alloc_for_size_t;
typedef TLPREFIX struct _thread_local2_s _thread_local2_t;
@@ -38,6 +37,7 @@
struct _thread_local2_s {
struct _thread_local1_s _tl1;
int thread_num;
+ bool running_transaction;
char *thread_base;
struct stm_list_s *modified_objects;
struct stm_list_s *new_object_ranges;
@@ -49,15 +49,9 @@
static char *object_pages;
-static char *undo_log_pages;
-static char *undo_log_current;
-static int num_threads_started, leader_thread_num;
+static int num_threads_started;
static uintptr_t index_page_never_used;
-static int next_write_version;
-static int undo_lock;
-static struct stm_list_s *global_history;
-static uint16_t gh_write_version_first;
-static uint16_t gh_write_version_last;
+static struct stm_list_s *volatile pending_updates;
static uint8_t flag_page_private[NB_PAGES]; /* xxx_PAGE constants above */
@@ -146,6 +140,9 @@
void *localpg = object_pages + localpgoff * 4096UL;
void *otherpg = object_pages + otherpgoff * 4096UL;
+ // XXX should not use pgoff2, but instead the next unused page in
+ // thread 2, so that after major GCs the next dirty pages are the
+ // same as the old ones
int res = remap_file_pages(localpg, 4096, 0, pgoff2, 0);
if (res < 0) {
perror("remap_file_pages");
@@ -174,87 +171,58 @@
enum detect_conflicts_e { CANNOT_CONFLICT, CAN_CONFLICT };
-/* XXX this can be done by acquiring the undo_lock for much less time,
- but it needs to be carefully synchronized with _stm_write_slowpath().
- For now it must be called with the undo_lock acquired. */
static void update_to_current_version(enum detect_conflicts_e check_conflict)
{
- /* Loop over objects in 'global_history': if they have been
+ /* Loop over objects in 'pending_updates': if they have been
read by the current transaction, the current transaction must
- abort; then copy them out of the leader's object space ---
- which may have been modified by the leader's uncommitted
- transaction; this case will be fixed afterwards.
+ abort; then copy them out of the other thread's object space,
+ which is not modified so far (the other thread just committed
+ and will wait until we are done here before it starts the
+ next transaction).
*/
bool conflict_found_or_dont_check = (check_conflict == CANNOT_CONFLICT);
char *local_base = _STM_TL2->thread_base;
char *remote_base = get_thread_base(1 - _STM_TL2->thread_num);
- struct stm_list_s *gh, *gh_next;
+ struct stm_list_s *pu = pending_updates;
- assert(leader_thread_num != _STM_TL2->thread_num);
+ assert(pu != _STM_TL2->modified_objects);
- for (gh = global_history; gh != NULL; gh = gh_next) {
+ STM_LIST_FOREACH(pu, ({
- STM_LIST_FOREACH(gh, ({
+ if (!conflict_found_or_dont_check)
+ conflict_found_or_dont_check = _stm_was_read(item);
- if (!conflict_found_or_dont_check)
- conflict_found_or_dont_check = _stm_was_read(item);
-
- char *dst = REAL_ADDRESS(local_base, item);
- char *src = REAL_ADDRESS(remote_base, item);
- char *src_rebased = src - (uintptr_t)local_base;
- size_t size = stm_object_size_rounded_up((object_t *)src_rebased);
-
- memcpy(dst + sizeof(char *),
- src + sizeof(char *),
- size - sizeof(char *));
- }));
-
- gh_next = gh->nextlist;
- stm_list_free(gh);
- }
- global_history = NULL;
- gh_write_version_first = 0xffff;
- gh_write_version_last = 0;
-
- /* Finally, loop over objects modified by the leader,
- and copy them out of the undo log.
- */
- char *undo = undo_log_pages;
- char *undo_end = undo_log_current;
-
- while (undo < undo_end) {
-
- char *src = undo;
- char *dst = *(char **)src;
+ char *dst = REAL_ADDRESS(local_base, item);
+ char *src = REAL_ADDRESS(remote_base, item);
char *src_rebased = src - (uintptr_t)local_base;
-
- *(char **)src = *(char **)dst; /* fix the first word of the object in
- the undo log, for stm_object_size() */
size_t size = stm_object_size_rounded_up((object_t *)src_rebased);
memcpy(dst + sizeof(char *),
src + sizeof(char *),
size - sizeof(char *));
+ }));
- undo += size;
- }
- undo_log_current = undo_log_pages; /* make empty again */
+ write_fence();
+ pending_updates = NULL;
if (conflict_found_or_dont_check && check_conflict == CAN_CONFLICT) {
- release_lock(&undo_lock);
stm_abort_transaction();
}
}
static void maybe_update(enum detect_conflicts_e check_conflict)
{
- if (leader_thread_num != _STM_TL2->thread_num && global_history != NULL) {
- acquire_lock(&undo_lock);
+ if (pending_updates != NULL) {
update_to_current_version(check_conflict);
- release_lock(&undo_lock);
}
}
+static void wait_until_updated(void)
+{
+ while (pending_updates == _STM_TL2->modified_objects)
+ spin_loop();
+}
+
void _stm_write_slowpath(object_t *obj)
{
@@ -263,43 +231,10 @@
_stm_privatize(((uintptr_t)obj) / 4096);
stm_read(obj);
+ obj->write_version = _STM_TL1->transaction_write_version;
_STM_TL2->modified_objects = stm_list_append(
_STM_TL2->modified_objects, obj);
-
- uint16_t wv = obj->write_version;
- obj->write_version = _STM_TL1->transaction_write_version;
-
- /* We only need to store a copy of the current version of the object if:
- - we are the leader;
- - the object is present in the global_history.
- The second condition is approximated by the following range check.
- Storing a few more objects than strictly needed is not really a problem.
- */
- /* XXX this can be done without acquiring the undo_lock at all,
- but we need more care in update_to_current_version(). */
-
- /* XXX can we avoid writing an unbounded number of copies of the
- same object in case we run a lot of transactions while the other
- thread is busy? Unlikely case but in theory annoying. Should
- we anyway bound the undo log's size to much less than NB_PAGES,
- and if full here, sleep? Should the bound also count the size
- taken by the global_history lists? */
- if (ACQUIRE_LOCK_IF(&undo_lock,
- wv <= gh_write_version_last && wv >= gh_write_version_first
- && leader_thread_num == _STM_TL2->thread_num)) {
- /* record in the undo log a copy of the content of the object */
- size_t size = stm_object_size_rounded_up(obj);
- char *source = real_address((uintptr_t)obj);
- char *undo = undo_log_current;
- *((object_t **)undo) = obj;
- memcpy(undo + sizeof(object_t *),
- source + sizeof(object_t *),
- size - sizeof(object_t *));
- /*write_fence();*/
- undo_log_current = undo + size;
- release_lock(&undo_lock);
- }
}
@@ -384,7 +319,7 @@
}
-#define TOTAL_MEMORY (NB_PAGES * 4096UL * (NB_THREADS + 1))
+#define TOTAL_MEMORY (NB_PAGES * 4096UL * NB_THREADS)
#define READMARKER_END ((NB_PAGES * 4096UL) >> 4)
#define FIRST_OBJECT_PAGE ((READMARKER_END + 4095) / 4096UL)
#define READMARKER_START ((FIRST_OBJECT_PAGE * 4096UL) >> 4)
@@ -443,22 +378,12 @@
}
}
- undo_log_pages = get_thread_base(NB_THREADS);
- mprotect(undo_log_pages, 4096, PROT_NONE);
- mprotect(undo_log_pages + (NB_PAGES - 1) * 4096UL, 4096, PROT_NONE);
- undo_log_pages += 4096;
- undo_log_current = undo_log_pages;
-
num_threads_started = 0;
index_page_never_used = FIRST_OBJECT_PAGE;
- next_write_version = 1;
- leader_thread_num = 0;
- global_history = NULL;
- gh_write_version_first = 0xffff;
- gh_write_version_last = 0;
+ pending_updates = NULL;
}
-#define INVALID_GS_VALUE 0xDDDDDDDDDDDDDDDDUL
+#define INVALID_GS_VALUE 0x6D6D6D6D
static void set_gs_register(uint64_t value)
{
@@ -478,10 +403,13 @@
assert(_STM_TL2->thread_base == thread_base);
_STM_TL2->modified_objects = stm_list_create();
+ assert(!_STM_TL2->running_transaction);
}
void _stm_teardown_thread(void)
{
+ assert(!_STM_TL2->running_transaction);
+ wait_until_updated();
stm_list_free(_STM_TL2->modified_objects);
_STM_TL2->modified_objects = NULL;
@@ -492,8 +420,6 @@
{
munmap(object_pages, TOTAL_MEMORY);
object_pages = NULL;
- undo_log_pages = NULL;
- undo_log_current = NULL;
}
@@ -519,39 +445,45 @@
perror("madvise");
abort();
}
- _STM_TL1->transaction_read_version = 0;
+ _STM_TL1->transaction_read_version = 1;
}
void stm_major_collection(void)
{
+ assert(_STM_TL2->running_transaction);
abort();
}
-void stm_start_transaction(jmp_buf *jmpbufptr)
+void stm_start_transaction(jmpbufptr_t *jmpbufptr)
{
- if (_STM_TL1->transaction_read_version == 0xff)
+ assert(!_STM_TL2->running_transaction);
+
+ uint8_t old_rv = _STM_TL1->transaction_read_version;
+ _STM_TL1->transaction_read_version = old_rv + 1;
+ if (UNLIKELY(old_rv == 0xff))
reset_transaction_read_version();
- _STM_TL1->transaction_read_version++;
- _STM_TL1->jmpbufptr = NULL;
- while (1) {
- int wv = __sync_fetch_and_add(&next_write_version, 1);
- if (LIKELY(wv <= 0xffff)) {
- _STM_TL1->transaction_write_version = wv;
- break;
- }
+ int old_wv = _STM_TL1->transaction_write_version;
+ _STM_TL1->transaction_write_version = old_wv + 1;
+ if (UNLIKELY(old_wv == 0xffff)) {
/* We run out of 16-bit numbers before we do the next major
collection, which resets it. XXX This case seems unlikely
for now, but check if it could become a bottleneck at some
point. */
stm_major_collection();
}
- assert(stm_list_is_empty(_STM_TL2->modified_objects));
+
+ wait_until_updated();
+ stm_list_clear(_STM_TL2->modified_objects);
assert(stm_list_is_empty(_STM_TL2->new_object_ranges));
+ /* check that there is no stm_abort() in the following maybe_update() */
+ _STM_TL1->jmpbufptr = NULL;
+
maybe_update(CANNOT_CONFLICT); /* no read object: cannot conflict */
_STM_TL1->jmpbufptr = jmpbufptr;
+ _STM_TL2->running_transaction = 1;
}
static void update_new_objects_in_other_threads(uintptr_t pagenum,
@@ -567,10 +499,13 @@
char *src = REAL_ADDRESS(_STM_TL2->thread_base, local_src);
memcpy(dst, src, size);
+ ...;
}
void stm_stop_transaction(void)
{
+ assert(_STM_TL2->running_transaction);
+
write_fence(); /* see later in this function for why */
acquire_lock(&undo_lock);
@@ -596,8 +531,7 @@
_STM_TL2->modified_objects = stm_list_create();
uint16_t wv = _STM_TL1->transaction_write_version;
- if (wv < gh_write_version_last) gh_write_version_last = wv;
- if (wv > gh_write_version_first) gh_write_version_first = wv;
+ if (gh_write_version_first < wv) gh_write_version_first = wv;
/* walk the new_object_ranges and manually copy the new objects
to the other thread's pages in the (hopefully rare) case that
@@ -664,11 +598,14 @@
}
}
+ _STM_TL2->running_transaction = 0;
release_lock(&undo_lock);
}
void stm_abort_transaction(void)
{
+ assert(_STM_TL2->running_transaction);
+ // XXX copy back the modified objects!!
long j;
for (j = 2; j < LARGE_OBJECT_WORDS; j++) {
alloc_for_size_t *alloc = &_STM_TL2->alloc[j];
@@ -678,6 +615,7 @@
stm_list_clear(_STM_TL2->new_object_ranges);
stm_list_clear(_STM_TL2->modified_objects);
assert(_STM_TL1->jmpbufptr != NULL);
- assert(_STM_TL1->jmpbufptr != (jmp_buf *)-1); /* for tests only */
- longjmp(*_STM_TL1->jmpbufptr, 1);
+ assert(_STM_TL1->jmpbufptr != (jmpbufptr_t *)-1); /* for tests only */
+ _STM_TL2->running_transaction = 0;
+ __builtin_longjmp(*_STM_TL1->jmpbufptr, 1);
}
diff --git a/c7/core.h b/c7/core.h
--- a/c7/core.h
+++ b/c7/core.h
@@ -3,7 +3,6 @@
#include <stdint.h>
#include <stdbool.h>
-#include <setjmp.h>
#define TLPREFIX __attribute__((address_space(256)))
@@ -31,8 +30,7 @@
*/
struct object_s {
- uint16_t write_version; /* reserved for the STM library */
- /*uint8_t stm_flags;*/
+ uint8_t stm_flags; /* reserved for the STM library */
uint32_t header; /* for the user program -- only write in
newly allocated objects */
};
@@ -41,8 +39,10 @@
uint8_t rm;
};
+typedef intptr_t jmpbufptr_t[5]; /* for use with __builtin_setjmp() */
+
struct _thread_local1_s {
- jmp_buf *jmpbufptr;
+ jmpbufptr_t *jmpbufptr;
uint8_t transaction_read_version;
uint16_t transaction_write_version;
};
diff --git a/c7/list.h b/c7/list.h
--- a/c7/list.h
+++ b/c7/list.h
@@ -8,7 +8,7 @@
uintptr_t count;
union {
uintptr_t last_allocated; /* always odd */
- struct stm_list_s *nextlist; /* always even */
+ //struct stm_list_s *nextlist; /* always even */
};
object_t *items[];
};
More information about the pypy-commit
mailing list