[pypy-commit] pypy concurrent-marksweep: Fix enough to have the simplest test pass again.

arigo noreply at buildbot.pypy.org
Sun Oct 23 11:46:28 CEST 2011


Author: Armin Rigo <arigo at tunes.org>
Branch: concurrent-marksweep
Changeset: r48355:174ecd921aae
Date: 2011-10-23 11:45 +0200
http://bitbucket.org/pypy/pypy/changeset/174ecd921aae/

Log:	Fix enough to have the simplest test pass again. For now, kill the
	logic of pages, because it's a mess to reason both about it and
	about concurrentgen.txt at the same time. All objects are just
	malloc()ed.

diff --git a/pypy/rpython/lltypesystem/llarena.py b/pypy/rpython/lltypesystem/llarena.py
--- a/pypy/rpython/lltypesystem/llarena.py
+++ b/pypy/rpython/lltypesystem/llarena.py
@@ -551,6 +551,8 @@
     has_protect = False
 
 
+# note: the concurrent GCs do mallocs in one thread and frees in another,
+# so make sure these point to a thread-safe implementation
 llimpl_malloc = rffi.llexternal('malloc', [lltype.Signed], llmemory.Address,
                                 sandboxsafe=True, _nowrapper=True)
 llimpl_free = rffi.llexternal('free', [llmemory.Address], lltype.Void,
diff --git a/pypy/rpython/memory/gc/concurrentgen.py b/pypy/rpython/memory/gc/concurrentgen.py
--- a/pypy/rpython/memory/gc/concurrentgen.py
+++ b/pypy/rpython/memory/gc/concurrentgen.py
@@ -9,6 +9,7 @@
 from pypy.rlib.rarithmetic import ovfcheck, LONG_BIT, r_uint
 from pypy.rpython.memory.gc.base import GCBase
 from pypy.rpython.memory import gctypelayout
+from pypy.rpython.memory.support import get_address_stack
 from pypy.module.thread import ll_thread
 
 #
@@ -18,7 +19,8 @@
 # See concurrentgen.txt for some details.
 #
 # Major collections are serialized for the mark phase, but the sweep
-# phase can be parallelized again.  XXX not done so far
+# phase can be parallelized again.  XXX not done so far; YYY investigate
+# completely parallelizing them as well
 #
 # Based on observations that the timing of collections with "minimark"
 # (on translate.py) is: about 15% of the time in minor collections
@@ -59,8 +61,10 @@
     malloc_zero_filled = True
     gcflag_extra = FL_EXTRA
 
-    HDR = lltype.Struct('header', ('tid', lltype.Signed))
-    HDRPTR = lltype.Ptr(HDR)
+    HDRPTR = lltype.Ptr(lltype.ForwardReference())
+    HDR = lltype.Struct('header', ('tid', lltype.Signed),
+                                  ('next', HDRPTR))   # <-- kill me later
+    HDRPTR.TO.become(HDR)
     HDRSIZE = llmemory.sizeof(HDR)
     NULL = lltype.nullptr(HDR)
     typeid_is_in_field = 'tid', llgroup.HALFSHIFT
@@ -68,63 +72,14 @@
     # ^^^ prebuilt objects may have the flag FL_WITHHASH;
     #     then they are one word longer, the extra word storing the hash.
 
-    TRANSLATION_PARAMS = {'page_size': 16384,
-                          'small_request_threshold': 35*WORD,
-                          }
+    TRANSLATION_PARAMS = {}
 
-    def __init__(self, config, page_size=64, small_request_threshold=24,
-                 **kwds):
-        # 'small_request_threshold' is the largest size that we will
-        # satisfy using our own pages mecanism.  Larger requests just
-        # go to the system malloc().
-        self.addressstack_lock_object = SyncLock()
-        kwds['lock'] = self.addressstack_lock_object
+    def __init__(self, config, **kwds):
         GCBase.__init__(self, config, **kwds)
-        assert small_request_threshold % WORD == 0
-        self.small_request_threshold = small_request_threshold
-        self.page_size = page_size
-        self.pagelists_length = small_request_threshold // WORD + 1
+        self.main_thread_ident = ll_thread.get_ident()
         #
-        # The following are arrays of 36 linked lists: the linked lists
-        # at indices 1 to 35 correspond to pages that store objects of
-        # size  1 * WORD  to  35 * WORD.  The linked list at index 0
-        # contains the pages of objects that are larger than this limit
-        # (variable-sized pages, with exactly one object per page).
-        #
-        def list_of_addresses_per_small_size():
-            return lltype.malloc(rffi.CArray(self.HDRPTR),
-                                 self.pagelists_length, flavor='raw',
-                                 immortal=True)
-        #
-        # pages that contain at least one new young object
-        self.new_young_objects_pages = list_of_addresses_per_small_size()
-        #
-        # when the collection starts and we make all young objects aging,
-        # we move the linked lists above into 'aging_objects_pages'
-        self.aging_objects_pages = list_of_addresses_per_small_size()
-        #
-        # free list of non-allocated locations within pages
-        # (at index 0: always empty)
-        self.location_free_lists = list_of_addresses_per_small_size()
-        #
-        # head and tail of the free list of locations built by the
-        # collector thread
-        self.collect_loc_heads = list_of_addresses_per_small_size()
-        self.collect_loc_tails = list_of_addresses_per_small_size()
-        #
-        def collector_start():
-            if we_are_translated():
-                self.collector_run()
-            else:
-                self.collector_run_nontranslated()
-        #
-        collector_start._should_never_raise_ = True
-        self.collector_start = collector_start
-        #
-        self.gray_objects = self.AddressStack()
-        self.extra_objects_to_mark = self.AddressStack()
-        self.flagged_objects = self.AddressStack()
-        self.prebuilt_root_objects = self.AddressStack()
+        # Create the CollectorThread object
+        self.collector = CollectorThread(self)
         #
         self._initialize()
         #
@@ -133,62 +88,31 @@
         # that was not scanned yet.
         self._init_writebarrier_logic()
 
-    def _clear_list(self, array):
-        i = 0
-        while i < self.pagelists_length:
-            array[i] = self.NULL
-            i += 1
-
     def _initialize(self):
-        self.free_pages = self.NULL
+        # Initialize the GC.  In a normal translated program, this function
+        # is not translated but just called from __init__ ahead of time.
+        # During test_transformed_gc, it is translated, so that we can
+        # quickly reset the GC between tests.
         #
-        # Clear the lists
-        self._clear_list(self.nonfree_pages)
-        self._clear_list(self.collect_pages)
-        self._clear_list(self.free_lists)
-        self._clear_list(self.collect_heads)
-        self._clear_list(self.collect_tails)
+        self.extra_objects_to_mark = self.AddressStack()
+        self.flagged_objects = self.AddressStack()
+        self.prebuilt_root_objects = self.AddressStack()
         #
-        self.finalizer_pages = self.NULL
-        self.collect_finalizer_pages = self.NULL
-        self.collect_finalizer_tails = self.NULL
-        self.collect_run_finalizers_head = self.NULL
-        self.collect_run_finalizers_tail = self.NULL
-        self.objects_with_finalizers_to_run = self.NULL
-        #
-        self.weakref_pages = self.NULL
-        self.collect_weakref_pages = self.NULL
-        self.collect_weakref_tails = self.NULL
+        # the linked list of new young objects, and the linked list of
+        # all old objects.  note that the aging objects are not here
+        # but on 'collector.aging_objects'.
+        self.young_objects = self.NULL
+        self.old_objects = self.NULL
         #
         # See concurrentgen.txt for more information about these fields.
         self.current_young_marker = MARK_BYTE_1
         self.current_aging_marker = MARK_BYTE_2
         #
-        # When the mutator thread wants to trigger the next collection,
-        # it scans its own stack roots and prepares everything, then
-        # sets 'collection_running' to 1, and releases
-        # 'ready_to_start_lock'.  This triggers the collector thread,
-        # which re-acquires 'ready_to_start_lock' and does its job.
-        # When done it releases 'finished_lock'.  The mutator thread is
-        # responsible for resetting 'collection_running' to 0.
-        #
-        # The collector thread's state can be found (with careful locking)
-        # by inspecting the same variable from the mutator thread:
-        #   * collection_running == 1: Marking.  [Deletion barrier active.]
-        #   * collection_running == 2: Clearing weakrefs.
-        #   * collection_running == 3: Marking from unreachable finalizers.
-        #   * collection_running == 4: Sweeping.
-        #   * collection_running == -1: Done.
-        # The mutex_lock is acquired to go from 1 to 2, and from 2 to 3.
-        self.collection_running = 0
         #self.ready_to_start_lock = ...built in setup()
         #self.finished_lock = ...built in setup()
+        #self.mutex_lock = ...built in setup()
         #
-        #self.mutex_lock = ...built in setup()
-        self.gray_objects.clear()
-        self.extra_objects_to_mark.clear()
-        self.flagged_objects.clear()
-        self.prebuilt_root_objects.clear()
+        self.collector._initialize()
 
     def setup(self):
         "Start the concurrent collector thread."
@@ -196,26 +120,22 @@
         # 'run_finalizers' as a deque
         self.finalizer_lock_count = 0
         #
-        self.main_thread_ident = ll_thread.get_ident()
         self.ready_to_start_lock = ll_thread.allocate_ll_lock()
         self.finished_lock = ll_thread.allocate_ll_lock()
         self.mutex_lock = ll_thread.allocate_ll_lock()
-        self.addressstack_lock_object.setup()
         #
         self.acquire(self.finished_lock)
         self.acquire(self.ready_to_start_lock)
         #
-        self.collector_ident = ll_thread.c_thread_start_nowrapper(
-            llhelper(ll_thread.CALLBACK, self.collector_start))
-        assert self.collector_ident != -1
+        self.collector.setup()
 
     def _teardown(self):
         "Stop the collector thread after tests have run."
         self.wait_for_the_end_of_collection()
         #
-        # start the next collection, but with collection_running set to 42,
+        # start the next collection, but with collector.running set to 42,
         # which should shut down the collector thread
-        self.collection_running = 42
+        self.collector.running = 42
         debug_print("teardown!")
         self.release(self.ready_to_start_lock)
         self.acquire(self.finished_lock)
@@ -235,7 +155,11 @@
 
     def malloc_fixedsize_clear(self, typeid, size,
                                needs_finalizer=False, contains_weakptr=False):
-        # contains_weakptr: detected during collection
+        #
+        # For now, we always start the next collection as soon as the
+        # previous one is finished
+        if self.collector.running <= 0:
+            self.trigger_next_collection()
         #
         # Case of finalizers (test constant-folded)
         if needs_finalizer:
@@ -250,22 +174,14 @@
         # Regular case
         size_gc_header = self.gcheaderbuilder.size_gc_header
         totalsize = size_gc_header + size
-        rawtotalsize = raw_malloc_usage(totalsize)
-        if rawtotalsize <= self.small_request_threshold:
-            ll_assert(rawtotalsize & (WORD - 1) == 0,
-                      "fixedsize not properly rounded")
-            #
-            n = rawtotalsize >> WORD_POWER_2
-            result = self.free_lists[n]
-            if result != self.NULL:
-                self.free_lists[n] = list_next(result)
-                obj = self.grow_reservation(result, totalsize)
-                hdr = self.header(obj)
-                hdr.tid = self.combine(typeid, self.current_young_marker, 0)
-                #debug_print("malloc_fixedsize_clear", obj)
-                return llmemory.cast_adr_to_ptr(obj, llmemory.GCREF)
-                #
-        return self._malloc_slowpath(typeid, size)
+        adr = llarena.arena_malloc(llmemory.raw_malloc_usage(totalsize), 2)
+        llarena.arena_reserve(adr, totalsize)
+        obj = adr + size_gc_header
+        hdr = self.header(obj)
+        hdr.tid = self.combine(typeid, self.current_young_marker, 0)
+        hdr.next = self.young_objects
+        self.young_objects = hdr
+        return llmemory.cast_adr_to_ptr(obj, llmemory.GCREF)
 
     def malloc_varsize_clear(self, typeid, length, size, itemsize,
                              offset_to_length):
@@ -295,9 +211,9 @@
                       "round_up_for_allocation failed")
             #
             n = rawtotalsize >> WORD_POWER_2
