[pypy-commit] pypy stm-gc-2: In-progress: trying to get the safe points correctly is hard

arigo noreply at buildbot.pypy.org
Fri Apr 19 11:29:41 CEST 2013


Author: Armin Rigo <arigo at tunes.org>
Branch: stm-gc-2
Changeset: r63499:9a7f6d86069c
Date: 2013-04-19 11:29 +0200
http://bitbucket.org/pypy/pypy/changeset/9a7f6d86069c/

Log:	In-progress: trying to get the safe points correctly is hard

diff --git a/rpython/memory/gc/stmgc.py b/rpython/memory/gc/stmgc.py
--- a/rpython/memory/gc/stmgc.py
+++ b/rpython/memory/gc/stmgc.py
@@ -182,6 +182,10 @@
         #
         self.stm_operations = stm_operations
         self.nursery_size = nursery_size
+        self.major_collection_threshold = 1.82     # xxx
+        self.min_heap_size = r_uint(8 * self.nursery_size)
+        self.real_limit_for_major_gc = self.min_heap_pages
+        self.dyn_limit_for_major_gc = self.real_limit_for_major_gc
         #self.maximum_extra_threshold = 0
         self.sharedarea = stmshared.StmGCSharedArea(self, page_size,
                                                     small_request_threshold)
@@ -238,6 +242,7 @@
         self.remove_from_linked_list(stmtls)
         self.stm_operations.begin_inevitable_transaction()
         stmtls.delete()
+    teardown_thread._dont_inline_ = True
 
     def acquire_global_lock(self):
         if self.ll_global_lock:
@@ -330,9 +335,10 @@
 
 
     def collect(self, gen=1):
-        self.get_tls().local_collection()
-        if gen > 0:
-            debug_print("XXX not implemented: global collect()")
+        if gen <= 0:
+            self.get_tls().local_collection()
+        else:
+            self.major_collection(force=True)
 
     def start_transaction(self):
         self.get_tls().start_transaction()
@@ -401,6 +407,41 @@
 ##        stmtls.set_extra_threshold(reserved_size)
 
     # ----------
+    # major collections
+
+    def maybe_major_collection(self):
+        """Check the memory usage, and maybe do a major GC collection."""
+        if (self.sharedarea.fetch_count_total_bytes() >=
+                self.dyn_limit_for_major_gc):
+            self.major_collection()
+            return True
+        else:
+            return False
+
+    def major_collection(self, force=False):
+        """Do a major collection.  This uses a stop-the-world system."""
+        #
+        # When the present function is called we know we'll do at least
+        # one major GC.  Setting this limit to 0 now will invite other
+        # threads to enter major_collection() soon too.
+        self.dyn_limit_for_major_gc = r_uint(0)
+        #
+        # While still running multithreaded, do a local collection.
+        # This is not strictly needed.
+        self.get_tls().local_collection(run_finalizers=False)
+        #
+        # Now wait until we can acquire the RW lock in exclusive mode.
+        self.stm_operations.start_single_thread()
+        #
+        # At this point all other threads should be blocked or running
+        # external C code
+        if (self.sharedarea.fetch_count_total_bytes() >=
+                self.limit_for_major_collection):
+            xxxxxxx
+        self.stm_operations.stop_single_thread()
+    major_collection._dont_inline_ = True
+
+    # ----------
     # id() and identityhash() support
 
     def id_or_identityhash(self, gcobj, is_hash):
diff --git a/rpython/memory/gc/stmshared.py b/rpython/memory/gc/stmshared.py
--- a/rpython/memory/gc/stmshared.py
+++ b/rpython/memory/gc/stmshared.py
@@ -1,8 +1,8 @@
 from rpython.rtyper.lltypesystem import lltype, llmemory, llarena, rffi
-from rpython.rlib.rarithmetic import LONG_BIT
+from rpython.rlib.rarithmetic import LONG_BIT, r_uint
 from rpython.rlib.objectmodel import free_non_gc_object, we_are_translated
 from rpython.rlib.debug import ll_assert, fatalerror
-from rpython.rlib import rthread
+from rpython.rlib import rthread, atomic_ops
 
 WORD = LONG_BIT // 8
 NULL = llmemory.NULL
@@ -78,10 +78,20 @@
         #
         # Counters for statistics
         self.count_global_pages = 0
+        self.v_count_total_bytes = lltype.malloc(rffi.CArray(lltype.Unsigned),
+                                                 1, flavor='raw',
+                                                 immortal=True, zero=True)
 
     def setup(self):
         pass
 
+    def fetch_count_total_bytes(self):
+        return self.v_count_total_bytes[0]
+
+    def fetch_count_total_bytes_and_add(self, increment):
+        adr = rffi.cast(llmemory.Address, self.v_count_total_bytes)
+        return r_uint(atomic_ops.fetch_and_add(adr, increment))
+
 
 # ------------------------------------------------------------
 
