[pypy-commit] pypy concurrent-marksweep: Fix enough to have the simplest test pass again.
arigo
noreply at buildbot.pypy.org
Sun Oct 23 11:46:28 CEST 2011
Author: Armin Rigo <arigo at tunes.org>
Branch: concurrent-marksweep
Changeset: r48355:174ecd921aae
Date: 2011-10-23 11:45 +0200
http://bitbucket.org/pypy/pypy/changeset/174ecd921aae/
Log: Fix enough to have the simplest test pass again. For now, kill the
logic of pages, because it's a mess to reason both about it and
about concurrentgen.txt at the same time. All objects are just
malloc()ed.
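
The scheme the log describes -- every object individually malloc()ed and chained through a new 'next' field in its header -- boils down to roughly the following. This is only a minimal pure-Python sketch with invented names (Header, ToyGC); the real code uses lltype.Struct headers and llarena.arena_malloc(), as in the diff below.

    # Toy sketch only, not the RPython code from this commit.

    class Header(object):
        def __init__(self, tid):
            self.tid = tid      # type id combined with the mark byte
            self.next = None    # the extra field this commit adds ("kill me later")

    class ToyGC(object):
        def __init__(self):
            self.young_objects = None   # head of the linked list of new objects
            self.old_objects = None     # linked list of objects that survived

        def malloc(self, tid):
            # every allocation is its own malloc(); no pages, no free lists
            hdr = Header(tid)
            hdr.next = self.young_objects
            self.young_objects = hdr
            return hdr

        def start_collection(self, collector):
            # hand the entire young list over to the collector thread and
            # start a fresh one, mirroring trigger_next_collection()
            collector.aging_objects = self.young_objects
            self.young_objects = None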
diff --git a/pypy/rpython/lltypesystem/llarena.py b/pypy/rpython/lltypesystem/llarena.py
--- a/pypy/rpython/lltypesystem/llarena.py
+++ b/pypy/rpython/lltypesystem/llarena.py
@@ -551,6 +551,8 @@
has_protect = False
+# note: the concurrent GCs do mallocs in one thread and frees in another,
+# so make sure these point to a thread-safe implementation
llimpl_malloc = rffi.llexternal('malloc', [lltype.Signed], llmemory.Address,
sandboxsafe=True, _nowrapper=True)
llimpl_free = rffi.llexternal('free', [llmemory.Address], lltype.Void,
diff --git a/pypy/rpython/memory/gc/concurrentgen.py b/pypy/rpython/memory/gc/concurrentgen.py
--- a/pypy/rpython/memory/gc/concurrentgen.py
+++ b/pypy/rpython/memory/gc/concurrentgen.py
@@ -9,6 +9,7 @@
from pypy.rlib.rarithmetic import ovfcheck, LONG_BIT, r_uint
from pypy.rpython.memory.gc.base import GCBase
from pypy.rpython.memory import gctypelayout
+from pypy.rpython.memory.support import get_address_stack
from pypy.module.thread import ll_thread
#
@@ -18,7 +19,8 @@
# See concurrentgen.txt for some details.
#
# Major collections are serialized for the mark phase, but the sweep
-# phase can be parallelized again. XXX not done so far
+# phase can be parallelized again. XXX not done so far; YYY: investigate
+# completely parallelizing them as well
#
# Based on observations that the timing of collections with "minimark"
# (on translate.py) is: about 15% of the time in minor collections
@@ -59,8 +61,10 @@
malloc_zero_filled = True
gcflag_extra = FL_EXTRA
- HDR = lltype.Struct('header', ('tid', lltype.Signed))
- HDRPTR = lltype.Ptr(HDR)
+ HDRPTR = lltype.Ptr(lltype.ForwardReference())
+ HDR = lltype.Struct('header', ('tid', lltype.Signed),
+ ('next', HDRPTR)) # <-- kill me later
+ HDRPTR.TO.become(HDR)
HDRSIZE = llmemory.sizeof(HDR)
NULL = lltype.nullptr(HDR)
typeid_is_in_field = 'tid', llgroup.HALFSHIFT
@@ -68,63 +72,14 @@
# ^^^ prebuilt objects may have the flag FL_WITHHASH;
# then they are one word longer, the extra word storing the hash.
- TRANSLATION_PARAMS = {'page_size': 16384,
- 'small_request_threshold': 35*WORD,
- }
+ TRANSLATION_PARAMS = {}
- def __init__(self, config, page_size=64, small_request_threshold=24,
- **kwds):
- # 'small_request_threshold' is the largest size that we will
- # satisfy using our own pages mecanism. Larger requests just
- # go to the system malloc().
- self.addressstack_lock_object = SyncLock()
- kwds['lock'] = self.addressstack_lock_object
+ def __init__(self, config, **kwds):
GCBase.__init__(self, config, **kwds)
- assert small_request_threshold % WORD == 0
- self.small_request_threshold = small_request_threshold
- self.page_size = page_size
- self.pagelists_length = small_request_threshold // WORD + 1
+ self.main_thread_ident = ll_thread.get_ident()
#
- # The following are arrays of 36 linked lists: the linked lists
- # at indices 1 to 35 correspond to pages that store objects of
- # size 1 * WORD to 35 * WORD. The linked list at index 0
- # contains the pages of objects that are larger than this limit
- # (variable-sized pages, with exactly one object per page).
- #
- def list_of_addresses_per_small_size():
- return lltype.malloc(rffi.CArray(self.HDRPTR),
- self.pagelists_length, flavor='raw',
- immortal=True)
- #
- # pages that contain at least one new young object
- self.new_young_objects_pages = list_of_addresses_per_small_size()
- #
- # when the collection starts and we make all young objects aging,
- # we move the linked lists above into 'aging_objects_pages'
- self.aging_objects_pages = list_of_addresses_per_small_size()
- #
- # free list of non-allocated locations within pages
- # (at index 0: always empty)
- self.location_free_lists = list_of_addresses_per_small_size()
- #
- # head and tail of the free list of locations built by the
- # collector thread
- self.collect_loc_heads = list_of_addresses_per_small_size()
- self.collect_loc_tails = list_of_addresses_per_small_size()
- #
- def collector_start():
- if we_are_translated():
- self.collector_run()
- else:
- self.collector_run_nontranslated()
- #
- collector_start._should_never_raise_ = True
- self.collector_start = collector_start
- #
- self.gray_objects = self.AddressStack()
- self.extra_objects_to_mark = self.AddressStack()
- self.flagged_objects = self.AddressStack()
- self.prebuilt_root_objects = self.AddressStack()
+ # Create the CollectorThread object
+ self.collector = CollectorThread(self)
#
self._initialize()
#
@@ -133,62 +88,31 @@
# that was not scanned yet.
self._init_writebarrier_logic()
- def _clear_list(self, array):
- i = 0
- while i < self.pagelists_length:
- array[i] = self.NULL
- i += 1
-
def _initialize(self):
- self.free_pages = self.NULL
+ # Initialize the GC. In a normal translated program, this function
+ # is not translated but is just called from __init__ ahead of time.
+ # During test_transformed_gc, it is translated, so that we can
+ # quickly reset the GC between tests.
#
- # Clear the lists
- self._clear_list(self.nonfree_pages)
- self._clear_list(self.collect_pages)
- self._clear_list(self.free_lists)
- self._clear_list(self.collect_heads)
- self._clear_list(self.collect_tails)
+ self.extra_objects_to_mark = self.AddressStack()
+ self.flagged_objects = self.AddressStack()
+ self.prebuilt_root_objects = self.AddressStack()
#
- self.finalizer_pages = self.NULL
- self.collect_finalizer_pages = self.NULL
- self.collect_finalizer_tails = self.NULL
- self.collect_run_finalizers_head = self.NULL
- self.collect_run_finalizers_tail = self.NULL
- self.objects_with_finalizers_to_run = self.NULL
- #
- self.weakref_pages = self.NULL
- self.collect_weakref_pages = self.NULL
- self.collect_weakref_tails = self.NULL
+ # The linked list of new young objects, and the linked list of
+ # all old objects. Note that the aging objects are not here
+ # but on 'collector.aging_objects'.
+ self.young_objects = self.NULL
+ self.old_objects = self.NULL
#
# See concurrentgen.txt for more information about these fields.
self.current_young_marker = MARK_BYTE_1
self.current_aging_marker = MARK_BYTE_2
#
- # When the mutator thread wants to trigger the next collection,
- # it scans its own stack roots and prepares everything, then
- # sets 'collection_running' to 1, and releases
- # 'ready_to_start_lock'. This triggers the collector thread,
- # which re-acquires 'ready_to_start_lock' and does its job.
- # When done it releases 'finished_lock'. The mutator thread is
- # responsible for resetting 'collection_running' to 0.
- #
- # The collector thread's state can be found (with careful locking)
- # by inspecting the same variable from the mutator thread:
- # * collection_running == 1: Marking. [Deletion barrier active.]
- # * collection_running == 2: Clearing weakrefs.
- # * collection_running == 3: Marking from unreachable finalizers.
- # * collection_running == 4: Sweeping.
- # * collection_running == -1: Done.
- # The mutex_lock is acquired to go from 1 to 2, and from 2 to 3.
- self.collection_running = 0
#self.ready_to_start_lock = ...built in setup()
#self.finished_lock = ...built in setup()
+ #self.mutex_lock = ...built in setup()
#
- #self.mutex_lock = ...built in setup()
- self.gray_objects.clear()
- self.extra_objects_to_mark.clear()
- self.flagged_objects.clear()
- self.prebuilt_root_objects.clear()
+ self.collector._initialize()
def setup(self):
"Start the concurrent collector thread."
@@ -196,26 +120,22 @@
# 'run_finalizers' as a deque
self.finalizer_lock_count = 0
#
- self.main_thread_ident = ll_thread.get_ident()
self.ready_to_start_lock = ll_thread.allocate_ll_lock()
self.finished_lock = ll_thread.allocate_ll_lock()
self.mutex_lock = ll_thread.allocate_ll_lock()
- self.addressstack_lock_object.setup()
#
self.acquire(self.finished_lock)
self.acquire(self.ready_to_start_lock)
#
- self.collector_ident = ll_thread.c_thread_start_nowrapper(
- llhelper(ll_thread.CALLBACK, self.collector_start))
- assert self.collector_ident != -1
+ self.collector.setup()
def _teardown(self):
"Stop the collector thread after tests have run."
self.wait_for_the_end_of_collection()
#
- # start the next collection, but with collection_running set to 42,
+ # start the next collection, but with collector.running set to 42,
# which should shut down the collector thread
- self.collection_running = 42
+ self.collector.running = 42
debug_print("teardown!")
self.release(self.ready_to_start_lock)
self.acquire(self.finished_lock)
@@ -235,7 +155,11 @@
def malloc_fixedsize_clear(self, typeid, size,
needs_finalizer=False, contains_weakptr=False):
- # contains_weakptr: detected during collection
+ #
+ # For now, we always start the next collection as soon as the
+ # previous one is finished
+ if self.collector.running <= 0:
+ self.trigger_next_collection()
#
# Case of finalizers (test constant-folded)
if needs_finalizer:
@@ -250,22 +174,14 @@
# Regular case
size_gc_header = self.gcheaderbuilder.size_gc_header
totalsize = size_gc_header + size
- rawtotalsize = raw_malloc_usage(totalsize)
- if rawtotalsize <= self.small_request_threshold:
- ll_assert(rawtotalsize & (WORD - 1) == 0,
- "fixedsize not properly rounded")
- #
- n = rawtotalsize >> WORD_POWER_2
- result = self.free_lists[n]
- if result != self.NULL:
- self.free_lists[n] = list_next(result)
- obj = self.grow_reservation(result, totalsize)
- hdr = self.header(obj)
- hdr.tid = self.combine(typeid, self.current_young_marker, 0)
- #debug_print("malloc_fixedsize_clear", obj)
- return llmemory.cast_adr_to_ptr(obj, llmemory.GCREF)
- #
- return self._malloc_slowpath(typeid, size)
+ adr = llarena.arena_malloc(llmemory.raw_malloc_usage(totalsize), 2)
+ llarena.arena_reserve(adr, totalsize)
+ obj = adr + size_gc_header
+ hdr = self.header(obj)
+ hdr.tid = self.combine(typeid, self.current_young_marker, 0)
+ hdr.next = self.young_objects
+ self.young_objects = hdr
+ return llmemory.cast_adr_to_ptr(obj, llmemory.GCREF)
def malloc_varsize_clear(self, typeid, length, size, itemsize,
offset_to_length):
@@ -295,9 +211,9 @@
"round_up_for_allocation failed")
#
n = rawtotalsize >> WORD_POWER_2
- result = self.free_lists[n]
+ result = self.location_free_lists[n]
if result != self.NULL:
- self.free_lists[n] = list_next(result)
+ self.location_free_lists[n] = list_next(result)
obj = self.grow_reservation(result, totalsize)
hdr = self.header(obj)
hdr.tid = self.combine(typeid, self.current_young_marker, 0)
@@ -311,147 +227,6 @@
# is actually negative.
return self._malloc_varsize_slowpath(typeid, length)
- def _malloc_slowpath(self, typeid, size):
- # Slow-path malloc. Call this with 'size' being a valid and
- # rounded number, between WORD and up to MAXIMUM_SIZE.
- #
- # For now, we always start the next collection immediately.
- if self.collection_running <= 0:
- self.trigger_next_collection()
- #
- size_gc_header = self.gcheaderbuilder.size_gc_header
- totalsize = size_gc_header + size
- rawtotalsize = raw_malloc_usage(totalsize)
- #
- if rawtotalsize <= self.small_request_threshold:
- #
- # Case 1: unless trigger_next_collection() happened to get us
- # more locations in free_lists[n], we have run out of them
- ll_assert(rawtotalsize & (WORD - 1) == 0,
- "malloc_slowpath: non-rounded size")
- n = rawtotalsize >> WORD_POWER_2
- head = self.free_lists[n]
- if head:
- self.free_lists[n] = list_next(head)
- obj = self.grow_reservation(head, totalsize)
- hdr = self.header(obj)
- hdr.tid = self.combine(typeid, self.current_young_marker, 0)
- return llmemory.cast_adr_to_ptr(obj, llmemory.GCREF)
- #
- # We really have run out of the free list corresponding to
- # the size. Grab the next free page.
- newpage = self.free_pages
- if newpage == self.NULL:
- self.allocate_next_arena()
- newpage = self.free_pages
- self.free_pages = list_next(newpage)
- #
- # Put the free page in the list 'nonfree_pages[n]'. This is
- # a linked list chained through the first word of each page.
- set_next(newpage, self.nonfree_pages[n])
- self.nonfree_pages[n] = newpage
- #
- # Initialize the free page to contain objects of the given
- # size. This requires setting up all object locations in the
- # page, linking them in the free list.
- i = self.page_size - rawtotalsize
- limit = rawtotalsize + raw_malloc_usage(self.HDRSIZE)
- newpageadr = llmemory.cast_ptr_to_adr(newpage)
- newpageadr = llarena.getfakearenaaddress(newpageadr)
- while i >= limit:
- adr = newpageadr + i
- llarena.arena_reserve(adr, self.HDRSIZE)
- p = llmemory.cast_adr_to_ptr(adr, self.HDRPTR)
- set_next(p, head)
- head = p
- i -= rawtotalsize
- self.free_lists[n] = head
- result = newpageadr + i
- #
- # Done: all object locations are linked, apart from
- # 'result', which is the first object location in the page.
- # Note that if the size is not an exact divisor of
- # 4096-WORD, there are a few wasted WORDs, which we place at
- # the start of the page rather than at the end (Hans Boehm,
- # xxx ref).
- #
- return self._malloc_result(typeid, totalsize, result)
- else:
- # Case 2: the object is too large, so allocate it directly
- # with the system malloc().
- return self._malloc_large_object(typeid, size, 0)
- #
- _malloc_slowpath._dont_inline_ = True
-
- def _malloc_result(self, typeid, totalsize, result):
- llarena.arena_reserve(result, totalsize)
- hdr = llmemory.cast_adr_to_ptr(result, self.HDRPTR)
- hdr.tid = self.combine(typeid, self.current_young_marker, 0)
- obj = result + self.gcheaderbuilder.size_gc_header
- #debug_print("malloc_slowpath", obj)
- return llmemory.cast_adr_to_ptr(obj, llmemory.GCREF)
-
- def _malloc_large_object(self, typeid, size, linked_list):
- # xxx on 32-bit, we'll prefer 64-bit alignment of the object by
- # always allocating an 8-bytes header
- totalsize = self.gcheaderbuilder.size_gc_header + size
- rawtotalsize = raw_malloc_usage(totalsize)
- rawtotalsize += 8
- block = llarena.arena_malloc(rawtotalsize, 2)
- if not block:
- raise MemoryError
- llarena.arena_reserve(block, self.HDRSIZE)
- blockhdr = llmemory.cast_adr_to_ptr(block, self.HDRPTR)
- if linked_list == 0:
- set_next(blockhdr, self.nonfree_pages[0])
- self.nonfree_pages[0] = blockhdr
- elif linked_list == 1:
- set_next(blockhdr, self.finalizer_pages)
- self.finalizer_pages = blockhdr
- elif linked_list == 2:
- set_next(blockhdr, self.weakref_pages)
- self.weakref_pages = blockhdr
- else:
- ll_assert(0, "bad linked_list")
- return self._malloc_result(typeid, totalsize, block + 8)
- _malloc_large_object._annspecialcase_ = 'specialize:arg(3)'
- _malloc_large_object._dont_inline_ = True
-
- def _malloc_varsize_slowpath(self, typeid, length):
- #
- if length < 0:
- # negative length! This likely comes from an overflow
- # earlier. We will just raise MemoryError here.
- raise MemoryError
- #
- # Compute the total size, carefully checking for overflows.
- nonvarsize = self.fixed_size(typeid)
- itemsize = self.varsize_item_sizes(typeid)
- try:
- varsize = ovfcheck(itemsize * length)
- totalsize = ovfcheck(nonvarsize + varsize)
- except OverflowError:
- raise MemoryError
- #
- # Detect very rare cases of overflows
- if raw_malloc_usage(totalsize) > MAXIMUM_SIZE:
- raise MemoryError("rare case of overflow")
- #
- totalsize = llarena.round_up_for_allocation(totalsize)
- result = self._malloc_slowpath(typeid, totalsize)
- #
- offset_to_length = self.varsize_offset_to_length(typeid)
- obj = llmemory.cast_ptr_to_adr(result)
- (obj + offset_to_length).signed[0] = length
- return result
- _malloc_varsize_slowpath._dont_inline_ = True
-
- def _malloc_with_finalizer(self, typeid, size):
- return self._malloc_large_object(typeid, size, 1)
-
- def _malloc_weakref(self, typeid, size):
- return self._malloc_large_object(typeid, size, 2)
-
# ----------
# Other functions in the GC API
@@ -545,17 +320,17 @@
# it is only possible to reach this point if there is
# a collection running in collector_mark(), before it
# does mutex_lock itself. Check this:
- ll_assert(self.collection_running == 1,
+ ll_assert(self.collector.running == 1,
"write barrier: wrong call?")
#
# It's fine to set the mark before tracing, because
# we are anyway in a 'mutex_lock' critical section.
# The collector thread will not exit from the phase
- # 'collection_running == 1' here.
+ # 'collector.running == 1' here.
self.trace(obj, self._barrier_add_extra, None)
#
# Still at 1:
- ll_assert(self.collection_running == 1,
+ ll_assert(self.collector.running == 1,
"write barrier: oups!?")
#
else:
@@ -582,50 +357,19 @@
def wait_for_the_end_of_collection(self):
"""In the mutator thread: wait for the minor collection currently
running (if any) to finish."""
- if self.collection_running != 0:
+ if self.collector.running != 0:
debug_start("gc-stop")
#
self.acquire(self.finished_lock)
- self.collection_running = 0
- #debug_print("collection_running = 0")
+ self.collector.running = 0
+ #debug_print("collector.running = 0")
#
# Check invariants
ll_assert(not self.extra_objects_to_mark.non_empty(),
"objs left behind in extra_objects_to_mark")
- ll_assert(not self.gray_objects.non_empty(),
+ ll_assert(not self.collector.gray_objects.non_empty(),
"objs left behind in gray_objects")
#
- # Grab the results of the last collection: read the collector's
- # 'collect_heads/collect_tails' and merge them with the mutator's
- # 'free_lists'.
- n = 1
- while n < self.pagelists_length:
- self.free_lists[n] = self.join_lists(self.free_lists[n],
- self.collect_heads[n],
- self.collect_tails[n])
- n += 1
- #
- # Do the same with 'collect_heads[0]/collect_tails[0]'.
- self.nonfree_pages[0] = self.join_lists(self.nonfree_pages[0],
- self.collect_heads[0],
- self.collect_tails[0])
- #
- # Do the same with 'collect_weakref_pages/tails'
- self.weakref_pages = self.join_lists(self.weakref_pages,
- self.collect_weakref_pages,
- self.collect_weakref_tails)
- #
- # Do the same with 'collect_finalizer_pages/tails'
- self.finalizer_pages = self.join_lists(self.finalizer_pages,
- self.collect_finalizer_pages,
- self.collect_finalizer_tails)
- #
- # Do the same with 'collect_run_finalizers_head/tail'
- self.objects_with_finalizers_to_run = self.join_lists(
- self.objects_with_finalizers_to_run,
- self.collect_run_finalizers_head,
- self.collect_run_finalizers_tail)
- #
if self.DEBUG:
self.debug_check_lists()
#
@@ -635,7 +379,7 @@
# can start the next collection, and then this function returns
# with a collection in progress, which it should not. Be careful
# to call execute_finalizers_ll() in the caller somewhere.
- ll_assert(self.collection_running == 0,
+ ll_assert(self.collector.running == 0,
"collector thread not paused?")
def join_lists(self, list1, list2head, list2tail):
@@ -649,6 +393,7 @@
def execute_finalizers_ll(self):
+ return # XXX
self.finalizer_lock_count += 1
try:
while self.objects_with_finalizers_to_run != self.NULL:
@@ -689,7 +434,7 @@
The most useful default.
gen>=4: Do a full synchronous major collection.
"""
- if gen >= 1 or self.collection_running <= 0:
+ if gen >= 1 or self.collector.running <= 0:
self.trigger_next_collection(gen >= 3)
if gen >= 2:
self.wait_for_the_end_of_collection()
@@ -710,15 +455,15 @@
None) # static in prebuilt gc
#
# Add the objects still waiting in 'objects_with_finalizers_to_run'
- p = self.objects_with_finalizers_to_run
- while p != self.NULL:
- x = llmemory.cast_ptr_to_adr(p)
- x = llarena.getfakearenaaddress(x) + 8
- obj = x + self.gcheaderbuilder.size_gc_header
- #debug_print("_objects_with_finalizers_to_run", obj)
- self.get_mark(obj)
- self.gray_objects.append(obj)
- p = list_next(p)
+ #p = self.objects_with_finalizers_to_run
+ #while p != self.NULL:
+ # x = llmemory.cast_ptr_to_adr(p)
+ # x = llarena.getfakearenaaddress(x) + 8
+ # obj = x + self.gcheaderbuilder.size_gc_header
+ # #debug_print("_objects_with_finalizers_to_run", obj)
+ # self.get_mark(obj)
+ # self.gray_objects.append(obj)
+ # p = list_next(p)
#
# Add all old objects that have been written to since the last
# time trigger_next_collection was called
@@ -732,27 +477,16 @@
self.current_young_marker = self.current_aging_marker
self.current_aging_marker = other
#
- # Copy a few 'mutator' fields to 'collector' fields: the
- # 'collect_pages[n]' make linked lists of all nonfree pages at the
- # start of the collection (unlike the 'nonfree_pages' lists, which
- # the mutator will continue to grow).
- n = 0
- while n < self.pagelists_length:
- self.collect_pages[n] = self.nonfree_pages[n]
- n += 1
- self.collect_weakref_pages = self.weakref_pages
- self.collect_finalizer_pages = self.finalizer_pages
- #
- # Clear the following lists. When the collector thread finishes,
- # it will give back (in collect_{pages,tails}[0] and
- # collect_finalizer_{pages,tails}) all the original items that survive.
- self.nonfree_pages[0] = self.NULL
- self.weakref_pages = self.NULL
- self.finalizer_pages = self.NULL
+ # Copy a few 'mutator' fields to 'collector' fields
+ collector = self.collector
+ collector.aging_objects = self.young_objects
+ self.young_objects = self.NULL
+ #self.collect_weakref_pages = self.weakref_pages
+ #self.collect_finalizer_pages = self.finalizer_pages
#
# Start the collector thread
- self.collection_running = 1
- #debug_print("collection_running = 1")
+ self.collector.running = 1
+ #debug_print("collector.running = 1")
self.release(self.ready_to_start_lock)
#
debug_stop("gc-start")
@@ -763,33 +497,29 @@
obj = root.address[0]
#debug_print("_add_stack_root", obj)
self.get_mark(obj)
- self.gray_objects.append(obj)
+ self.collector.gray_objects.append(obj)
def _add_prebuilt_root(self, obj, ignored):
+ # NB. it's ok to edit 'gray_objects' from the mutator thread here,
+ # because the collector thread is not running yet
#debug_print("_add_prebuilt_root", obj)
self.get_mark(obj)
- self.gray_objects.append(obj)
+ self.collector.gray_objects.append(obj)
def debug_check_lists(self):
# just check that they are correct, non-infinite linked lists
- self.debug_check_list(self.nonfree_pages[0])
- n = 1
- while n < self.pagelists_length:
- self.debug_check_list(self.free_lists[n])
- n += 1
- self.debug_check_list(self.weakref_pages)
- self.debug_check_list(self.finalizer_pages)
- self.debug_check_list(self.objects_with_finalizers_to_run)
+ self.debug_check_list(self.young_objects)
+ self.debug_check_list(self.old_objects)
- def debug_check_list(self, page):
+ def debug_check_list(self, list):
try:
- previous_page = self.NULL
+ previous = self.NULL
count = 0
- while page != self.NULL:
+ while list != self.NULL:
# prevent constant-folding, and detects loops of length 1
- ll_assert(page != previous_page, "loop!")
- previous_page = page
- page = list_next(page)
+ ll_assert(list != previous, "loop!")
+ previous = list
+ list = list.next
count += 1
return count
except KeyboardInterrupt:
@@ -797,24 +527,151 @@
raise
def acquire(self, lock):
- if (we_are_translated() or
- ll_thread.get_ident() != self.main_thread_ident):
+ if we_are_translated():
ll_thread.c_thread_acquirelock(lock, 1)
else:
+ assert ll_thread.get_ident() == self.main_thread_ident
while rffi.cast(lltype.Signed,
ll_thread.c_thread_acquirelock(lock, 0)) == 0:
time.sleep(0.05)
# ---------- EXCEPTION FROM THE COLLECTOR THREAD ----------
- if hasattr(self, '_exc_info'):
+ if hasattr(self.collector, '_exc_info'):
self._reraise_from_collector_thread()
def release(self, lock):
ll_thread.c_thread_releaselock(lock)
def _reraise_from_collector_thread(self):
- exc, val, tb = self._exc_info
+ exc, val, tb = self.collector._exc_info
raise exc, val, tb
+ def set_mark(self, obj, newmark):
+ _set_mark(self.header(obj), newmark)
+
+ def get_mark(self, obj):
+ mark = self.header(obj).tid & 0xFF
+ ll_assert(mark == MARK_BYTE_1 or
+ mark == MARK_BYTE_2 or
+ mark == MARK_BYTE_OLD or
+ mark == MARK_BYTE_STATIC, "bad mark byte in object")
+ return mark
+
+ # ----------
+ # Weakrefs
+
+ def weakref_deref(self, wrobj):
+ # Weakrefs need some care. This code acts as a read barrier.
+ # The only way I found is to acquire the mutex_lock to prevent
+ # the collection thread from going from collector.running==1
+ # to collector.running==2, or from collector.running==2 to
+ # collector.running==3.
+ #
+ self.acquire(self.mutex_lock)
+ #
+ targetobj = gctypelayout.ll_weakref_deref(wrobj)
+ if targetobj != llmemory.NULL:
+ #
+ if self.collector.running == 1:
+ # If we are in the phase collector.running==1, we don't
+ # know if the object will be scanned a bit later or
+ # not; so we have to assume that it survives, and
+ # force it to be scanned.
+ self.get_mark(targetobj)
+ self.extra_objects_to_mark.append(targetobj)
+ #
+ elif self.collector.running == 2:
+ # In the phase collector.running==2, if the object is
+ # not marked it's too late; we have to detect that case
+ # and return NULL instead here, as if the corresponding
+ # collector phase was already finished (deal_with_weakrefs).
+ # Otherwise we would be returning an object that is about to
+ # be swept away.
+ if not self.is_marked_or_static(targetobj, self.current_mark):
+ targetobj = llmemory.NULL
+ #
+ else:
+ # In other phases we are fine.
+ pass
+ #
+ self.release(self.mutex_lock)
+ #
+ return targetobj
+
+
+# ____________________________________________________________
+#
+# The collector thread is put on another class, in order to separate
+# it more cleanly (both from a code organization point of view and
+# from the point of view of cache locality).
+
+
+class CollectorThread(object):
+ _alloc_flavor_ = "raw"
+
+ NULL = ConcurrentGenGC.NULL
+
+ def __init__(self, gc):
+ self.gc = gc
+ #
+ # When the mutator thread wants to trigger the next collection,
+ # it scans its own stack roots and prepares everything, then
+ # sets 'collector.running' to 1, and releases
+ # 'ready_to_start_lock'. This triggers the collector thread,
+ # which re-acquires 'ready_to_start_lock' and does its job.
+ # When done it releases 'finished_lock'. The mutator thread is
+ # responsible for resetting 'collector.running' to 0.
+ #
+ # The collector thread's state can be found (with careful locking)
+ # by inspecting the same variable from the mutator thread:
+ # * collector.running == 1: Marking. [Deletion barrier active.]
+ # * collector.running == 2: Clearing weakrefs.
+ # * collector.running == 3: Marking from unreachable finalizers.
+ # * collector.running == 4: Sweeping.
+ # * collector.running == -1: Done.
+ # The mutex_lock is acquired to go from 1 to 2, and from 2 to 3.
+ self.running = 0
+ #
+ # A different AddressStack class, which uses a different pool
+ # of free pages than the regular one, so it can run concurrently
+ self.CollectorAddressStack = get_address_stack(lock="collector")
+ #
+ # when the collection starts, we make all young objects aging and
+ # move 'young_objects' into 'aging_objects'
+ self.aging_objects = self.NULL
+ #
+ # The start function for the thread, as a function and not a method
+ def collector_start():
+ if we_are_translated():
+ self.collector_run()
+ else:
+ self.collector_run_nontranslated()
+ collector_start._should_never_raise_ = True
+ self.collector_start = collector_start
+
+ def _initialize(self):
+ self.gray_objects = self.CollectorAddressStack()
+
+ def setup(self):
+ self.ready_to_start_lock = self.gc.ready_to_start_lock
+ self.finished_lock = self.gc.finished_lock
+ self.mutex_lock = self.gc.mutex_lock
+ #
+ # start the thread
+ self.collector_ident = ll_thread.c_thread_start_nowrapper(
+ llhelper(ll_thread.CALLBACK, self.collector_start))
+ assert self.collector_ident != -1
+
+ def acquire(self, lock):
+ ll_thread.c_thread_acquirelock(lock, 1)
+
+ def release(self, lock):
+ ll_thread.c_thread_releaselock(lock)
+
+ def get_mark(self, obj):
+ return self.gc.get_mark(obj)
+
+ def set_mark(self, obj, newmark):
+ self.gc.set_mark(obj, newmark)
def collector_run_nontranslated(self):
try:
@@ -847,7 +704,7 @@
self.acquire(self.ready_to_start_lock)
#
# For tests: detect when we have to shut down
- if self.collection_running == 42:
+ if self.running == 42:
self.release(self.finished_lock)
break
#
@@ -863,17 +720,6 @@
self.release(self.finished_lock)
- def set_mark(self, obj, newmark):
- _set_mark(self.header(obj), newmark)
-
- def get_mark(self, obj):
- mark = self.header(obj).tid & 0xFF
- ll_assert(mark == MARK_BYTE_1 or
- mark == MARK_BYTE_2 or
- mark == MARK_BYTE_OLD or
- mark == MARK_BYTE_STATIC, "bad mark byte in object")
- return mark
-
def collector_mark(self):
while True:
#
@@ -888,8 +734,9 @@
# unless XXX we've hit the write barrier of a large array
self.acquire(self.mutex_lock)
#debug_print("...collector thread has mutex_lock")
- while self.extra_objects_to_mark.non_empty():
- obj = self.extra_objects_to_mark.pop()
+ extra_objects_to_mark = self.gc.extra_objects_to_mark
+ while extra_objects_to_mark.non_empty():
+ obj = extra_objects_to_mark.pop()
self.get_mark(obj)
self.gray_objects.append(obj)
#
@@ -905,185 +752,89 @@
# Else release mutex_lock and try again.
self.release(self.mutex_lock)
#
- self.collection_running = 2
+ self.running = 2
#debug_print("collection_running = 2")
self.release(self.mutex_lock)
def _collect_mark(self):
- cam = self.current_aging_marker
+ extra_objects_to_mark = self.gc.extra_objects_to_mark
+ cam = self.gc.current_aging_marker
while self.gray_objects.non_empty():
obj = self.gray_objects.pop()
- if self.get_mark(obj) == cam:
- #
- # Scan the content of 'obj'. We use a snapshot-at-the-
- # beginning order, meaning that we want to scan the state
- # of the object as it was at the beginning of the current
- # collection --- and not the current state, which might have
- # been modified. That's why we have a deletion barrier:
- # when the mutator thread is about to change an object that
- # is not yet marked, it will itself do the scanning of just
- # this object, and mark the object. But this function is not
- # synchronized, which means that in some rare cases it's
- # possible that the object is scanned a second time here
- # (harmlessly).
- #
- # The order of the next two lines is essential! *First*
- # scan the object, adding all objects found to gray_objects;
- # and *only then* set the mark. This is essential, because
- # otherwise, we might set the mark, then the main thread
- # thinks a force_scan() is not necessary and modifies the
- # content of 'obj', and then here in the collector thread
- # we scan a modified content --- and the original content
- # is never scanned.
- #
- self.trace(obj, self._collect_add_pending, None)
- self.set_mark(obj, MARK_BYTE_OLD)
- #
- # Interrupt early if the mutator's write barrier adds stuff
- # to that list. Note that the check is imprecise because
- # it is not lock-protected, but that's good enough. The
- # idea is that we trace in priority objects flagged with
- # the write barrier, because they are more likely to
- # reference further objects that will soon be accessed too.
- if self.extra_objects_to_mark.non_empty():
- return
+ if self.get_mark(obj) != cam:
+ continue
+ #
+ # Scan the content of 'obj'. We use a snapshot-at-the-
+ # beginning order, meaning that we want to scan the state
+ # of the object as it was at the beginning of the current
+ # collection --- and not the current state, which might have
+ # been modified. That's why we have a deletion barrier:
+ # when the mutator thread is about to change an object that
+ # is not yet marked, it will itself do the scanning of just
+ # this object, and mark the object. But this function is not
+ # synchronized, which means that in some rare cases it's
+ # possible that the object is scanned a second time here
+ # (harmlessly).
+ #
+ # The order of the next two lines is essential! *First*
+ # scan the object, adding all objects found to gray_objects;
+ # and *only then* set the mark. This is essential, because
+ # otherwise, we might set the mark, then the main thread
+ # thinks a force_scan() is not necessary and modifies the
+ # content of 'obj', and then here in the collector thread
+ # we scan a modified content --- and the original content
+ # is never scanned.
+ #
+ self.gc.trace(obj, self._collect_add_pending, None)
+ self.set_mark(obj, MARK_BYTE_OLD)
+ #
+ # Interrupt early if the mutator's write barrier adds stuff
+ # to that list. Note that the check is imprecise because
+ # it is not lock-protected, but that's good enough. The
+ # idea is that we trace in priority objects flagged with
+ # the write barrier, because they are more likely to
+ # reference further objects that will soon be accessed too.
+ if extra_objects_to_mark.non_empty():
+ return
def _collect_add_pending(self, root, ignored):
obj = root.address[0]
+ # these 'get_mark(obj)' calls are here for debugging invalid marks.
+ # XXX check that the C compiler removes them if lldebug is off
self.get_mark(obj)
self.gray_objects.append(obj)
def collector_sweep(self):
- self._collect_sweep_large_objects()
- #
- n = 1
- while n < self.pagelists_length:
- self._collect_sweep_pages(n)
- n += 1
- #
- self.collection_running = -1
- #debug_print("collection_running = -1")
-
- def _collect_sweep_large_objects(self):
- block = self.collect_pages[0]
- cam = self.current_aging_marker
- linked_list = self.NULL
- first_block_in_linked_list = self.NULL
- while block != self.NULL:
- nextblock = list_next(block)
- blockadr = llmemory.cast_ptr_to_adr(block)
- blockadr = llarena.getfakearenaaddress(blockadr)
- hdr = llmemory.cast_adr_to_ptr(blockadr + 8, self.HDRPTR)
+ cam = self.gc.current_aging_marker
+ hdr = self.aging_objects
+ linked_list = self.gc.old_objects
+ while hdr != self.NULL:
+ nexthdr = hdr.next
mark = hdr.tid & 0xFF
if mark == cam:
# the object is still not marked. Free it.
+ blockadr = llmemory.cast_ptr_to_adr(hdr)
llarena.arena_free(blockadr)
#
else:
# the object was marked: relink it
ll_assert(mark == MARK_BYTE_OLD,
"bad mark in large object")
- set_next(block, linked_list)
- linked_list = block
- if first_block_in_linked_list == self.NULL:
- first_block_in_linked_list = block
- block = nextblock
+ hdr.next = linked_list
+ linked_list = hdr
+ #
+ hdr = nexthdr
#
- self.collect_heads[0] = linked_list
- self.collect_tails[0] = first_block_in_linked_list
+ self.gc.old_objects = linked_list
+ #
+ self.running = -1
+ #debug_print("collection_running = -1")
- def _collect_sweep_pages(self, n):
- # sweep all pages from the linked list starting at 'page',
- # containing objects of fixed size 'object_size'.
- size_gc_header = self.gcheaderbuilder.size_gc_header
- page = self.collect_pages[n]
- object_size = n << WORD_POWER_2
- linked_list = self.NULL
- first_loc_in_linked_list = self.NULL
- cam = self.current_aging_marker
- while page != self.NULL:
- i = self.page_size - object_size
- limit = raw_malloc_usage(self.HDRSIZE)
- pageadr = llmemory.cast_ptr_to_adr(page)
- pageadr = llarena.getfakearenaaddress(pageadr)
- while i >= limit:
- adr = pageadr + i
- hdr = llmemory.cast_adr_to_ptr(adr, self.HDRPTR)
- mark = hdr.tid & 0xFF
- #
- if mark == cam:
- # the location contains really an object (and is not just
- # part of a linked list of free locations), and moreover
- # the object is still not marked. Free it by inserting
- # it into the linked list.
- #debug_print("sweeps", adr + size_gc_header)
- llarena.arena_reset(adr, object_size, 0)
- llarena.arena_reserve(adr, self.HDRSIZE)
- hdr = llmemory.cast_adr_to_ptr(adr, self.HDRPTR)
- set_next(hdr, linked_list)
- linked_list = hdr
- if first_loc_in_linked_list == self.NULL:
- first_loc_in_linked_list = hdr
- # XXX detect when the whole page is freed again
- #
- # Clear the data, in prevision for the following
- # malloc_fixedsize_clear().
- size_of_int = raw_malloc_usage(
- llmemory.sizeof(lltype.Signed))
- llarena.arena_reset(adr + size_of_int,
- object_size - size_of_int, 2)
- #
- i -= object_size
- #
- page = list_next(page)
- #
- self.collect_heads[n] = linked_list
- self.collect_tails[n] = first_loc_in_linked_list
-
-
- # ----------
- # Weakrefs
-
- def weakref_deref(self, wrobj):
- # Weakrefs need some care. This code acts as a read barrier.
- # The only way I found is to acquire the mutex_lock to prevent
- # the collection thread from going from collection_running==1
- # to collection_running==2, or from collection_running==2 to
- # collection_running==3.
- #
- self.acquire(self.mutex_lock)
- #
- targetobj = gctypelayout.ll_weakref_deref(wrobj)
- if targetobj != llmemory.NULL:
- #
- if self.collection_running == 1:
- # If we are in the phase collection_running==1, we don't
- # know if the object will be scanned a bit later or
- # not; so we have to assume that it survives, and
- # force it to be scanned.
- self.get_mark(targetobj)
- self.extra_objects_to_mark.append(targetobj)
- #
- elif self.collection_running == 2:
- # In the phase collection_running==2, if the object is
- # not marked it's too late; we have to detect that case
- # and return NULL instead here, as if the corresponding
- # collector phase was already finished (deal_with_weakrefs).
- # Otherwise we would be returning an object that is about to
- # be swept away.
- if not self.is_marked_or_static(targetobj, self.current_mark):
- targetobj = llmemory.NULL
- #
- else:
- # In other phases we are fine.
- pass
- #
- self.release(self.mutex_lock)
- #
- return targetobj
+ # -------------------------
+ # CollectorThread: Weakrefs
def deal_with_weakrefs(self):
- self.collection_running = 3; return
+ self.running = 3; return
# ^XXX^
size_gc_header = self.gcheaderbuilder.size_gc_header
current_mark = self.current_mark
@@ -1118,16 +869,15 @@
weakref_page = next_page
#
self.acquire(self.mutex_lock)
- self.collection_running = 3
- #debug_print("collection_running = 3")
+ self.collector.running = 3
+ #debug_print("collector.running = 3")
self.release(self.mutex_lock)
-
- # ----------
- # Finalizers
+ # ---------------------------
+ # CollectorThread: Finalizers
def deal_with_objects_with_finalizers(self):
- self.collection_running = 4; return
+ self.running = 4; return
# ^XXX^
# XXX needs to be done correctly; for now we'll call finalizers
@@ -1170,25 +920,12 @@
"should not see objects only reachable from finalizers "
"before we run them")
#
- self.collection_running = 4
+ self.collector.running = 4
#debug_print("collection_running = 4")
# ____________________________________________________________
#
-# Support for linked lists (used here because AddressStack is not thread-safe)
-
-def list_next(hdr):
- return llmemory.cast_adr_to_ptr(llmemory.cast_int_to_adr(hdr.tid),
- ConcurrentGenGC.HDRPTR)
-
-def set_next(hdr, nexthdr):
- hdr.tid = llmemory.cast_adr_to_int(llmemory.cast_ptr_to_adr(nexthdr),
- "symbolic")
-
-
-# ____________________________________________________________
-#
# Hack to write the 'mark' or the 'flags' bytes of an object header
# without overwriting the whole word. Essential in the rare case where
# the other thread might be concurrently writing the other byte.
@@ -1232,19 +969,3 @@
[ConcurrentGenGC.HDRPTR, lltype.Signed],
lltype.Void, compilation_info=eci, _nowrapper=True,
_callable=emulate_set_flags)
-
-# ____________________________________________________________
-#
-# A lock to synchronize access to AddressStack's free pages
-
-class SyncLock:
- _alloc_flavor_ = "raw"
- _lock = lltype.nullptr(ll_thread.TLOCKP.TO)
- def setup(self):
- self._lock = ll_thread.allocate_ll_lock()
- def acquire(self):
- if self._lock:
- ll_thread.c_thread_acquirelock(self._lock, 1)
- def release(self):
- if self._lock:
- ll_thread.c_thread_releaselock(self._lock)
diff --git a/pypy/rpython/memory/support.py b/pypy/rpython/memory/support.py
--- a/pypy/rpython/memory/support.py
+++ b/pypy/rpython/memory/support.py
@@ -69,7 +69,7 @@
lltype.free(chunk, flavor="raw", track_allocation=False)
self._unlock()
- if lock is not None:
+ if lock is not None and not isinstance(lock, str):
def _lock(self): lock.acquire()
def _unlock(self): lock.release()
else:
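
The long comment in _collect_mark above explains why the collector must trace an object *before* flipping its mark byte. As a rough illustration only (plain Python with made-up names, not the RPython GC), the interaction between the marking loop and the deletion barrier looks like this:

    MARK_AGING, MARK_OLD = 1, 2

    class Obj(object):
        def __init__(self, refs=None):
            self.mark = MARK_AGING
            self.refs = list(refs or [])

    def collector_mark(gray_objects):
        # collector thread: snapshot-at-the-beginning marking
        while gray_objects:
            obj = gray_objects.pop()
            if obj.mark != MARK_AGING:
                continue
            # 1) scan first: push every reference the object currently holds
            gray_objects.extend(obj.refs)
            # 2) only then set the mark; a racing write still sees the object
            #    unmarked and therefore goes through the deletion barrier
            obj.mark = MARK_OLD

    def write_barrier(gray_objects, obj, index, newref):
        # mutator thread: deletion barrier, run before overwriting a field
        if obj.mark == MARK_AGING:
            gray_objects.extend(obj.refs)   # scan the not-yet-marked object
            obj.mark = MARK_OLD
        obj.refs[index] = newref

    if __name__ == '__main__':
        a, b = Obj(), Obj()
        root = Obj([a])
        gray = [root]
        write_barrier(gray, root, 0, b)   # overwrite root.refs[0]: a -> b
        collector_mark(gray)
        assert a.mark == MARK_OLD         # 'a' survives: it was in the snapshot

If the two steps in collector_mark were swapped, write_barrier could observe the mark already set, skip its scan, and the overwritten reference would never reach gray_objects.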