-            result = self.free_lists[n]
+            result = self.location_free_lists[n]
             if result != self.NULL:
-                self.free_lists[n] = list_next(result)
+                self.location_free_lists[n] = list_next(result)
                 obj = self.grow_reservation(result, totalsize)
                 hdr = self.header(obj)
                 hdr.tid = self.combine(typeid, self.current_young_marker, 0)
@@ -311,147 +227,6 @@
         # is actually negative.
         return self._malloc_varsize_slowpath(typeid, length)
 
-    def _malloc_slowpath(self, typeid, size):
-        # Slow-path malloc.  Call this with 'size' being a valid and
-        # rounded number, between WORD and up to MAXIMUM_SIZE.
-        #
-        # For now, we always start the next collection immediately.
-        if self.collection_running <= 0:
-            self.trigger_next_collection()
-        #
-        size_gc_header = self.gcheaderbuilder.size_gc_header
-        totalsize = size_gc_header + size
-        rawtotalsize = raw_malloc_usage(totalsize)
-        #
-        if rawtotalsize <= self.small_request_threshold:
-            #
-            # Case 1: unless trigger_next_collection() happened to get us
-            # more locations in free_lists[n], we have run out of them
-            ll_assert(rawtotalsize & (WORD - 1) == 0,
-                      "malloc_slowpath: non-rounded size")
-            n = rawtotalsize >> WORD_POWER_2
-            head = self.free_lists[n]
-            if head:
-                self.free_lists[n] = list_next(head)
-                obj = self.grow_reservation(head, totalsize)
-                hdr = self.header(obj)
-                hdr.tid = self.combine(typeid, self.current_young_marker, 0)
-                return llmemory.cast_adr_to_ptr(obj, llmemory.GCREF)
-            #
-            # We really have run out of the free list corresponding to
-            # the size.  Grab the next free page.
-            newpage = self.free_pages
-            if newpage == self.NULL:
-                self.allocate_next_arena()
-                newpage = self.free_pages
-            self.free_pages = list_next(newpage)
-            #
-            # Put the free page in the list 'nonfree_pages[n]'.  This is
-            # a linked list chained through the first word of each page.
-            set_next(newpage, self.nonfree_pages[n])
-            self.nonfree_pages[n] = newpage
-            #
-            # Initialize the free page to contain objects of the given
-            # size.  This requires setting up all object locations in the
-            # page, linking them in the free list.
-            i = self.page_size - rawtotalsize
-            limit = rawtotalsize + raw_malloc_usage(self.HDRSIZE)
-            newpageadr = llmemory.cast_ptr_to_adr(newpage)
-            newpageadr = llarena.getfakearenaaddress(newpageadr)
-            while i >= limit:
-                adr = newpageadr + i
-                llarena.arena_reserve(adr, self.HDRSIZE)
-                p = llmemory.cast_adr_to_ptr(adr, self.HDRPTR)
-                set_next(p, head)
-                head = p
-                i -= rawtotalsize
-            self.free_lists[n] = head
-            result = newpageadr + i
-            #
-            # Done: all object locations are linked, apart from
-            # 'result', which is the first object location in the page.
-            # Note that if the size is not an exact divisor of
-            # 4096-WORD, there are a few wasted WORDs, which we place at
-            # the start of the page rather than at the end (Hans Boehm,
-            # xxx ref).
-            #
-            return self._malloc_result(typeid, totalsize, result)
-        else:
-            # Case 2: the object is too large, so allocate it directly
-            # with the system malloc().
-            return self._malloc_large_object(typeid, size, 0)
-        #
-    _malloc_slowpath._dont_inline_ = True
-
-    def _malloc_result(self, typeid, totalsize, result):
-        llarena.arena_reserve(result, totalsize)
-        hdr = llmemory.cast_adr_to_ptr(result, self.HDRPTR)
-        hdr.tid = self.combine(typeid, self.current_young_marker, 0)
-        obj = result + self.gcheaderbuilder.size_gc_header
-        #debug_print("malloc_slowpath", obj)
-        return llmemory.cast_adr_to_ptr(obj, llmemory.GCREF)
-
-    def _malloc_large_object(self, typeid, size, linked_list):
-        # xxx on 32-bit, we'll prefer 64-bit alignment of the object by
-        # always allocating an 8-bytes header
-        totalsize = self.gcheaderbuilder.size_gc_header + size
-        rawtotalsize = raw_malloc_usage(totalsize)
-        rawtotalsize += 8
-        block = llarena.arena_malloc(rawtotalsize, 2)
-        if not block:
-            raise MemoryError
-        llarena.arena_reserve(block, self.HDRSIZE)
-        blockhdr = llmemory.cast_adr_to_ptr(block, self.HDRPTR)
-        if linked_list == 0:
-            set_next(blockhdr, self.nonfree_pages[0])
-            self.nonfree_pages[0] = blockhdr
-        elif linked_list == 1:
-            set_next(blockhdr, self.finalizer_pages)
-            self.finalizer_pages = blockhdr
-        elif linked_list == 2:
-            set_next(blockhdr, self.weakref_pages)
-            self.weakref_pages = blockhdr
-        else:
-            ll_assert(0, "bad linked_list")
-        return self._malloc_result(typeid, totalsize, block + 8)
-    _malloc_large_object._annspecialcase_ = 'specialize:arg(3)'
-    _malloc_large_object._dont_inline_ = True
-
-    def _malloc_varsize_slowpath(self, typeid, length):
-        #
-        if length < 0:
-            # negative length!  This likely comes from an overflow
-            # earlier.  We will just raise MemoryError here.
-            raise MemoryError
-        #
-        # Compute the total size, carefully checking for overflows.
-        nonvarsize = self.fixed_size(typeid)
-        itemsize = self.varsize_item_sizes(typeid)
-        try:
-            varsize = ovfcheck(itemsize * length)
-            totalsize = ovfcheck(nonvarsize + varsize)
-        except OverflowError:
-            raise MemoryError
-        #
-        # Detect very rare cases of overflows
-        if raw_malloc_usage(totalsize) > MAXIMUM_SIZE:
-            raise MemoryError("rare case of overflow")
-        #
-        totalsize = llarena.round_up_for_allocation(totalsize)
-        result = self._malloc_slowpath(typeid, totalsize)
-        #
-        offset_to_length = self.varsize_offset_to_length(typeid)
-        obj = llmemory.cast_ptr_to_adr(result)
-        (obj + offset_to_length).signed[0] = length
-        return result
-    _malloc_varsize_slowpath._dont_inline_ = True
-
-    def _malloc_with_finalizer(self, typeid, size):
-        return self._malloc_large_object(typeid, size, 1)
-
-    def _malloc_weakref(self, typeid, size):
-        return self._malloc_large_object(typeid, size, 2)
-
     # ----------
     # Other functions in the GC API
 