@@ -145,16 +155,18 @@
     def _allocate_new_page(self, size_class):
         """Allocate and return a new page for the given size_class."""
         #
-        result = llarena.arena_malloc(self.sharedarea.page_size, 0)
+        sharedarea = self.sharedarea
+        result = llarena.arena_malloc(sharedarea.page_size, 0)
         if not result:
             fatalerror("FIXME: Out of memory! (should raise MemoryError)")
             return NULL
         if not we_are_translated():
             self._seen_pages.add(result)
         self.count_pages += 1
-        llarena.arena_reserve(result, llmemory.sizeof(PAGE_HEADER))
+        sharedarea.fetch_count_total_bytes_and_add(sharedarea.page_size)
         #
         # Initialize the fields of the resulting page
+        llarena.arena_reserve(result, llmemory.sizeof(PAGE_HEADER))
         page = llmemory.cast_adr_to_ptr(result, PAGE_PTR)
         page.nextpage = self.pages_for_size[size_class]
         self.pages_for_size[size_class] = page
@@ -165,7 +177,7 @@
                   "free_loc_for_size is supposed to contain NULL here")
         self.free_loc_for_size[size_class] = head
         #
-        i = self.sharedarea.nblocks_for_size[size_class]
+        i = sharedarea.nblocks_for_size[size_class]
         nsize = size_class << WORD_POWER_2
         current = head
         while True:
@@ -189,8 +201,13 @@
             size_class = (nsize + WORD_POWER_2 - 1) >> WORD_POWER_2
             return self._malloc_size_class(size_class)
         else:
-            return llarena.arena_malloc(
-                llmemory.raw_malloc_usage(totalsize), 0)
+            count = llmemory.raw_malloc_usage(totalsize)
+            result = llarena.arena_malloc(count, 0)
+            # increment the counter *after* arena_malloc() returned
+            # successfully, otherwise we might increment it of a huge
+            # bogus number
+            self.sharedarea.fetch_count_total_bytes_and_add(count)
+            return result
 
     def malloc_object(self, objsize):
         totalsize = self.gc.gcheaderbuilder.size_gc_header + objsize
@@ -212,6 +229,10 @@
             size_class = (totalsize + WORD_POWER_2 - 1) >> WORD_POWER_2
             self._free_size_class(adr1, size_class)
         else:
+            # decrement the counter *before* we free the memory,
+            # otherwise there could in theory be a race condition that
+            # ends up overflowing the counter
+            self.sharedarea.fetch_count_total_bytes_and_add(-totalsize)
             llarena.arena_free(llarena.getfakearenaaddress(adr1))
 
     def free_and_clear(self):
diff --git a/rpython/memory/gc/stmtls.py b/rpython/memory/gc/stmtls.py
--- a/rpython/memory/gc/stmtls.py
+++ b/rpython/memory/gc/stmtls.py
@@ -166,6 +166,7 @@
         nursery and track which objects are still alive now, and
         then mark all these objects as global."""
         self.local_collection(end_of_transaction=True)
+        self.gc.maybe_major_collection()
         if not self.local_nursery_is_empty():
             self.local_collection(end_of_transaction=True,
                                   run_finalizers=False)
@@ -273,10 +274,8 @@
         if llmemory.raw_malloc_usage(size) > self.nursery_size // 8 * 7:
             fatalerror("XXX object too large to ever fit in the nursery")
         #
-        self.local_collection(run_finalizers=True)
-        #
-        # call this here in case another thread is waiting for a global GC
-        self.stm_operations.should_break_transaction()
+        if not self.gc.maybe_major_collection():
+            self.local_collection(run_finalizers=True)
         #
         # if we have now enough space, return it
         free = self.nursery_free
diff --git a/rpython/memory/gctransform/stmframework.py b/rpython/memory/gctransform/stmframework.py
--- a/rpython/memory/gctransform/stmframework.py
+++ b/rpython/memory/gctransform/stmframework.py
@@ -163,14 +163,13 @@
         base = self.stackgcdata.root_stack_base
         llmemory.raw_free(base)
 
-    def walk_stack_roots(self, collect_stack_root):
+    def walk_roots(self, *args):
+        "NOT_RPYTHON"
         raise NotImplementedError
-        # XXX only to walk the main thread's shadow stack, so far
-        stackgcdata = self.stackgcdata
-        if self.gcdata.main_thread_stack_base != stackgcdata.root_stack_base:
-            fatalerror_notb("XXX not implemented: walk_stack_roots in thread")
-        self.rootstackhook(collect_stack_root, self.gcdata.gc,
-                           stackgcdata.root_stack_top)
+
+    def walk_stack_roots(self, *args):
+        "NOT_RPYTHON"
+        raise NotImplementedError
 
     @specialize.argtype(2)
     def walk_current_stack_roots(self, collect_stack_root, arg):
diff --git a/rpython/translator/stm/jitdriver.py b/rpython/translator/stm/jitdriver.py
--- a/rpython/translator/stm/jitdriver.py
+++ b/rpython/translator/stm/jitdriver.py
@@ -159,7 +159,7 @@
                 raise cast_base_ptr_to_instance(Exception, p.got_exception)
             return p.result_value
 """     % (', '.join(['a%d' % i for i in irange]),
-           '\n            '.join(['p.a%d = a%d' % (i, i) for i in irange]))
+           '; '.join(['p.a%d = a%d' % (i, i) for i in irange]))
         d = {'CONTAINER': CONTAINER,
              'lltype': lltype,
              'perform_transaction': perform_transaction,
diff --git a/rpython/translator/stm/src_stm/et.h b/rpython/translator/stm/src_stm/et.h
--- a/rpython/translator/stm/src_stm/et.h
+++ b/rpython/translator/stm/src_stm/et.h
@@ -87,7 +87,6 @@
 //int _FakeReach(gcptr);
 void CommitTransaction(void);
 void BecomeInevitable(const char *why);
-//void BeginInevitableTransaction(void);
 int DescriptorInit(void);
 void DescriptorDone(void);
 
diff --git a/rpython/translator/stm/src_stm/rpyintf.c b/rpython/translator/stm/src_stm/rpyintf.c
--- a/rpython/translator/stm/src_stm/rpyintf.c
+++ b/rpython/translator/stm/src_stm/rpyintf.c
@@ -63,8 +63,9 @@
 }
 
 static unsigned long stm_regular_length_limit = ULONG_MAX;
