[pypy-commit] pypy stm-gc: Start to refactor the RPython interface to be higher-level. This should
arigo
noreply at buildbot.pypy.org
Sun Apr 22 12:17:24 CEST 2012
Author: Armin Rigo <arigo at tunes.org>
Branch: stm-gc
Changeset: r54613:9290ce3f4d40
Date: 2012-04-22 12:16 +0200
http://bitbucket.org/pypy/pypy/changeset/9290ce3f4d40/
Log: Start to refactor the RPython interface to be higher-level. This
should make it easier to call directly C code from the higher-level
code without passing through lower-level interfaces. No need to
access from RPython the ll_thread module with locks and so on; this
should all be done in C.
Untested code so far (it doesn't run at all, missing levels, but I'd
like to add proper testing anyway).
diff --git a/pypy/rlib/rstm.py b/pypy/rlib/rstm.py
--- a/pypy/rlib/rstm.py
+++ b/pypy/rlib/rstm.py
@@ -1,79 +1,146 @@
-import threading
-from pypy.rlib.objectmodel import specialize, we_are_translated
+from pypy.rpython.lltypesystem import lltype, llmemory, rffi
+from pypy.rpython.lltypesystem.lloperation import llop
+from pypy.rpython.annlowlevel import llhelper, cast_instance_to_base_ptr
+from pypy.rpython.annlowlevel import base_ptr_lltype, cast_base_ptr_to_instance
from pypy.rlib.objectmodel import keepalive_until_here
-from pypy.rlib.debug import ll_assert
-from pypy.rpython.lltypesystem import lltype, llmemory, rffi, rclass
-from pypy.rpython.lltypesystem.lloperation import llop
-from pypy.rpython.annlowlevel import (cast_base_ptr_to_instance,
- cast_instance_to_base_ptr,
- llhelper)
from pypy.translator.stm.stmgcintf import StmOperations
-_global_lock = threading.RLock()
- at specialize.memo()
-def _get_stm_callback(func, argcls):
- def _stm_callback(llarg, retry_counter):
- llop.stm_start_transaction(lltype.Void)
- if we_are_translated():
- llarg = rffi.cast(rclass.OBJECTPTR, llarg)
- arg = cast_base_ptr_to_instance(argcls, llarg)
+
+NUM_THREADS_DEFAULT = 4 # XXX for now
+
+
+class TransactionError(Exception):
+ pass
+
+class Transaction(object):
+ _next_transaction = None
+ retry_counter = 0
+
+ def run(self):
+ raise NotImplementedError
+
+
+def run_all_transactions(initial_transaction,
+ num_threads = NUM_THREADS_DEFAULT):
+ if StmOperations.in_transaction():
+ raise TransactionError("nested call to rstm.run_all_transactions()")
+ #
+ _transactionalstate.initialize()
+ #
+ # Tell the GC we are entering transactional mode. This makes
+ # sure that 'initial_transaction' is flagged as GLOBAL.
+ # No more GC operation afterwards!
+ llop.stm_enter_transactional_mode(lltype.Void)
+ #
+ # Keep alive 'initial_transaction'. In truth we would like it to
+ # survive a little bit longer, for the beginning of the C code in
+ # run_all_transactions(). This should be equivalent because there
+ # is no possibility of having a GC collection inbetween.
+ keepalive_until_here(initial_transaction)
+ #
+ # Tell the C code to run all transactions.
+ callback = llhelper(_CALLBACK, _run_transaction)
+ ptr = _cast_transaction_to_voidp(initial_transaction)
+ StmOperations.run_all_transactions(callback, ptr, num_threads)
+ #
+ # Tell the GC we are leaving transactional mode.
+ llop.stm_leave_transactional_mode(lltype.Void)
+ #
+ # If an exception was raised, re-raise it here.
+ _transactionalstate.close_exceptions()
+
+
+_CALLBACK = lltype.Ptr(lltype.FuncType([rffi.VOIDP, lltype.Signed],
+ rffi.VOIDP))
+
+def _cast_transaction_to_voidp(transaction):
+ ptr = cast_instance_to_base_ptr(transaction)
+ return lltype.cast_pointer(rffi.VOIDP, ptr)
+
+def _cast_voidp_to_transaction(transactionptr):
+ ptr = lltype.cast_pointer(base_ptr_lltype(), transactionptr)
+ return cast_base_ptr_to_instance(Transaction, ptr)
+
+
+class _TransactionalState(object):
+ def initialize(self):
+ self._reraise_exception = None
+
+ def has_exception(self):
+ return self._reraise_exception is not None
+
+ def must_reraise_exception(self, got_exception):
+ self._got_exception = got_exception
+ self._reraise_exception = self.reraise_exception_callback
+
+ def close_exceptions(self):
+ if self._reraise_exception is not None:
+ self._reraise_exception()
+
+ @staticmethod
+ def reraise_exception_callback():
+ exc = _transactionalstate._got_exception
+ self._got_exception = None
+ raise exc
+
+_transactionalstate = _TransactionalState()
+
+
+def _run_transaction(transactionptr, retry_counter):
+ #
+ # Tell the GC we are starting a transaction
+ llop.stm_start_transaction(lltype.Void)
+ #
+ # Now we can use the GC
+ next = None
+ try:
+ if _transactionalstate.has_exception():
+ # a previously committed transaction raised: don't do anything
+ # more in this transaction
+ pass
else:
- arg = lltype.TLS.stm_callback_arg
- try:
- res = func(arg, retry_counter)
- ll_assert(res is None, "stm_callback should return None")
- finally:
- llop.stm_commit_transaction(lltype.Void)
- return lltype.nullptr(rffi.VOIDP.TO)
- return _stm_callback
+ # run!
+ next = _run_really(transactionptr, retry_counter)
+ #
+ except Exception, e:
+ _transactionalstate.must_reraise_exception(e)
+ #
+ # Stop using the GC. This will make 'next' and all transactions linked
+ # from there GLOBAL objects.
+ llop.stm_stop_transaction(lltype.Void)
+ #
+ # Mark 'next' as kept-alive-until-here. In truth we would like to
+ # keep it alive after the return, for the C code. This should be
+ # equivalent because there is no possibility of having a GC collection
+ # inbetween.
+ keepalive_until_here(next)
+ return _cast_transaction_to_voidp(next)
- at specialize.arg(0, 1)
-def perform_transaction(func, argcls, arg):
- ll_assert(arg is None or isinstance(arg, argcls),
- "perform_transaction: wrong class")
- if we_are_translated():
- llarg = cast_instance_to_base_ptr(arg)
- llarg = rffi.cast(rffi.VOIDP, llarg)
- adr_of_top = llop.gc_adr_of_root_stack_top(llmemory.Address)
- else:
- # only for tests: we want (1) to test the calls to the C library,
- # but also (2) to work with multiple Python threads, so we acquire
- # and release some custom GIL here --- even though it doesn't make
- # sense from an STM point of view :-/
- _global_lock.acquire()
- lltype.TLS.stm_callback_arg = arg
- llarg = lltype.nullptr(rffi.VOIDP.TO)
- adr_of_top = llmemory.NULL
- #
- callback = _get_stm_callback(func, argcls)
- llcallback = llhelper(StmOperations.CALLBACK_TX, callback)
- StmOperations.perform_transaction(llcallback, llarg, adr_of_top)
- keepalive_until_here(arg)
- if not we_are_translated():
- _global_lock.release()
-def enter_transactional_mode():
- llop.stm_enter_transactional_mode(lltype.Void)
+def _run_really(transactionptr, retry_counter):
+ # Call the RPython method run() on the Transaction instance.
+ # This logic is in a sub-function because we want to catch
+ # the MemoryErrors that could occur.
+ transaction = _cast_voidp_to_transaction(transactionptr)
+ ll_assert(transaction._next_transaction is None,
+ "_next_transaction should be cleared by C code")
+ transaction.retry_counter = retry_counter
+ new_transactions = transaction.run()
+ return _link_new_transactions(new_transactions)
+_run_really._dont_inline_ = True
-def leave_transactional_mode():
- llop.stm_leave_transactional_mode(lltype.Void)
-
-def descriptor_init():
- if not we_are_translated(): _global_lock.acquire()
- llop.stm_descriptor_init(lltype.Void)
- if not we_are_translated(): _global_lock.release()
-
-def descriptor_done():
- if not we_are_translated(): _global_lock.acquire()
- llop.stm_descriptor_done(lltype.Void)
- if not we_are_translated(): _global_lock.release()
-
-def _debug_get_state():
- if not we_are_translated(): _global_lock.acquire()
- res = StmOperations._debug_get_state()
- if not we_are_translated(): _global_lock.release()
- return res
-
-def thread_id():
- return StmOperations.thread_id()
+def _link_new_transactions(new_transactions):
+ # in order to schedule the new transactions, we have to return a
+ # raw pointer to the first one, with their field '_next_transaction'
+ # making a linked list. The C code reads directly from this
+ # field '_next_transaction'.
+ if new_transactions is None:
+ return None
+ n = len(new_transactions) - 1
+ next = None
+ while n >= 0:
+ new_transactions[n]._next_transaction = next
+ next = new_transactions[n]
+ n -= 1
+ return next
diff --git a/pypy/translator/stm/test/targetdemo.py b/pypy/translator/stm/test/targetdemo.py
--- a/pypy/translator/stm/test/targetdemo.py
+++ b/pypy/translator/stm/test/targetdemo.py
@@ -1,8 +1,6 @@
-from pypy.rpython.lltypesystem import lltype, rffi
-from pypy.module.thread import ll_thread
-from pypy.rlib import rstm, rgc
-from pypy.rlib.debug import debug_print
-from pypy.rpython.annlowlevel import llhelper
+from pypy.rpython.lltypesystem import rffi
+from pypy.rlib import rstm
+from pypy.rlib.debug import debug_print, ll_assert
class Node:
@@ -21,10 +19,7 @@
pass
-def add_at_end_of_chained_list(arg, retry_counter):
- assert arg.foobar == 42
- node = arg.anchor
- value = arg.value
+def add_at_end_of_chained_list(node, value):
x = Node(value)
while node.next:
node = node.next
@@ -59,65 +54,40 @@
print "check ok!"
-def increment_done(arg, retry_counter):
- print "thread done."
- glob.done += 1
-
def _check_pointer(arg1):
arg1.foobar = 40 # now 'arg1' is local
return arg1
-def check_pointer_equality(arg, retry_counter):
- res = _check_pointer(arg)
- if res is not arg:
- debug_print("ERROR: bogus pointer equality")
- raise AssertionError
- raw1 = rffi.cast(rffi.CCHARP, retry_counter)
- raw2 = rffi.cast(rffi.CCHARP, -1)
- if raw1 == raw2:
- debug_print("ERROR: retry_counter == -1")
- raise AssertionError
+class CheckPointerEquality(rstm.Transaction):
+ def __init__(self, arg):
+ self.arg = arg
+ def run(self):
+ res = _check_pointer(self.arg) # 'self.arg' reads a GLOBAL object
+ ll_assert(res is self.arg, "ERROR: bogus pointer equality")
+ raw1 = rffi.cast(rffi.CCHARP, self.retry_counter)
+ raw2 = rffi.cast(rffi.CCHARP, -1)
+ ll_assert(raw1 == raw2, "ERROR: retry_counter == -1")
-def run_me():
- rstm.descriptor_init()
- try:
- debug_print("thread starting...")
- arg = glob._arg
- ll_thread.release_NOAUTO(glob.lock)
- arg.foobar = 41
- rstm.perform_transaction(check_pointer_equality, Arg, arg)
- i = 0
- while i < glob.LENGTH:
- arg.anchor = glob.anchor
- arg.value = i
- arg.foobar = 42
- rstm.perform_transaction(add_at_end_of_chained_list, Arg, arg)
- i += 1
- rstm.perform_transaction(increment_done, Arg, arg)
- finally:
- rstm.descriptor_done()
+class MakeChain(rstm.Transaction):
+ def __init__(self, anchor, value):
+ self.anchor = anchor
+ self.value = value
+ def run(self):
+ add_at_end_of_chained_list(self.anchor, self.value)
+ self.value += 1
+ if self.value < glob.LENGTH:
+ return [self] # re-schedule the same Transaction object
-
- at rgc.no_collect # don't use the gc as long as other threads are running
-def _run():
- i = 0
- while i < glob.NUM_THREADS:
- glob._arg = glob._arglist[i]
- ll_run_me = llhelper(ll_thread.CALLBACK, run_me)
- ll_thread.c_thread_start_NOGIL(ll_run_me)
- ll_thread.acquire_NOAUTO(glob.lock, True)
- i += 1
- debug_print("sleeping...")
- while glob.done < glob.NUM_THREADS: # poor man's lock
- _sleep(rffi.cast(rffi.ULONG, 1))
- debug_print("done sleeping.")
-
-
-# Posix only
-_sleep = rffi.llexternal('sleep', [rffi.ULONG], rffi.ULONG,
- _nowrapper=True,
- random_effects_on_gcobjs=False)
-
+class InitialTransaction(rstm.Transaction):
+ def run(self):
+ ll_assert(self.retry_counter == 0, "no reason to abort-and-retry here")
+ scheduled = []
+ for i in range(glob.NUM_THREADS):
+ arg = Arg()
+ arg.foobar = 41
+ scheduled.append(CheckPointerEquality(arg))
+ scheduled.append(MakeChain(glob.anchor, 0))
+ return scheduled
# __________ Entry point __________
@@ -129,14 +99,9 @@
glob.LENGTH = int(argv[2])
if len(argv) > 3:
glob.USE_MEMORY = bool(int(argv[3]))
- glob.done = 0
- glob.lock = ll_thread.allocate_ll_lock()
- ll_thread.acquire_NOAUTO(glob.lock, True)
- glob._arglist = [Arg() for i in range(glob.NUM_THREADS)]
#
- rstm.enter_transactional_mode()
- _run()
- rstm.leave_transactional_mode()
+ rstm.run_all_transactions(InitialTransaction(),
+ num_threads=glob.NUM_THREADS)
#
check_chained_list(glob.anchor.next)
return 0
@@ -145,3 +110,7 @@
def target(*args):
return entry_point, None
+
+if __name__ == '__main__':
+ import sys
+ entry_point(sys.argv)
More information about the pypy-commit
mailing list