[pypy-commit] stmgc default: Introduce tx_public_descriptor to cope with threads ending
arigo
noreply at buildbot.pypy.org
Sat Jun 8 21:25:56 CEST 2013
Author: Armin Rigo <arigo at tunes.org>
Branch:
Changeset: r79:91fdce635f8f
Date: 2013-06-08 21:25 +0200
http://bitbucket.org/pypy/stmgc/changeset/91fdce635f8f/
Log: Introduce tx_public_descriptor to cope with threads ending
diff --git a/c4/doc-objects.txt b/c4/doc-objects.txt
--- a/c4/doc-objects.txt
+++ b/c4/doc-objects.txt
@@ -83,8 +83,7 @@
- the PRN (private revision number): odd, negative, changes for every
transaction that commits
-- list active_backup_copies =
- [(private converted from protected, backup copy)]
+- list active_backup_copies = [(private, backup copy)]
- dict public_to_private = {public obj: private copy}
diff --git a/c4/et.c b/c4/et.c
--- a/c4/et.c
+++ b/c4/et.c
@@ -82,6 +82,13 @@
static void steal(gcptr P)
{
+ struct tx_public_descriptor *foreign_pd;
+ revision_t target_descriptor_index;
+ revision_t v = ACCESS_ONCE(P->h_revision);
+ if ((v & 3) != 2)
+ return;
+ target_descriptor_index = *(revision_t *)(v & ~(HANDLE_BLOCK_SIZE-1));
+ //foreign_pd = ACCESS_ONCE(stm_descriptor_array[target_descriptor_index]);
abort();
}
@@ -167,7 +174,7 @@
old_to_young:;
revision_t target_descriptor_index;
target_descriptor_index = *(revision_t *)(v & ~(HANDLE_BLOCK_SIZE-1));
- if (target_descriptor_index == d->descriptor_index)
+ if (target_descriptor_index == d->public_descriptor_index)
{
P = (gcptr)(*(revision_t *)(v - 2));
assert(!(P->h_tid & GCFLAG_PUBLIC));
@@ -195,7 +202,7 @@
/* stealing */
fprintf(stderr, "read_barrier: %p -> stealing %p...", G, (gcptr)v);
steal(P);
- abort();
+ goto retry;
}
}
@@ -331,7 +338,7 @@
memcpy(B + 1, P + 1, size - sizeof(*B));
}
assert(B->h_tid & GCFLAG_BACKUP_COPY);
- g2l_insert(&d->private_to_backup, P, B);
+ gcptrlist_insert2(&d->public_descriptor->active_backup_copies, P, B);
P->h_revision = stm_private_rev_num;
return P;
}
@@ -394,10 +401,13 @@
gcptr stm_get_backup_copy(gcptr P)
{
- struct tx_descriptor *d = thread_descriptor;
- wlog_t *entry;
- G2L_FIND(d->private_to_backup, P, entry, return NULL);
- return entry->val;
+ struct tx_public_descriptor *pd = thread_descriptor->public_descriptor;
+ long i, size = pd->active_backup_copies.size;
+ gcptr *items = pd->active_backup_copies.items;
+ for (i = 0; i < size; i += 2)
+ if (items[i] == P)
+ return items[i + 1];
+ return NULL;
}
gcptr stm_get_read_obj(long index)
@@ -568,7 +578,7 @@
}
gcptrlist_clear(&d->list_of_read_objects);
- g2l_clear(&d->private_to_backup);
+ gcptrlist_clear(&d->public_descriptor->active_backup_copies);
abort();//stmgc_abort_transaction(d);
fprintf(stderr,
@@ -578,7 +588,7 @@
"!!!!!!!!!!!!!!!!!!!!! [%lx] abort %d\n"
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
- "\n", (long)d->descriptor_index, num);
+ "\n", (long)d->public_descriptor_index, num);
if (num != ABRT_MANUAL && d->max_aborts >= 0 && !d->max_aborts--)
{
fprintf(stderr, "unexpected abort!\n");
@@ -632,7 +642,7 @@
d->start_real_time.tv_nsec = -1;
}
assert(d->list_of_read_objects.size == 0);
- assert(!g2l_any_entry(&d->private_to_backup));
+ assert(d->public_descriptor->active_backup_copies.size == 0);
assert(!g2l_any_entry(&d->public_to_private));
d->count_reads = 1;
@@ -774,7 +784,7 @@
handle_block = (revision_t *)
((((intptr_t)handle_block) + HANDLE_BLOCK_SIZE-1)
& ~(HANDLE_BLOCK_SIZE-1));
- handle_block[0] = d->descriptor_index;
+ handle_block[0] = d->public_descriptor_index;
handle_block[1] = v;
revision_t w = ((revision_t)(handle_block + 1)) + 2;
@@ -826,18 +836,19 @@
void TurnPrivateWithBackupToProtected(struct tx_descriptor *d,
revision_t cur_time)
{
- wlog_t *item;
- G2L_LOOP_FORWARD(d->private_to_backup, item)
+ long i, size = d->public_descriptor->active_backup_copies.size;
+ gcptr *items = d->public_descriptor->active_backup_copies.items;
+
+ for (i = 0; i < size; i += 2)
{
- gcptr P = item->addr;
- gcptr B = item->val;
+ gcptr P = items[i];
+ gcptr B = items[i + 1];
assert(P->h_revision == stm_private_rev_num);
assert(B->h_tid & GCFLAG_BACKUP_COPY);
B->h_revision = cur_time;
P->h_revision = (revision_t)B;
-
- } G2L_LOOP_END;
- g2l_clear(&d->private_to_backup);
+ };
+ gcptrlist_clear(&d->public_descriptor->active_backup_copies);
}
void CommitTransaction(void)
@@ -846,7 +857,7 @@
struct tx_descriptor *d = thread_descriptor;
assert(d->active >= 1);
- spinlock_acquire(d->collection_lock, 'C'); /* committing */
+ spinlock_acquire(d->public_descriptor->collection_lock, 'C'); /*committing*/
AcquireLocks(d);
if (is_inevitable(d))
@@ -868,10 +879,10 @@
if (cur_time & 1)
{ // there is another inevitable transaction
CancelLocks(d);
- spinlock_release(d->collection_lock);
+ spinlock_release(d->public_descriptor->collection_lock);
inev_mutex_acquire(); // wait until released
inev_mutex_release();
- spinlock_acquire(d->collection_lock, 'C');
+ spinlock_acquire(d->public_descriptor->collection_lock, 'C');
AcquireLocks(d);
continue;
}
@@ -907,7 +918,7 @@
UpdateChainHeads(d, cur_time, localrev);
- spinlock_release(d->collection_lock);
+ spinlock_release(d->public_descriptor->collection_lock);
d->num_commits++;
d->active = 0;
stm_stop_sharedlock();
@@ -948,7 +959,8 @@
(XXX statically we should know when we're outside
a transaction) */
- fprintf(stderr, "[%lx] inevitable: %s\n", (long)d->descriptor_index, why);
+ fprintf(stderr, "[%lx] inevitable: %s\n",
+ (long)d->public_descriptor_index, why);
cur_time = acquire_inev_mutex_and_mark_global_cur_time();
if (d->start_time != cur_time)
@@ -1072,8 +1084,8 @@
/************************************************************/
-struct tx_descriptor *stm_descriptor_array[MAX_THREADS] = {0};
-static revision_t descriptor_array_next = 0;
+struct tx_public_descriptor *stm_descriptor_array[MAX_THREADS] = {0};
+static revision_t descriptor_array_free_list = 0;
static revision_t descriptor_array_lock = 0;
int DescriptorInit(void)
@@ -1092,22 +1104,30 @@
memset(d, 0, sizeof(struct tx_descriptor));
spinlock_acquire(descriptor_array_lock, 1);
- i = descriptor_array_next;
- while (stm_descriptor_array[i] != NULL)
- {
- i++;
- if (i == MAX_THREADS)
- i = 0;
- if (i == descriptor_array_next)
- {
+ struct tx_public_descriptor *pd;
+ i = descriptor_array_free_list;
+ pd = stm_descriptor_array[i];
+ if (pd != NULL) {
+ /* we are reusing 'pd' */
+ descriptor_array_free_list = pd->free_list_next;
+ assert(descriptor_array_free_list >= 0);
+ }
+ else {
+ /* the free list is empty: slot 'i' has no descriptor yet, allocate one */
+ descriptor_array_free_list = i + 1;
+ if (descriptor_array_free_list == MAX_THREADS) {
fprintf(stderr, "error: too many threads at the same time "
"in this process");
abort();
- }
- }
- descriptor_array_next = i;
- stm_descriptor_array[i] = d;
- d->descriptor_index = i;
+ }
+ pd = stm_malloc(sizeof(struct tx_public_descriptor));
+ memset(pd, 0, sizeof(struct tx_public_descriptor));
+ stm_descriptor_array[i] = pd;
+ }
+ pd->free_list_next = -1;
+
+ d->public_descriptor = pd;
+ d->public_descriptor_index = i;
d->my_lock = LOCKED + 2 * i;
assert(d->my_lock & 1);
assert(d->my_lock >= LOCKED);
@@ -1117,7 +1137,7 @@
thread_descriptor = d;
fprintf(stderr, "[%lx] pthread %lx starting\n",
- (long)d->descriptor_index, (long)pthread_self());
+ (long)d->public_descriptor_index, (long)pthread_self());
spinlock_release(descriptor_array_lock);
return 1;
@@ -1128,15 +1148,25 @@
void DescriptorDone(void)
{
+ revision_t i;
struct tx_descriptor *d = thread_descriptor;
assert(d != NULL);
assert(d->active == 0);
- stm_descriptor_array[d->descriptor_index] = NULL;
+ d->public_descriptor->collection_lock = 0; /* unlock */
+
+ spinlock_acquire(descriptor_array_lock, 1);
+ i = d->public_descriptor_index;
+ assert(stm_descriptor_array[i] == d->public_descriptor);
+ d->public_descriptor->free_list_next = descriptor_array_free_list;
+ descriptor_array_free_list = i;
+ spinlock_release(descriptor_array_lock);
+
thread_descriptor = NULL;
g2l_delete(&d->public_to_private);
- g2l_delete(&d->private_to_backup);
+ assert(d->public_descriptor->active_backup_copies.size == 0);
+ gcptrlist_delete(&d->public_descriptor->active_backup_copies);
gcptrlist_delete(&d->list_of_read_objects);
gcptrlist_delete(&d->abortinfo);
free(d->longest_abort_info);
@@ -1145,7 +1175,6 @@
#endif
int num_aborts = 0, num_spinloops = 0;
- int i;
char line[256], *p = line;
for (i=0; i<ABORT_REASONS; i++)
@@ -1154,7 +1183,7 @@
num_spinloops += d->num_spinloops[i];
p += sprintf(p, "[%lx] finishing: %d commits, %d aborts ",
- (long)d->descriptor_index,
+ (long)d->public_descriptor_index,
d->num_commits,
num_aborts);
diff --git a/c4/et.h b/c4/et.h
--- a/c4/et.h
+++ b/c4/et.h
@@ -101,12 +101,25 @@
#define SPLP_LOCKED_COMMIT 3
#define SPINLOOP_REASONS 4
+/* This struct contains thread-local data that may occasionally be
+ * accessed by a foreign thread and that must stay around after the
+ * thread shuts down.  It is reused the next time a thread starts. */
+struct tx_public_descriptor {
+ revision_t collection_lock;
+ struct GcPtrList stolen_objects;
+ struct GcPtrList active_backup_copies;
+ revision_t free_list_next;
+ /* xxx gcpage data here */
+};
+
+/* this struct contains all thread-local data that is never accessed
+ * by a foreign thread */
struct tx_descriptor {
+ struct tx_public_descriptor *public_descriptor;
+ revision_t public_descriptor_index;
jmp_buf *setjmp_buf;
revision_t start_time;
- revision_t descriptor_index;
revision_t my_lock;
- revision_t collection_lock;
gcptr *shadowstack;
gcptr **shadowstack_end_ref;
@@ -123,7 +136,6 @@
unsigned int num_spinloops[SPINLOOP_REASONS];
struct GcPtrList list_of_read_objects;
struct GcPtrList abortinfo;
- struct G2L private_to_backup;
struct G2L public_to_private;
char *longest_abort_info;
long long longest_abort_info_time;
@@ -133,43 +145,10 @@
extern __thread struct tx_descriptor *thread_descriptor;
extern __thread revision_t stm_private_rev_num;
-extern struct tx_descriptor *stm_descriptor_array[];
+extern struct tx_public_descriptor *stm_descriptor_array[];
/************************************************************/
-//#define STM_BARRIER_P2R(P)
-// (__builtin_expect((((gcptr)(P))->h_tid & GCFLAG_GLOBAL) == 0, 1) ?
-// (P) : (typeof(P))stm_DirectReadBarrier(P))
-
-//#define STM_BARRIER_G2R(G)
-// (assert(((gcptr)(G))->h_tid & GCFLAG_GLOBAL),
-// (typeof(G))stm_DirectReadBarrier(G))
-
-//#define STM_BARRIER_O2R(O)
-// (__builtin_expect((((gcptr)(O))->h_tid & GCFLAG_POSSIBLY_OUTDATED) == 0,
-// 1) ?
-// (O) : (typeof(O))stm_RepeatReadBarrier(O))
-
-//#define STM_READ_BARRIER_P_FROM_R(P, R_container, offset)
-// (__builtin_expect((((gcptr)(P))->h_tid & GCFLAG_GLOBAL) == 0, 1) ?
-// (P) : (typeof(P))stm_DirectReadBarrierFromR((P),
-// (R_container),
-// offset))
-
-//#define STM_BARRIER_P2W(P)
-// (__builtin_expect((((gcptr)(P))->h_tid & GCFLAG_NOT_WRITTEN) == 0, 1) ?
-// (P) : (typeof(P))stm_WriteBarrier(P))
-
-//#define STM_BARRIER_G2W(G)
-// (assert(((gcptr)(G))->h_tid & GCFLAG_GLOBAL),
-// (typeof(G))stm_WriteBarrier(G))
-
-//#define STM_BARRIER_R2W(R)
-// (__builtin_expect((((gcptr)(R))->h_tid & GCFLAG_NOT_WRITTEN) == 0, 1) ?
-// (R) : (typeof(R))stm_WriteBarrierFromReady(R))
-
-//#define STM_BARRIER_O2W(R) STM_BARRIER_R2W(R) /* same logic works here */
-
void BeginTransaction(jmp_buf *);
void BeginInevitableTransaction(void); /* must save roots around this call */
diff --git a/c4/test/support.py b/c4/test/support.py
--- a/c4/test/support.py
+++ b/c4/test/support.py
@@ -208,7 +208,7 @@
revision_t get_descriptor_index(void)
{
- return thread_descriptor->descriptor_index;
+ return thread_descriptor->public_descriptor_index;
}
/*gcptr *addr_of_thread_local(void)
More information about the pypy-commit
mailing list