-static volatile int break_please = 0;
 
+/* sync_required is either 0 or 0xffffffff */
+static volatile unsigned long sync_required = 0;
 static void reached_safe_point(void);
 
 void stm_add_atomic(long delta)
@@ -86,9 +87,10 @@
 
   /* a single comparison to handle all cases:
 
+     - first, if sync_required == 0xffffffff, this should return True.
+
      - if d->atomic, then we should return False.  This is done by
-       forcing reads_size_limit to ULONG_MAX as soon as atomic > 0,
-       and no possible value of 'count_reads' is greater than ULONG_MAX.
+       forcing reads_size_limit to ULONG_MAX as soon as atomic > 0.
 
      - otherwise, if is_inevitable(), then we should return True.
        This is done by forcing both reads_size_limit and
@@ -109,10 +111,7 @@
     assert(d->reads_size_limit_nonatomic == 0);
 #endif
 
-  if (break_please)
-    reached_safe_point();
-
-  return d->count_reads > d->reads_size_limit;
+  return (sync_required | d->count_reads) >= d->reads_size_limit;
 }
 
 void stm_set_transaction_length(long length_max)
@@ -178,9 +177,14 @@
       }
       if (!d->atomic)
         BeginTransaction(&_jmpbuf);
-
-      if (break_please)
-        reached_safe_point();
+      else
+        {
+          /* atomic transaction: a common case is that callback() returned
+             even though we are atomic because we need a major GC.  For
+             that case, release and require the rw lock here. */
+          if (sync_required)
+            reached_safe_point();
+        }
 
       /* invoke the callback in the new transaction */
       result = callback(arg, counter);
@@ -207,14 +211,16 @@
 {
   /* Called by the GC, just after a minor collection, when we need to do
      a major collection.  When it returns, it acquired the "write lock"
-     which prevents any other thread from running a transaction. */
+     which prevents any other thread from running in a transaction.
+     Warning, may block waiting for rwlock_in_transaction while another
+     thread runs a major GC itself! */
   int err;
-  break_please = 1;
+  sync_required = (unsigned long)-1;
   err = pthread_rwlock_unlock(&rwlock_in_transaction);
   assert(err == 0);
   err = pthread_rwlock_wrlock(&rwlock_in_transaction);
   assert(err == 0);
-  break_please = 0;
+  sync_required = 0;
 
   assert(in_single_thread == NULL);
   in_single_thread = thread_descriptor;
@@ -223,6 +229,8 @@
 
 void stm_stop_single_thread(void)
 {
+  /* Warning, may block waiting for rwlock_in_transaction while another
+     thread runs a major GC */
   int err;
 
   assert(in_single_thread == thread_descriptor);
@@ -236,6 +244,10 @@
 
 static void reached_safe_point(void)
 {
+  /* Warning: all places that call this function from RPython code
+     must do so with a llop with canmallocgc=True!  The release of
+     the rwlock_in_transaction below means a major GC could run in
+     another thread! */
   int err;
   struct tx_descriptor *d = thread_descriptor;
   assert(in_single_thread != d);


More information about the pypy-commit mailing list