@@ -545,17 +320,17 @@
                     # it is only possible to reach this point if there is
                     # a collection running in collector_mark(), before it
                     # does mutex_lock itself.  Check this:
-                    ll_assert(self.collection_running == 1,
+                    ll_assert(self.collector.running == 1,
                               "write barrier: wrong call?")
                     #
                     # It's fine to set the mark before tracing, because
                     # we are anyway in a 'mutex_lock' critical section.
                     # The collector thread will not exit from the phase
-                    # 'collection_running == 1' here.
+                    # 'collector.running == 1' here.
                     self.trace(obj, self._barrier_add_extra, None)
                     #
                     # Still at 1:
-                    ll_assert(self.collection_running == 1,
+                    ll_assert(self.collector.running == 1,
                               "write barrier: oups!?")
                     #
                 else:
@@ -582,50 +357,19 @@
     def wait_for_the_end_of_collection(self):
         """In the mutator thread: wait for the minor collection currently
         running (if any) to finish."""
-        if self.collection_running != 0:
+        if self.collector.running != 0:
             debug_start("gc-stop")
             #
             self.acquire(self.finished_lock)
-            self.collection_running = 0
-            #debug_print("collection_running = 0")
+            self.collector.running = 0
+            #debug_print("collector.running = 0")
             #
             # Check invariants
             ll_assert(not self.extra_objects_to_mark.non_empty(),
                       "objs left behind in extra_objects_to_mark")
-            ll_assert(not self.gray_objects.non_empty(),
+            ll_assert(not self.collector.gray_objects.non_empty(),
                       "objs left behind in gray_objects")
             #
-            # Grab the results of the last collection: read the collector's
-            # 'collect_heads/collect_tails' and merge them with the mutator's
-            # 'free_lists'.
-            n = 1
-            while n < self.pagelists_length:
-                self.free_lists[n] = self.join_lists(self.free_lists[n],
-                                                     self.collect_heads[n],
-                                                     self.collect_tails[n])
-                n += 1
-            #
-            # Do the same with 'collect_heads[0]/collect_tails[0]'.
-            self.nonfree_pages[0] = self.join_lists(self.nonfree_pages[0],
-                                                    self.collect_heads[0],
-                                                    self.collect_tails[0])
-            #
-            # Do the same with 'collect_weakref_pages/tails'
-            self.weakref_pages = self.join_lists(self.weakref_pages,
-                                                 self.collect_weakref_pages,
-                                                 self.collect_weakref_tails)
-            #
-            # Do the same with 'collect_finalizer_pages/tails'
-            self.finalizer_pages = self.join_lists(self.finalizer_pages,
-                                                  self.collect_finalizer_pages,
-                                                  self.collect_finalizer_tails)
-            #
-            # Do the same with 'collect_run_finalizers_head/tail'
-            self.objects_with_finalizers_to_run = self.join_lists(
-                self.objects_with_finalizers_to_run,
-                self.collect_run_finalizers_head,
-                self.collect_run_finalizers_tail)
-            #
             if self.DEBUG:
                 self.debug_check_lists()
             #
@@ -635,7 +379,7 @@
             # can start the next collection, and then this function returns
             # with a collection in progress, which it should not.  Be careful
             # to call execute_finalizers_ll() in the caller somewhere.
-            ll_assert(self.collection_running == 0,
+            ll_assert(self.collector.running == 0,
                       "collector thread not paused?")
 
     def join_lists(self, list1, list2head, list2tail):
@@ -649,6 +393,7 @@
 
 
     def execute_finalizers_ll(self):
+        return # XXX
         self.finalizer_lock_count += 1
         try:
             while self.objects_with_finalizers_to_run != self.NULL:
@@ -689,7 +434,7 @@
            The most useful default.
            gen>=4: Do a full synchronous major collection.
         """
-        if gen >= 1 or self.collection_running <= 0:
+        if gen >= 1 or self.collector.running <= 0:
             self.trigger_next_collection(gen >= 3)
             if gen >= 2:
                 self.wait_for_the_end_of_collection()
@@ -710,15 +455,15 @@
             None)                             # static in prebuilt gc
         #
         # Add the objects still waiting in 'objects_with_finalizers_to_run'
-        p = self.objects_with_finalizers_to_run
-        while p != self.NULL:
-            x = llmemory.cast_ptr_to_adr(p)
-            x = llarena.getfakearenaaddress(x) + 8
-            obj = x + self.gcheaderbuilder.size_gc_header
-            #debug_print("_objects_with_finalizers_to_run", obj)
-            self.get_mark(obj)
-            self.gray_objects.append(obj)
-            p = list_next(p)
+        #p = self.objects_with_finalizers_to_run
+        #while p != self.NULL:
+        #    x = llmemory.cast_ptr_to_adr(p)
+        #    x = llarena.getfakearenaaddress(x) + 8
+        #    obj = x + self.gcheaderbuilder.size_gc_header
+        #    #debug_print("_objects_with_finalizers_to_run", obj)
+        #    self.get_mark(obj)
+        #    self.gray_objects.append(obj)
+        #    p = list_next(p)
         #
         # Add all old objects that have been written to since the last
         # time trigger_next_collection was called
@@ -732,27 +477,16 @@
         self.current_young_marker = self.current_aging_marker
         self.current_aging_marker = other
         #
-        # Copy a few 'mutator' fields to 'collector' fields:  the
-        # 'collect_pages[n]' make linked lists of all nonfree pages at the
-        # start of the collection (unlike the 'nonfree_pages' lists, which
-        # the mutator will continue to grow).
-        n = 0
-        while n < self.pagelists_length:
-            self.collect_pages[n] = self.nonfree_pages[n]
-            n += 1
-        self.collect_weakref_pages = self.weakref_pages
-        self.collect_finalizer_pages = self.finalizer_pages
-        #
-        # Clear the following lists.  When the collector thread finishes,
-        # it will give back (in collect_{pages,tails}[0] and
-        # collect_finalizer_{pages,tails}) all the original items that survive.
-        self.nonfree_pages[0] = self.NULL
-        self.weakref_pages = self.NULL
-        self.finalizer_pages = self.NULL
+        # Copy a few 'mutator' fields to 'collector' fields
+        collector = self.collector
+        collector.aging_objects = self.young_objects
+        self.young_objects = self.NULL
+        #self.collect_weakref_pages = self.weakref_pages
+        #self.collect_finalizer_pages = self.finalizer_pages
         #
         # Start the collector thread
-        self.collection_running = 1
-        #debug_print("collection_running = 1")
+        self.collector.running = 1
+        #debug_print("collector.running = 1")
         self.release(self.ready_to_start_lock)
         #
         debug_stop("gc-start")
@@ -763,33 +497,29 @@
         obj = root.address[0]
         #debug_print("_add_stack_root", obj)
         self.get_mark(obj)
-        self.gray_objects.append(obj)
+        self.collector.gray_objects.append(obj)
 
     def _add_prebuilt_root(self, obj, ignored):
+        # NB. it's ok to edit 'gray_objects' from the mutator thread here,
+        # because the collector thread is not running yet
         #debug_print("_add_prebuilt_root", obj)
         self.get_mark(obj)
-        self.gray_objects.append(obj)
+        self.collector.gray_objects.append(obj)
 
     def debug_check_lists(self):
         # just check that they are correct, non-infinite linked lists
-        self.debug_check_list(self.nonfree_pages[0])
-        n = 1
-        while n < self.pagelists_length:
-            self.debug_check_list(self.free_lists[n])
-            n += 1
-        self.debug_check_list(self.weakref_pages)
-        self.debug_check_list(self.finalizer_pages)
-        self.debug_check_list(self.objects_with_finalizers_to_run)
+        self.debug_check_list(self.young_objects)
+        self.debug_check_list(self.old_objects)
 
-    def debug_check_list(self, page):
+    def debug_check_list(self, list):
         try:
-            previous_page = self.NULL
+            previous = self.NULL
             count = 0
-            while page != self.NULL:
+            while list != self.NULL:
                 # prevent constant-folding, and detects loops of length 1
-                ll_assert(page != previous_page, "loop!")
-                previous_page = page
-                page = list_next(page)
+                ll_assert(list != previous, "loop!")
+                previous = list
+                list = list.next
                 count += 1
             return count
         except KeyboardInterrupt:
@@ -797,24 +527,151 @@
             raise
 
     def acquire(self, lock):
-        if (we_are_translated() or
-                ll_thread.get_ident() != self.main_thread_ident):
+        if we_are_translated():
             ll_thread.c_thread_acquirelock(lock, 1)
         else:
+            assert ll_thread.get_ident() == self.main_thread_ident
             while rffi.cast(lltype.Signed,
                             ll_thread.c_thread_acquirelock(lock, 0)) == 0:
                 time.sleep(0.05)
                 # ---------- EXCEPTION FROM THE COLLECTOR THREAD ----------
-                if hasattr(self, '_exc_info'):
+                if hasattr(self.collector, '_exc_info'):
                     self._reraise_from_collector_thread()
 
     def release(self, lock):
         ll_thread.c_thread_releaselock(lock)
 
     def _reraise_from_collector_thread(self):
-        exc, val, tb = self._exc_info
+        exc, val, tb = self.collector._exc_info
         raise exc, val, tb
 
+    def set_mark(self, obj, newmark):
+        _set_mark(self.header(obj), newmark)
+
+    def get_mark(self, obj):
+        mark = self.header(obj).tid & 0xFF
+        ll_assert(mark == MARK_BYTE_1 or
+                  mark == MARK_BYTE_2 or
+                  mark == MARK_BYTE_OLD or
+                  mark == MARK_BYTE_STATIC, "bad mark byte in object")
+        return mark
+
+    # ----------
+    # Weakrefs
+
+    def weakref_deref(self, wrobj):
+        # Weakrefs need some care.  This code acts as a read barrier.
+        # The only way I found is to acquire the mutex_lock to prevent
+        # the collection thread from going from collector.running==1
+        # to collector.running==2, or from collector.running==2 to
+        # collector.running==3.
+        #
+        self.acquire(self.mutex_lock)
+        #
+        targetobj = gctypelayout.ll_weakref_deref(wrobj)
+        if targetobj != llmemory.NULL:
+            #
+            if self.collector.running == 1:
+                # If we are in the phase collector.running==1, we don't
+                # know if the object will be scanned a bit later or
+                # not; so we have to assume that it survives, and
+                # force it to be scanned.
+                self.get_mark(targetobj)
+                self.extra_objects_to_mark.append(targetobj)
+                #
+            elif self.collector.running == 2:
+                # In the phase collector.running==2, if the object is
+                # not marked it's too late; we have to detect that case
+                # and return NULL instead here, as if the corresponding
+                # collector phase was already finished (deal_with_weakrefs).
+                # Otherwise we would be returning an object that is about to
+                # be swept away.
+                if not self.is_marked_or_static(targetobj, self.current_mark):
+                    targetobj = llmemory.NULL
+                #
+            else:
+                # In other phases we are fine.
+                pass
+        #
+        self.release(self.mutex_lock)
+        #
+        return targetobj
+
+
+# ____________________________________________________________
+#
+# The collector thread is put in a separate class, in order to isolate
+# it more cleanly (both from a code organization point of view and
+# from the point of view of cache locality).
+
+
+class CollectorThread(object):
+    _alloc_flavor_ = "raw"
+
+    NULL = ConcurrentGenGC.NULL
+
+    def __init__(self, gc):
+        self.gc = gc
+        #
+        # When the mutator thread wants to trigger the next collection,
+        # it scans its own stack roots and prepares everything, then
+        # sets 'collector.running' to 1, and releases
+        # 'ready_to_start_lock'.  This triggers the collector thread,
+        # which re-acquires 'ready_to_start_lock' and does its job.
+        # When done it releases 'finished_lock'.  The mutator thread is
+        # responsible for resetting 'collector.running' to 0.
+        #
+        # The collector thread's state can be found (with careful locking)
+        # by inspecting the same variable from the mutator thread:
+        #   * collector.running == 1: Marking.  [Deletion barrier active.]
+        #   * collector.running == 2: Clearing weakrefs.
+        #   * collector.running == 3: Marking from unreachable finalizers.
+        #   * collector.running == 4: Sweeping.
+        #   * collector.running == -1: Done.
+        # The mutex_lock is acquired to go from 1 to 2, and from 2 to 3.
+        self.running = 0
+        #
+        # a different AddressStack class, which uses a different pool
+        # of free pages than the regular one, so can run concurrently
+        self.CollectorAddressStack = get_address_stack(lock="collector")
+        #
+        # when the collection starts, we make all young objects aging and
+        # move 'young_objects' into 'aging_objects'
+        self.aging_objects = self.NULL
+        #
+        # The start function for the thread, as a function and not a method
+        def collector_start():
+            if we_are_translated():
+                self.collector_run()
+            else:
+                self.collector_run_nontranslated()
+        collector_start._should_never_raise_ = True
+        self.collector_start = collector_start
+
+    def _initialize(self):
+        self.gray_objects = self.CollectorAddressStack()
+
+    def setup(self):
+        self.ready_to_start_lock = self.gc.ready_to_start_lock
+        self.finished_lock = self.gc.finished_lock
+        self.mutex_lock = self.gc.mutex_lock
+        #
+        # start the thread
+        self.collector_ident = ll_thread.c_thread_start_nowrapper(
+            llhelper(ll_thread.CALLBACK, self.collector_start))
+        assert self.collector_ident != -1
+
+    def acquire(self, lock):
+        ll_thread.c_thread_acquirelock(lock, 1)
+
+    def release(self, lock):
+        ll_thread.c_thread_releaselock(lock)
+
+    def get_mark(self, obj):
+        return self.gc.get_mark(obj)
+
+    def set_mark(self, obj, newmark):
+        self.gc.set_mark(obj, newmark)
 
     def collector_run_nontranslated(self):
         try:
@@ -847,7 +704,7 @@
             self.acquire(self.ready_to_start_lock)
             #
             # For tests: detect when we have to shut down
-            if self.collection_running == 42:
+            if self.running == 42:
                 self.release(self.finished_lock)
                 break
             #
@@ -863,17 +720,6 @@
             self.release(self.finished_lock)
 
 
-    def set_mark(self, obj, newmark):
-        _set_mark(self.header(obj), newmark)
-
-    def get_mark(self, obj):
-        mark = self.header(obj).tid & 0xFF
-        ll_assert(mark == MARK_BYTE_1 or
-                  mark == MARK_BYTE_2 or
-                  mark == MARK_BYTE_OLD or
-                  mark == MARK_BYTE_STATIC, "bad mark byte in object")
-        return mark
-
     def collector_mark(self):
         while True:
             #
@@ -888,8 +734,9 @@
             # unless XXX we've hit the write barrier of a large array
             self.acquire(self.mutex_lock)
             #debug_print("...collector thread has mutex_lock")
-            while self.extra_objects_to_mark.non_empty():
-                obj = self.extra_objects_to_mark.pop()
+            extra_objects_to_mark = self.gc.extra_objects_to_mark
+            while extra_objects_to_mark.non_empty():
+                obj = extra_objects_to_mark.pop()
                 self.get_mark(obj)
                 self.gray_objects.append(obj)
             #
@@ -905,185 +752,89 @@
             # Else release mutex_lock and try again.
             self.release(self.mutex_lock)
         #
-        self.collection_running = 2
+        self.running = 2
         #debug_print("collection_running = 2")
         self.release(self.mutex_lock)
 
     def _collect_mark(self):
-        cam = self.current_aging_marker
+        extra_objects_to_mark = self.gc.extra_objects_to_mark
+        cam = self.gc.current_aging_marker
         while self.gray_objects.non_empty():
             obj = self.gray_objects.pop()
-            if self.get_mark(obj) == cam:
-                #
-                # Scan the content of 'obj'.  We use a snapshot-at-the-
-                # beginning order, meaning that we want to scan the state
-                # of the object as it was at the beginning of the current
-                # collection --- and not the current state, which might have
-                # been modified.  That's why we have a deletion barrier:
-                # when the mutator thread is about to change an object that
-                # is not yet marked, it will itself do the scanning of just
-                # this object, and mark the object.  But this function is not
-                # synchronized, which means that in some rare cases it's
-                # possible that the object is scanned a second time here
-                # (harmlessly).
-                #
-                # The order of the next two lines is essential!  *First*
-                # scan the object, adding all objects found to gray_objects;
-                # and *only then* set the mark.  This is essential, because
-                # otherwise, we might set the mark, then the main thread
-                # thinks a force_scan() is not necessary and modifies the
-                # content of 'obj', and then here in the collector thread
-                # we scan a modified content --- and the original content
-                # is never scanned.
-                #
-                self.trace(obj, self._collect_add_pending, None)
-                self.set_mark(obj, MARK_BYTE_OLD)
-                #
-                # Interrupt early if the mutator's write barrier adds stuff
-                # to that list.  Note that the check is imprecise because
-                # it is not lock-protected, but that's good enough.  The
-                # idea is that we trace in priority objects flagged with
-                # the write barrier, because they are more likely to
-                # reference further objects that will soon be accessed too.
-                if self.extra_objects_to_mark.non_empty():
-                    return
+            if self.get_mark(obj) != cam:
+                continue
+            #
+            # Scan the content of 'obj'.  We use a snapshot-at-the-
+            # beginning order, meaning that we want to scan the state
+            # of the object as it was at the beginning of the current
+            # collection --- and not the current state, which might have
+            # been modified.  That's why we have a deletion barrier:
+            # when the mutator thread is about to change an object that
+            # is not yet marked, it will itself do the scanning of just
+            # this object, and mark the object.  But this function is not
+            # synchronized, which means that in some rare cases it's
+            # possible that the object is scanned a second time here
+            # (harmlessly).
+            #
+            # The order of the next two lines is essential!  *First*
+            # scan the object, adding all objects found to gray_objects;
+            # and *only then* set the mark.  This is essential, because
+            # otherwise, we might set the mark, then the main thread
+            # thinks a force_scan() is not necessary and modifies the
+            # content of 'obj', and then here in the collector thread
+            # we scan a modified content --- and the original content
+            # is never scanned.
+            #
+            self.gc.trace(obj, self._collect_add_pending, None)
+            self.set_mark(obj, MARK_BYTE_OLD)
+            #
+            # Interrupt early if the mutator's write barrier adds stuff
+            # to that list.  Note that the check is imprecise because
+            # it is not lock-protected, but that's good enough.  The
+            # idea is that we trace in priority objects flagged with
+            # the write barrier, because they are more likely to
+            # reference further objects that will soon be accessed too.
+            if extra_objects_to_mark.non_empty():
+                return
 
     def _collect_add_pending(self, root, ignored):
         obj = root.address[0]
+        # these 'get_mark(obj)' calls are here for debugging invalid marks.
+        # XXX check that the C compiler removes them if lldebug is off
         self.get_mark(obj)
         self.gray_objects.append(obj)
 
     def collector_sweep(self):
-        self._collect_sweep_large_objects()
-        #
-        n = 1
-        while n < self.pagelists_length:
-            self._collect_sweep_pages(n)
-            n += 1
-        #
-        self.collection_running = -1
-        #debug_print("collection_running = -1")
-
-    def _collect_sweep_large_objects(self):
-        block = self.collect_pages[0]
-        cam = self.current_aging_marker
-        linked_list = self.NULL
-        first_block_in_linked_list = self.NULL
-        while block != self.NULL:
-            nextblock = list_next(block)
-            blockadr = llmemory.cast_ptr_to_adr(block)
-            blockadr = llarena.getfakearenaaddress(blockadr)
-            hdr = llmemory.cast_adr_to_ptr(blockadr + 8, self.HDRPTR)
+        cam = self.gc.current_aging_marker
+        hdr = self.aging_objects
+        linked_list = self.gc.old_objects
+        while hdr != self.NULL:
+            nexthdr = hdr.next
             mark = hdr.tid & 0xFF
             if mark == cam:
                 # the object is still not marked.  Free it.
+                blockadr = llmemory.cast_ptr_to_adr(hdr)
                 llarena.arena_free(blockadr)
                 #
             else:
                 # the object was marked: relink it
                 ll_assert(mark == MARK_BYTE_OLD,
                           "bad mark in large object")
-                set_next(block, linked_list)
-                linked_list = block
-                if first_block_in_linked_list == self.NULL:
-                    first_block_in_linked_list = block
-            block = nextblock
+                hdr.next = linked_list
+                linked_list = hdr
+                #
+            hdr = nexthdr
         #
-        self.collect_heads[0] = linked_list
-        self.collect_tails[0] = first_block_in_linked_list
+        self.gc.old_objects = linked_list
+        #
+        self.running = -1
+        #debug_print("collection_running = -1")
 
-    def _collect_sweep_pages(self, n):
-        # sweep all pages from the linked list starting at 'page',
-        # containing objects of fixed size 'object_size'.
-        size_gc_header = self.gcheaderbuilder.size_gc_header
-        page = self.collect_pages[n]
-        object_size = n << WORD_POWER_2
-        linked_list = self.NULL
-        first_loc_in_linked_list = self.NULL
-        cam = self.current_aging_marker
-        while page != self.NULL:
-            i = self.page_size - object_size
-            limit = raw_malloc_usage(self.HDRSIZE)
-            pageadr = llmemory.cast_ptr_to_adr(page)
-            pageadr = llarena.getfakearenaaddress(pageadr)
-            while i >= limit:
-                adr = pageadr + i
-                hdr = llmemory.cast_adr_to_ptr(adr, self.HDRPTR)
-                mark = hdr.tid & 0xFF
-                #
-                if mark == cam:
-                    # the location contains really an object (and is not just
-                    # part of a linked list of free locations), and moreover
-                    # the object is still not marked.  Free it by inserting
-                    # it into the linked list.
-                    #debug_print("sweeps", adr + size_gc_header)
-                    llarena.arena_reset(adr, object_size, 0)
-                    llarena.arena_reserve(adr, self.HDRSIZE)
-                    hdr = llmemory.cast_adr_to_ptr(adr, self.HDRPTR)
-                    set_next(hdr, linked_list)
-                    linked_list = hdr
-                    if first_loc_in_linked_list == self.NULL:
-                        first_loc_in_linked_list = hdr
-                    # XXX detect when the whole page is freed again
-                    #
-                    # Clear the data, in prevision for the following
-                    # malloc_fixedsize_clear().
-                    size_of_int = raw_malloc_usage(
-                        llmemory.sizeof(lltype.Signed))
-                    llarena.arena_reset(adr + size_of_int,
-                                        object_size - size_of_int, 2)
-                #
-                i -= object_size
-            #
-            page = list_next(page)
-        #
-        self.collect_heads[n] = linked_list
-        self.collect_tails[n] = first_loc_in_linked_list
-
-
-    # ----------
-    # Weakrefs
-
-    def weakref_deref(self, wrobj):
-        # Weakrefs need some care.  This code acts as a read barrier.
-        # The only way I found is to acquire the mutex_lock to prevent
-        # the collection thread from going from collection_running==1
-        # to collection_running==2, or from collection_running==2 to
-        # collection_running==3.
-        #
-        self.acquire(self.mutex_lock)
-        #
-        targetobj = gctypelayout.ll_weakref_deref(wrobj)
-        if targetobj != llmemory.NULL:
-            #
-            if self.collection_running == 1:
-                # If we are in the phase collection_running==1, we don't
-                # know if the object will be scanned a bit later or
-                # not; so we have to assume that it survives, and
-                # force it to be scanned.
-                self.get_mark(targetobj)
-                self.extra_objects_to_mark.append(targetobj)
-                #
-            elif self.collection_running == 2:
-                # In the phase collection_running==2, if the object is
-                # not marked it's too late; we have to detect that case
-                # and return NULL instead here, as if the corresponding
-                # collector phase was already finished (deal_with_weakrefs).
-                # Otherwise we would be returning an object that is about to
-                # be swept away.
-                if not self.is_marked_or_static(targetobj, self.current_mark):
-                    targetobj = llmemory.NULL
-                #
-            else:
-                # In other phases we are fine.
-                pass
-        #
-        self.release(self.mutex_lock)
-        #
-        return targetobj
+    # -------------------------
+    # CollectorThread: Weakrefs
 
     def deal_with_weakrefs(self):
-        self.collection_running = 3; return
+        self.running = 3; return
         # ^XXX^
         size_gc_header = self.gcheaderbuilder.size_gc_header
         current_mark = self.current_mark
@@ -1118,16 +869,15 @@
             weakref_page = next_page
         #
         self.acquire(self.mutex_lock)
-        self.collection_running = 3
-        #debug_print("collection_running = 3")
+        self.collector.running = 3
+        #debug_print("collector.running = 3")
         self.release(self.mutex_lock)
 
-
-    # ----------
-    # Finalizers
+    # ---------------------------
+    # CollectorThread: Finalizers
 
     def deal_with_objects_with_finalizers(self):
-        self.collection_running = 4; return
+        self.running = 4; return
         # ^XXX^
         
         # XXX needs to be done correctly; for now we'll call finalizers
@@ -1170,25 +920,12 @@
                   "should not see objects only reachable from finalizers "
                   "before we run them")
         #
-        self.collection_running = 4
+        self.collector.running = 4
         #debug_print("collection_running = 4")
 
 
 # ____________________________________________________________
 #
-# Support for linked lists (used here because AddressStack is not thread-safe)
-
-def list_next(hdr):
-    return llmemory.cast_adr_to_ptr(llmemory.cast_int_to_adr(hdr.tid),
-                                    ConcurrentGenGC.HDRPTR)
-
-def set_next(hdr, nexthdr):
-    hdr.tid = llmemory.cast_adr_to_int(llmemory.cast_ptr_to_adr(nexthdr),
-                                       "symbolic")
-
-
-# ____________________________________________________________
-#
 # Hack to write the 'mark' or the 'flags' bytes of an object header
 # without overwriting the whole word.  Essential in the rare case where
 # the other thread might be concurrently writing the other byte.
@@ -1232,19 +969,3 @@
                            [ConcurrentGenGC.HDRPTR, lltype.Signed],
                            lltype.Void, compilation_info=eci, _nowrapper=True,
                            _callable=emulate_set_flags)
-
-# ____________________________________________________________
-#
-# A lock to synchronize access to AddressStack's free pages
-
-class SyncLock:
-    _alloc_flavor_ = "raw"
-    _lock = lltype.nullptr(ll_thread.TLOCKP.TO)
-    def setup(self):
-        self._lock = ll_thread.allocate_ll_lock()
-    def acquire(self):
-        if self._lock:
-            ll_thread.c_thread_acquirelock(self._lock, 1)
-    def release(self):
-        if self._lock:
-            ll_thread.c_thread_releaselock(self._lock)
diff --git a/pypy/rpython/memory/support.py b/pypy/rpython/memory/support.py
--- a/pypy/rpython/memory/support.py
+++ b/pypy/rpython/memory/support.py
@@ -69,7 +69,7 @@
                 lltype.free(chunk, flavor="raw", track_allocation=False)
             self._unlock()
 
-        if lock is not None:
+        if lock is not None and not isinstance(lock, str):
             def _lock(self):   lock.acquire()
             def _unlock(self): lock.release()
         else:


More information about the pypy-commit mailing list