[pypy-commit] pypy concurrent-marksweep: Proper weakref support. In-progress.
arigo noreply at buildbot.pypy.org
Fri Oct 14 10:57:34 CEST 2011
Author: Armin Rigo <arigo at tunes.org>
Branch: concurrent-marksweep
Changeset: r48051:62ab2327b1d5
Date: 2011-10-14 10:57 +0200
http://bitbucket.org/pypy/pypy/changeset/62ab2327b1d5/
Log: Proper weakref support. In-progress.
diff --git a/pypy/rpython/memory/gc/base.py b/pypy/rpython/memory/gc/base.py
--- a/pypy/rpython/memory/gc/base.py
+++ b/pypy/rpython/memory/gc/base.py
@@ -17,6 +17,7 @@
moving_gc = False
needs_write_barrier = False
needs_deletion_barrier = False
+ needs_weakref_read_barrier = False
malloc_zero_filled = False
prebuilt_gc_objects_are_static_roots = True
object_minimal_size = 0
diff --git a/pypy/rpython/memory/gc/concurrentms.py b/pypy/rpython/memory/gc/concurrentms.py
--- a/pypy/rpython/memory/gc/concurrentms.py
+++ b/pypy/rpython/memory/gc/concurrentms.py
@@ -8,6 +8,7 @@
from pypy.rlib.debug import ll_assert, debug_print, debug_start, debug_stop
from pypy.rlib.rarithmetic import ovfcheck, LONG_BIT, r_uint
from pypy.rpython.memory.gc.base import GCBase
+from pypy.rpython.memory import gctypelayout
from pypy.module.thread import ll_thread
#
@@ -31,7 +32,7 @@
# Objects start with an integer 'tid', which is decomposed as follows.
-# Lowest byte: one of the the following values (which are all odd, so
+# Lowest byte: one of the following values (which are all odd, so
# let us know if the 'tid' is valid or is just a word-aligned address):
MARK_VALUE_1 = 0x4D # 'M', 77
MARK_VALUE_2 = 0x6B # 'k', 107
@@ -47,6 +48,7 @@
inline_simple_malloc = True
inline_simple_malloc_varsize = True
needs_deletion_barrier = True
+ needs_weakref_read_barrier = True
prebuilt_gc_objects_are_static_roots = False
malloc_zero_filled = True
gcflag_extra = FL_EXTRA
@@ -137,6 +139,10 @@
self.collect_run_finalizers_tail = self.NULL
self.objects_with_finalizers_to_run = self.NULL
#
+ self.weakref_pages = self.NULL
+ self.collect_weakref_pages = self.NULL
+ self.collect_weakref_tails = self.NULL
+ #
# The following character is either MARK_VALUE_1 or MARK_VALUE_2,
# and represents the character that must be in the 'mark' field
# of an object header in order for the object to be considered as
@@ -154,6 +160,15 @@
# which re-acquires 'ready_to_start_lock' and does its job.
# When done it releases 'finished_lock'. The mutator thread is
# responsible for resetting 'collection_running' to 0.
+ #
+ # The collector thread's state can be found (with careful locking)
+ # by inspecting the same variable from the mutator thread:
+ # * collection_running == 1: Marking. [Deletion barrier active.]
+ # * collection_running == 2: Clearing weakrefs.
+ # * collection_running == 3: Marking from unreachable finalizers.
+ # * collection_running == 4: Sweeping.
+ # * collection_running == -1: Done.
+ # The mutex_lock is acquired to go from 1 to 2, and from 2 to 3.
self.collection_running = 0
#self.ready_to_start_lock = ...built in setup()
#self.finished_lock = ...built in setup()
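The handshake described in the comment above can be sketched in plain Python
with threading.Lock (the real code uses ll_thread; the class and method names
below are illustrative, not from the patch):

    import threading

    class HandshakeSketch:
        def __init__(self):
            self.collection_running = 0
            self.ready_to_start_lock = threading.Lock()
            self.finished_lock = threading.Lock()
            self.ready_to_start_lock.acquire()   # held until a collection starts
            self.finished_lock.acquire()         # held until a collection ends

        def mutator_trigger_collection(self):
            self.collection_running = 1          # marking begins
            self.ready_to_start_lock.release()   # wake up the collector thread

        def mutator_wait_for_collector(self):
            self.finished_lock.acquire()         # blocks until the collector is done
            assert self.collection_running == -1
            self.collection_running = 0          # the mutator resets the state

        def collector_thread(self):
            self.ready_to_start_lock.acquire()   # wait for the mutator's signal
            # ... mark (1), clear weakrefs (2), finalizer marking (3),
            # sweep (4), advancing self.collection_running at each boundary ...
            self.collection_running = -1         # done
            self.finished_lock.release()         # let the mutator synchronize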
@@ -215,6 +230,10 @@
"'needs_finalizer' and 'contains_weakptr' both specified")
return self._malloc_with_finalizer(typeid, size)
#
+ # Case of weakreferences (test constant-folded)
+ if contains_weakptr:
+ return self._malloc_weakref(typeid, size)
+ #
# Regular case
size_gc_header = self.gcheaderbuilder.size_gc_header
totalsize = size_gc_header + size
@@ -290,13 +309,13 @@
size_gc_header = self.gcheaderbuilder.size_gc_header
totalsize = size_gc_header + size
rawtotalsize = raw_malloc_usage(totalsize)
- ll_assert(rawtotalsize & (WORD - 1) == 0,
- "malloc_slowpath: non-rounded size")
#
if rawtotalsize <= self.small_request_threshold:
#
# Case 1: unless trigger_next_collection() happened to get us
# more locations in free_lists[n], we have run out of them
+ ll_assert(rawtotalsize & (WORD - 1) == 0,
+ "malloc_slowpath: non-rounded size")
n = rawtotalsize >> WORD_POWER_2
head = self.free_lists[n]
if head:
@@ -343,29 +362,47 @@
# the start of the page rather than at the end (Hans Boehm,
# xxx ref).
#
+ return self._malloc_result(typeid, totalsize, result)
else:
# Case 2: the object is too large, so allocate it directly
- # with the system malloc(). xxx on 32-bit, we'll prefer 64-bit
- # alignment of the object by always allocating an 8-bytes header
- rawtotalsize += 8
- block = llarena.arena_malloc(rawtotalsize, 2)
- if not block:
- raise MemoryError
- llarena.arena_reserve(block, self.HDRSIZE)
- blockhdr = llmemory.cast_adr_to_ptr(block, self.HDRPTR)
- set_next(blockhdr, self.nonfree_pages[0])
- self.nonfree_pages[0] = blockhdr
- result = block + 8
+ # with the system malloc().
+ return self._malloc_large_object(typeid, size, 0)
#
+ _malloc_slowpath._dont_inline_ = True
+
+ def _malloc_result(self, typeid, totalsize, result):
llarena.arena_reserve(result, totalsize)
hdr = llmemory.cast_adr_to_ptr(result, self.HDRPTR)
hdr.tid = self.combine(typeid, self.current_mark, 0)
- #
- obj = result + size_gc_header
+ obj = result + self.gcheaderbuilder.size_gc_header
#debug_print("malloc_slowpath", obj)
return llmemory.cast_adr_to_ptr(obj, llmemory.GCREF)
- #
- _malloc_slowpath._dont_inline_ = True
+
+ def _malloc_large_object(self, typeid, size, linked_list):
+ # xxx on 32-bit, we'll prefer 64-bit alignment of the object by
+ # always allocating an 8-bytes header
+ totalsize = self.gcheaderbuilder.size_gc_header + size
+ rawtotalsize = raw_malloc_usage(totalsize)
+ rawtotalsize += 8
+ block = llarena.arena_malloc(rawtotalsize, 2)
+ if not block:
+ raise MemoryError
+ llarena.arena_reserve(block, self.HDRSIZE)
+ blockhdr = llmemory.cast_adr_to_ptr(block, self.HDRPTR)
+ if linked_list == 0:
+ set_next(blockhdr, self.nonfree_pages[0])
+ self.nonfree_pages[0] = blockhdr
+ elif linked_list == 1:
+ set_next(blockhdr, self.finalizer_pages)
+ self.finalizer_pages = blockhdr
+ elif linked_list == 2:
+ set_next(blockhdr, self.weakref_pages)
+ self.weakref_pages = blockhdr
+ else:
+ ll_assert(0, "bad linked_list")
+ return self._malloc_result(typeid, totalsize, block + 8)
+ _malloc_large_object._annspecialcase_ = 'specialize:arg(3)'
+ _malloc_large_object._dont_inline_ = True
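The 'specialize:arg(3)' hint makes the RPython annotator generate one copy of
_malloc_large_object per constant value of 'linked_list' (argument index 3,
with 'self' at index 0), so each call site keeps only its own branch of the
if/elif chain. A plain-Python analogue of the effect, with hypothetical helper
and attribute names:

    # Build one function per constant selector, so the branch vanishes
    # from the allocation path.  'allocate_block' and the *_pages
    # attribute names are hypothetical stand-ins.
    def make_malloc_large_object(list_attr):
        def malloc_large_object(gc, typeid, size):
            block = gc.allocate_block(typeid, size)
            # prepend the new block to the chosen linked list of pages
            setattr(gc, list_attr, (block, getattr(gc, list_attr)))
            return block
        return malloc_large_object

    malloc_regular   = make_malloc_large_object('nonfree_pages_head')
    malloc_finalizer = make_malloc_large_object('finalizer_pages')
    malloc_weakref   = make_malloc_large_object('weakref_pages')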
def _malloc_varsize_slowpath(self, typeid, length):
#
@@ -397,30 +434,10 @@
_malloc_varsize_slowpath._dont_inline_ = True
def _malloc_with_finalizer(self, typeid, size):
- # XXX a lot of copy and paste from _malloc_slowpath()
- size_gc_header = self.gcheaderbuilder.size_gc_header
- totalsize = size_gc_header + size
- rawtotalsize = raw_malloc_usage(totalsize)
- ll_assert(rawtotalsize & (WORD - 1) == 0,
- "malloc_with_finalizer: non-rounded size")
- rawtotalsize += 8
- block = llarena.arena_malloc(rawtotalsize, 2)
- if not block:
- raise MemoryError
- llarena.arena_reserve(block, self.HDRSIZE)
- blockhdr = llmemory.cast_adr_to_ptr(block, self.HDRPTR)
- # the changes are only in the following two lines: we add the block
- # to a different linked list
- set_next(blockhdr, self.finalizer_pages)
- self.finalizer_pages = blockhdr
- result = block + 8
- #
- llarena.arena_reserve(result, totalsize)
- hdr = llmemory.cast_adr_to_ptr(result, self.HDRPTR)
- hdr.tid = self.combine(typeid, self.current_mark, 0)
- #
- obj = result + size_gc_header
- return llmemory.cast_adr_to_ptr(obj, llmemory.GCREF)
+ return self._malloc_large_object(typeid, size, 1)
+
+ def _malloc_weakref(self, typeid, size):
+ return self._malloc_large_object(typeid, size, 2)
# ----------
# Other functions in the GC API
@@ -553,27 +570,31 @@
# 'free_lists'.
n = 1
while n < self.pagelists_length:
- if self.collect_tails[n] != self.NULL:
- set_next(self.collect_tails[n], self.free_lists[n])
- self.free_lists[n] = self.collect_heads[n]
+ self.free_lists[n] = self.join_lists(self.free_lists[n],
+ self.collect_heads[n],
+ self.collect_tails[n])
n += 1
#
# Do the same with 'collect_heads[0]/collect_tails[0]'.
- if self.collect_tails[0] != self.NULL:
- set_next(self.collect_tails[0], self.nonfree_pages[0])
- self.nonfree_pages[0] = self.collect_heads[0]
+ self.nonfree_pages[0] = self.join_lists(self.nonfree_pages[0],
+ self.collect_heads[0],
+ self.collect_tails[0])
+ #
+ # Do the same with 'collect_weakref_pages/tails'
+ self.weakref_pages = self.join_lists(self.weakref_pages,
+ self.collect_weakref_pages,
+ self.collect_weakref_tails)
#
# Do the same with 'collect_finalizer_pages/tails'
- if self.collect_finalizer_tails != self.NULL:
- set_next(self.collect_finalizer_tails, self.finalizer_pages)
- self.finalizer_pages = self.collect_finalizer_pages
+ self.finalizer_pages = self.join_lists(self.finalizer_pages,
+ self.collect_finalizer_pages,
+ self.collect_finalizer_tails)
#
# Do the same with 'collect_run_finalizers_head/tail'
- if self.collect_run_finalizers_tail != self.NULL:
- set_next(self.collect_run_finalizers_tail,
- self.objects_with_finalizers_to_run)
- self.objects_with_finalizers_to_run = (
- self.collect_run_finalizers_head)
+ self.objects_with_finalizers_to_run = self.join_lists(
+ self.objects_with_finalizers_to_run,
+ self.collect_run_finalizers_head,
+ self.collect_run_finalizers_tail)
#
if self.DEBUG:
self.debug_check_lists()
@@ -587,6 +608,15 @@
ll_assert(self.collection_running == 0,
"collector thread not paused?")
+ def join_lists(self, list1, list2head, list2tail):
+ if list2tail == self.NULL:
+ ll_assert(list2head == self.NULL, "join_lists/1")
+ return list1
+ else:
+ ll_assert(list2head != self.NULL, "join_lists/2")
+ set_next(list2tail, list1)
+ return list2head
+
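join_lists splices the collector's list (given by its head and tail) in front
of the existing one in O(1). A toy illustration, with a hypothetical Node
class in place of the real HDRPTR headers:

    class Node:
        def __init__(self, value, next=None):
            self.value = value
            self.next = next

    def join_lists(list1, list2head, list2tail):
        if list2tail is None:
            assert list2head is None          # join_lists/1
            return list1
        assert list2head is not None          # join_lists/2
        list2tail.next = list1                # link list2's tail to list1's head
        return list2head

    free = Node(1, Node(2))                   # existing free list: 1 -> 2
    tail = Node(4)
    collected = Node(3, tail)                 # collector's result: 3 -> 4
    free = join_lists(free, collected, tail)  # now 3 -> 4 -> 1 -> 2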
def execute_finalizers_ll(self):
self.finalizer_lock_count += 1
@@ -668,12 +698,14 @@
while n < self.pagelists_length:
self.collect_pages[n] = self.nonfree_pages[n]
n += 1
+ self.collect_weakref_pages = self.weakref_pages
self.collect_finalizer_pages = self.finalizer_pages
#
# Clear the following lists. When the collector thread finishes,
# it will give back (in collect_{pages,tails}[0] and
# collect_finalizer_{pages,tails}) all the original items that survive.
self.nonfree_pages[0] = self.NULL
+ self.weakref_pages = self.NULL
self.finalizer_pages = self.NULL
#
# Start the collector thread
@@ -703,6 +735,7 @@
while n < self.pagelists_length:
self.debug_check_list(self.free_lists[n])
n += 1
+ self.debug_check_list(self.weakref_pages)
self.debug_check_list(self.finalizer_pages)
self.debug_check_list(self.objects_with_finalizers_to_run)
@@ -776,19 +809,15 @@
self.release(self.finished_lock)
break
#
- # Mark
+ # Mark # collection_running == 1
self.collector_mark()
- self.collection_running = 2
- #debug_print("collection_running = 2")
- #
+ # # collection_running == 2
+ self.deal_with_weakrefs()
+ # # collection_running == 3
self.deal_with_objects_with_finalizers()
- #
- # Sweep
+ # Sweep # collection_running == 4
self.collector_sweep()
- #
- # Done!
- self.collection_running = -1
- #debug_print("collection_running = -1")
+ # Done! # collection_running == -1
self.release(self.finished_lock)
@@ -797,8 +826,8 @@
"bad mark value")
return mark ^ (MARK_VALUE_1 ^ MARK_VALUE_2)
- def is_marked(self, obj, current_mark):
- return self.get_mark(obj) == current_mark
+ def is_marked_or_static(self, obj, current_mark):
+ return self.get_mark(obj) != self.other_mark(current_mark)
def set_mark(self, obj, newmark):
_set_mark(self.header(obj), newmark)
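The renamed is_marked_or_static captures the fact that a mark byte can be in
one of three states: the current mark, the other (previous) mark, or the
distinct value carried by prebuilt static objects; only the other mark means
"not yet reached". A small sketch of the test (the static mark value here is a
placeholder, not taken from the source):

    MARK_VALUE_1 = 0x4D    # 'M', 77
    MARK_VALUE_2 = 0x6B    # 'k', 107
    MARK_STATIC  = 0x53    # placeholder for the prebuilt-object mark byte

    def other_mark(mark):
        return mark ^ (MARK_VALUE_1 ^ MARK_VALUE_2)

    def is_marked_or_static(mark, current_mark):
        # anything except the *other* mark counts as alive: either it was
        # marked this cycle, or it is static and carries neither mark
        return mark != other_mark(current_mark)

    assert is_marked_or_static(MARK_VALUE_1, MARK_VALUE_1)       # marked
    assert is_marked_or_static(MARK_STATIC, MARK_VALUE_1)        # static
    assert not is_marked_or_static(MARK_VALUE_2, MARK_VALUE_1)   # unreached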
@@ -827,7 +856,6 @@
obj = self.extra_objects_to_mark.pop()
self.get_mark(obj)
self.gray_objects.append(obj)
- self.release(self.mutex_lock)
#
# If 'gray_objects' is empty, we are done: there should be
# no possible case in which more objects are being added to
@@ -837,12 +865,19 @@
# been marked.
if not self.gray_objects.non_empty():
break
+ #
+ # Else release mutex_lock and try again.
+ self.release(self.mutex_lock)
+ #
+ self.collection_running = 2
+ #debug_print("collection_running = 2")
+ self.release(self.mutex_lock)
def _collect_mark(self):
current_mark = self.current_mark
while self.gray_objects.non_empty():
obj = self.gray_objects.pop()
- if not self.is_marked(obj, current_mark):
+ if not self.is_marked_or_static(obj, current_mark):
#
# Scan the content of 'obj'. We use a snapshot-at-the-
# beginning order, meaning that we want to scan the state
@@ -883,14 +918,15 @@
self.gray_objects.append(obj)
def collector_sweep(self):
+ self._collect_sweep_large_objects()
+ #
n = 1
while n < self.pagelists_length:
self._collect_sweep_pages(n)
n += 1
- # do this *after* the other one, so that the blocks are not free'd
- # before we get a chance to inspect them, to see if they contain
- # an object that is still alive (needed for weakrefs)
- self._collect_sweep_large_objects()
+ #
+ self.collection_running = -1
+ #debug_print("collection_running = -1")
def _collect_sweep_large_objects(self):
block = self.collect_pages[0]
@@ -962,29 +998,6 @@
llarena.arena_reset(adr + size_of_int,
object_size - size_of_int, 2)
#
- elif mark == marked:
- # the location contains really an object, which is marked.
- # check the typeid to see if it's a weakref. XXX could
- # be faster
- # XXX actually it's very wrong: if we read a weakref out
- # of an object during collection_running==1, or during
- # collection_running==2 but before we reach that point,
- # the target object will never be marked
- tid = hdr.tid
- type_id = llop.extract_high_ushort(llgroup.HALFWORD, tid)
- wroffset = self.weakpointer_offset(type_id)
- if wroffset >= 0:
- obj = adr + size_gc_header
- pointing_to = (obj + wroffset).address[0]
- if pointing_to != llmemory.NULL:
- pt_adr = pointing_to - size_gc_header
- pt_hdr = llmemory.cast_adr_to_ptr(pt_adr,
- self.HDRPTR)
- if (pt_hdr.tid & 0xFF) == nonmarked:
- # this weakref points to an object that is
- # still not marked, so clear it
- (obj + wroffset).address[0] = llmemory.NULL
- #
i -= object_size
#
page = list_next(page)
@@ -992,6 +1005,87 @@
self.collect_heads[n] = linked_list
self.collect_tails[n] = first_loc_in_linked_list
+
+ # ----------
+ # Weakrefs
+
+ def weakref_deref(self, wrobj):
+ # Weakrefs need some care. This code acts as a read barrier.
+ # The only way I found is to acquire the mutex_lock to prevent
+ # the collection thread from going from collection_running==1
+ # to collection_running==2, or from collection_running==2 to
+ # collection_running==3.
+ #
+ self.acquire(self.mutex_lock)
+ #
+ targetobj = gctypelayout.ll_weakref_deref(wrobj)
+ if targetobj != llmemory.NULL:
+ #
+ if self.collection_running == 1:
+ # If we are in the phase collection_running==1, we don't
+ # know if the object will be scanned a bit later or
+ # not; so we have to assume that it survives, and
+ # force it to be scanned.
+ self.get_mark(targetobj)
+ self.extra_objects_to_mark.append(targetobj)
+ #
+ elif self.collection_running == 2:
+ # In the phase collection_running==2, if the object is
+ # not marked it's too late; we have to detect that case
+ # and return NULL instead here, as if the corresponding
+ # collector phase was already finished (deal_with_weakrefs).
+ # Otherwise we would be returning an object that is about to
+ # be swept away.
+ if not self.is_marked_or_static(targetobj, self.current_mark):
+ targetobj = llmemory.NULL
+ #
+ else:
+ # In other phases we are fine.
+ pass
+ #
+ self.release(self.mutex_lock)
+ #
+ return targetobj
+
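Without the barrier, the following interleaving would hand the mutator a
pointer that the collector is about to free (a schematic timeline, not code
from the patch):

    mutator                            collector
    -------                            ---------
    weakref_deref(w)                   marking (collection_running == 1)
      reads target t out of w          finishes marking without reaching t
                                       (w itself is never scanned)
                                       collection_running = 2
                                       deal_with_weakrefs(): w -> NULL
      returns t to the caller          sweeping frees t      <- dangling!

Holding mutex_lock around the dereference makes the 1->2 and 2->3 phase
transitions atomic with respect to the read: in phase 1 the mutator forces the
target onto extra_objects_to_mark, and in phase 2 it can still observe that
the target is unmarked and return NULL instead.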
+ def deal_with_weakrefs(self):
+ size_gc_header = self.gcheaderbuilder.size_gc_header
+ current_mark = self.current_mark
+ weakref_page = self.collect_weakref_pages
+ self.collect_weakref_pages = self.NULL
+ self.collect_weakref_tails = self.NULL
+ while weakref_page != self.NULL:
+ next_page = list_next(weakref_page)
+ #
+ # If the weakref points to a dead object, make it point to NULL.
+ x = llmemory.cast_ptr_to_adr(weakref_page)
+ x = llarena.getfakearenaaddress(x) + 8
+ hdr = llmemory.cast_adr_to_ptr(x, self.HDRPTR)
+ type_id = llop.extract_high_ushort(llgroup.HALFWORD, hdr.tid)
+ offset = self.weakpointer_offset(type_id)
+ ll_assert(offset >= 0, "bad weakref")
+ obj = x + size_gc_header
+ pointing_to = (obj + offset).address[0]
+ if (pointing_to == llmemory.NULL or
+ not self.is_marked_or_static(pointing_to, current_mark)):
+ # 'pointing_to' dies: relink to self.collect_pages[0]
+ (obj + offset).address[0] = llmemory.NULL
+ set_next(weakref_page, self.collect_pages[0])
+ self.collect_pages[0] = weakref_page
+ else:
+ # the weakref stays alive
+ set_next(weakref_page, self.collect_weakref_pages)
+ self.collect_weakref_pages = weakref_page
+ if self.collect_weakref_tails == self.NULL:
+ self.collect_weakref_tails = weakref_page
+ #
+ weakref_page = next_page
+ #
+ self.acquire(self.mutex_lock)
+ self.collection_running = 3
+ #debug_print("collection_running = 3")
+ self.release(self.mutex_lock)
+
+
# ----------
# Finalizers
@@ -1025,7 +1119,7 @@
# marked: relink into the collect_finalizer_pages list
set_next(finalizer_page, self.collect_finalizer_pages)
self.collect_finalizer_pages = finalizer_page
- if self.collect_finalizer_tails != self.NULL:
+ if self.collect_finalizer_tails == self.NULL:
self.collect_finalizer_tails = finalizer_page
#
finalizer_page = next_page
@@ -1035,6 +1129,9 @@
ll_assert(not self.extra_objects_to_mark.non_empty(),
"should not see objects only reachable from finalizers "
"before we run them")
+ #
+ self.collection_running = 4
+ #debug_print("collection_running = 4")
# ____________________________________________________________
diff --git a/pypy/rpython/memory/gctransform/framework.py b/pypy/rpython/memory/gctransform/framework.py
--- a/pypy/rpython/memory/gctransform/framework.py
+++ b/pypy/rpython/memory/gctransform/framework.py
@@ -246,13 +246,20 @@
else:
self.incr_stack_ptr = None
self.decr_stack_ptr = None
- self.weakref_deref_ptr = self.inittime_helper(
- ll_weakref_deref, [llmemory.WeakRefPtr], llmemory.Address)
-
+
classdef = bk.getuniqueclassdef(GCClass)
s_gc = annmodel.SomeInstance(classdef)
s_gcref = annmodel.SomePtr(llmemory.GCREF)
+ if GCClass.needs_weakref_read_barrier:
+ self.gc_weakref_deref_ptr = getfn(
+ GCClass.weakref_deref.im_func,
+ [s_gc, annmodel.SomePtr(llmemory.WeakRefPtr)],
+ annmodel.SomeAddress())
+ else:
+ self.weakref_deref_ptr = self.inittime_helper(
+ ll_weakref_deref, [llmemory.WeakRefPtr], llmemory.Address)
+
malloc_fixedsize_clear_meth = GCClass.malloc_fixedsize_clear.im_func
self.malloc_fixedsize_clear_ptr = getfn(
malloc_fixedsize_clear_meth,
@@ -947,9 +954,15 @@
def gct_weakref_deref(self, hop):
v_wref, = hop.spaceop.args
- v_addr = hop.genop("direct_call",
- [self.weakref_deref_ptr, v_wref],
- resulttype=llmemory.Address)
+ if hasattr(self, 'gc_weakref_deref_ptr'):
+ v_addr = hop.genop("direct_call",
+ [self.gc_weakref_deref_ptr,
+ self.c_const_gc, v_wref],
+ resulttype=llmemory.Address)
+ else:
+ v_addr = hop.genop("direct_call",
+ [self.weakref_deref_ptr, v_wref],
+ resulttype=llmemory.Address)
hop.cast_result(v_addr)
def gct_gc_identityhash(self, hop):
diff --git a/pypy/rpython/memory/gcwrapper.py b/pypy/rpython/memory/gcwrapper.py
--- a/pypy/rpython/memory/gcwrapper.py
+++ b/pypy/rpython/memory/gcwrapper.py
@@ -129,8 +129,12 @@
return llmemory.cast_ptr_to_weakrefptr(result)
def weakref_deref(self, PTRTYPE, obj):
- addr = gctypelayout.ll_weakref_deref(obj)
- return llmemory.cast_adr_to_ptr(addr, PTRTYPE)
+ if self.gc.needs_weakref_read_barrier:
+ addr = self.gc.weakref_deref(obj)
+ return llmemory.cast_adr_to_ptr(addr, PTRTYPE)
+ else:
+ addr = gctypelayout.ll_weakref_deref(obj)
+ return llmemory.cast_adr_to_ptr(addr, PTRTYPE)
def gc_id(self, ptr):
ptr = lltype.cast_opaque_ptr(llmemory.GCREF, ptr)
diff --git a/pypy/rpython/memory/test/test_gc.py b/pypy/rpython/memory/test/test_gc.py
--- a/pypy/rpython/memory/test/test_gc.py
+++ b/pypy/rpython/memory/test/test_gc.py
@@ -934,3 +934,6 @@
def test_finalizer_calls_malloc_during_minor_collect(self):
py.test.skip("check if this test is valid here")
+
+ def test_weakref_to_object_with_finalizer_ordering(self):
+ py.test.skip("frees weakrefs before calling finalizers")