[pypy-svn] r33235 - in pypy/dist/pypy/objspace: . cclp
auc at codespeak.net
auc at codespeak.net
Thu Oct 12 19:20:21 CEST 2006
Author: auc
Date: Thu Oct 12 19:20:18 2006
New Revision: 33235
Modified:
pypy/dist/pypy/objspace/cclp/misc.py
pypy/dist/pypy/objspace/cclp/scheduler.py
pypy/dist/pypy/objspace/cclp/space.py
pypy/dist/pypy/objspace/cclp/thunk.py
pypy/dist/pypy/objspace/logic.py
Log:
(auc, ale) scheduler refactoring that should help a bit some other tasks
Modified: pypy/dist/pypy/objspace/cclp/misc.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/misc.py (original)
+++ pypy/dist/pypy/objspace/cclp/misc.py Thu Oct 12 19:20:18 2006
@@ -27,6 +27,10 @@
def get_current_cspace(space):
curr = ClonableCoroutine.w_getcurrent(space)
assert isinstance(curr, ClonableCoroutine)
+ if curr._cspace is None:
+ if not we_are_translated():
+ import pdb
+ pdb.set_trace()
return curr._cspace
def interp_id(space, w_obj):
Modified: pypy/dist/pypy/objspace/cclp/scheduler.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/scheduler.py (original)
+++ pypy/dist/pypy/objspace/cclp/scheduler.py Thu Oct 12 19:20:18 2006
@@ -1,179 +1,101 @@
from pypy.rpython.objectmodel import we_are_translated
from pypy.interpreter.error import OperationError
-from pypy.interpreter import gateway
+from pypy.interpreter import gateway, baseobjspace
from pypy.objspace.std.listobject import W_ListObject
from pypy.objspace.cclp.types import W_Var, W_FailedValue, aliases
-from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
-from pypy.objspace.cclp.space import W_CSpace
+from pypy.objspace.cclp.misc import w, v, ClonableCoroutine, get_current_cspace
from pypy.objspace.cclp.global_state import sched
#-- Singleton scheduler ------------------------------------------------
-class Scheduler(object):
+class TopLevelScheduler(object):
- def __init__(self, space):
+ # we are dealing with cspaces
+
+ def __init__(self, space, top_level_space):
+ w("NEW TOPLEVEL SCHEDULER", str(id(self)), "with", str(id(top_level_space)))
self.space = space
- self._main = ClonableCoroutine.w_getcurrent(space)
- assert isinstance(self._main, ClonableCoroutine)
+ sched.main_thread._cspace = top_level_space
self._switch_count = 0
- self._init_head(self._main)
- self._blocked = {} # thread set
+ self._head = top_level_space
+ self._head._next = self._head._prev = self._head
+ # asking for stability
+ self._asking = {} # cspace -> thread set
+ self._asking[top_level_space] = {}
# variables suspension lists
+ self._blocked = {}
self._blocked_on = {} # var -> threads
self._blocked_byneed = {} # var -> threads
- self._asking = {} # thread -> cspace
- # more accounting
- self._per_space_live_threads = {} # space -> nb runnable threads
- self._traced = {} # thread -> vars
- w("MAIN THREAD = ", str(id(self._main)))
-
- def _init_head(self, thread):
- assert isinstance(thread, ClonableCoroutine)
- self._head = thread
- # for the reset case
- self._head._next = self._head._prev = self._head
-
- def _chain_insert(self, thread):
- assert thread._next is thread
- assert thread._prev is thread
- assert isinstance(thread, ClonableCoroutine)
- assert isinstance(thread._next, ClonableCoroutine)
- assert isinstance(thread._prev, ClonableCoroutine)
+
+ def _chain_insert(self, group):
+ assert group._next is group
+ assert group._prev is group
+ assert isinstance(group, W_ThreadGroupScheduler)
+ assert isinstance(group._next, W_ThreadGroupScheduler)
+ assert isinstance(group._prev, W_ThreadGroupScheduler)
r = self._head
l = r._prev
- l._next = thread
- r._prev = thread
- thread._prev = l
- thread._next = r
-
- #-- cspace helper
-
- def is_stable(self, cspace):
- assert isinstance(cspace, W_CSpace)
- if cspace not in self._per_space_live_threads.keys():
- #XXX meaning ?
- return True
- return self._per_space_live_threads[cspace] == 0
-
- def wait_stable(self, cspace):
- assert isinstance(cspace, W_CSpace)
- if self.is_stable(cspace):
- return
- curr = ClonableCoroutine.w_getcurrent(self.space)
- assert isinstance(curr, ClonableCoroutine)
- self._asking[curr] = cspace
- self._blocked[curr] = True
- # either we choose() from inside
- if curr._cspace == cspace:
- self.dec_live_thread_count(cspace)
- self.schedule()
- self.inc_live_thread_count(cspace)
- else: # or we ask(), or clone() from outside
- self.schedule()
-
- #-- cspace -> thread_count helpers
-
- def inc_live_thread_count(self, cspace):
- assert isinstance(cspace, W_CSpace)
- count = self._per_space_live_threads.get(cspace, 0) + 1
- self._per_space_live_threads[cspace] = count
- return count
-
- def dec_live_thread_count(self, cspace):
- assert isinstance(cspace, W_CSpace)
- count = self._per_space_live_threads[cspace] - 1
- assert count >= 0
- self._per_space_live_threads[cspace] = count
- return count
- #-- /
-
- #-- to be used by logic objspace
+ l._next = group
+ r._prev = group
+ group._prev = l
+ group._next = r
def schedule(self):
to_be_run = self._select_next()
- assert isinstance(to_be_run, ClonableCoroutine)
- #w(".. SWITCHING", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
+ assert isinstance(to_be_run, W_ThreadGroupScheduler)
+ w(".. SWITCHING (spaces)", str(id(get_current_cspace(self.space))), "=>", str(id(to_be_run)))
self._switch_count += 1
- to_be_run.w_switch()
+ to_be_run.schedule()
- def schedule_or_pass(self):
- to_be_run = self._select_next(dont_pass=False)
- assert isinstance(to_be_run, ClonableCoroutine)
- curr = ClonableCoroutine.w_getcurrent(self.space)
- if to_be_run == curr:
- w(".. PASS")
- return
- #w(".. SWITCHING", str(id(curr)), "=>", str(id(to_be_run)))
- self._switch_count += 1
- to_be_run.w_switch()
-
- def _select_next(self, dont_pass=True):
+ def _select_next(self):
to_be_run = self._head
sentinel = to_be_run
- current = ClonableCoroutine.w_getcurrent(self.space)
- assert isinstance(current, ClonableCoroutine)
- while (to_be_run in self._blocked) \
- or (to_be_run == current):
-
+ while to_be_run.is_blocked():
+ # check stability + asking status, give a chance to run
+ if to_be_run.is_runnable():
+ break
to_be_run = to_be_run._next
- assert isinstance(to_be_run, ClonableCoroutine)
- # asking threads
- if to_be_run in self._asking:
- if self.is_stable(self._asking[to_be_run]):
- del self._asking[to_be_run]
- del self._blocked[to_be_run]
- break
+ assert isinstance(to_be_run, W_ThreadGroupScheduler)
if to_be_run == sentinel:
- if not dont_pass:
- return current
- w(str(sched_info(self.space)))
- ## we RESET sched state so as to keep being usable beyond that
reset_scheduler(self.space)
- sched.uler._main = sched.uler._head = self._head
w(".. SCHEDULER reinitialized")
raise OperationError(self.space.w_AllBlockedError,
self.space.wrap("can't schedule, probable deadlock in sight"))
self._head = to_be_run
return to_be_run
- def add_new_thread(self, thread):
- "insert 'thread' at end of running queue"
- assert isinstance(thread, ClonableCoroutine)
- # cspace account mgmt
- if thread._cspace is not None:
- self._per_space_live_threads.get(thread._cspace, 0)
- self.inc_live_thread_count(thread._cspace)
- self._chain_insert(thread)
- def remove_thread(self, thread):
- assert isinstance(thread, ClonableCoroutine)
- w(".. REMOVING", str(id(thread)))
- assert thread not in self._blocked
- try:
- del self._traced[thread]
- except KeyError:
- pass
- #w(".. removing non-traced thread")
- l = thread._prev
- r = thread._next
+ def add_new_group(self, group):
+ "insert 'group' at end of running queue"
+ assert isinstance(group, W_ThreadGroupScheduler)
+ w(".. ADDING group", str(id(group)))
+ self._asking[group] = {}
+ self._chain_insert(group)
+
+ def remove_group(self, group):
+ assert isinstance(group, W_ThreadGroupScheduler)
+ w(".. REMOVING group", str(id(group)))
+ l = group._prev
+ r = group._next
l._next = r
r._prev = l
self._head = r
- if r == thread: #XXX write a test for me !
+ if r == group:
+ # IS AN ERROR
if not we_are_translated():
import traceback
traceback.print_exc()
- thread._next = thread._prev = None
- # cspace/threads account mgmt
- if thread._cspace is not None:
- cspace = thread._cspace
- live = self.dec_live_thread_count(cspace)
- if live == 0:
- del self._per_space_live_threads[cspace]
+ group._next = group._prev = None
+ # unblock all threads asking stability of this group
+ for th in self._asking[group]:
+ del self._blocked[th]
+ th._cspace.blocked_count -= 1
+ del self._asking[group]
+
def add_to_blocked_on(self, w_var, thread):
- #w(".. we BLOCK thread", str(id(thread)), "on var", str(w_var))
+ w(".. we BLOCK thread", str(id(thread)), "on var", str(w_var))
assert isinstance(w_var, W_Var)
assert isinstance(thread, ClonableCoroutine)
assert thread not in self._blocked
@@ -184,26 +106,25 @@
self._blocked_on[w_var] = blocked
blocked.append(thread)
self._blocked[thread] = True
- # cspace accounting
- if thread._cspace is not None:
- self.dec_live_thread_count(thread._cspace)
+ # stability, accounting, etc
+ self._post_blocking(thread)
+
def unblock_on(self, w_var):
- #v(".. we UNBLOCK threads dependants of var", str(w_var))
+ v(".. we UNBLOCK threads dependants of var", str(w_var))
assert isinstance(w_var, W_Var)
blocked = []
if w_var in self._blocked_on:
blocked = self._blocked_on[w_var]
del self._blocked_on[w_var]
- #w(str([id(thr) for thr in blocked]))
+ w(str([id(thr) for thr in blocked]))
for thr in blocked:
del self._blocked[thr]
- # cspace accounting
- if thr._cspace is not None:
- self.inc_live_thread_count(thr._cspace)
+ thr._cspace.blocked_count -= 1
+ #XXX sync the un/block byneed stuff with above, later
def add_to_blocked_byneed(self, w_var, thread):
- #w(".. we BLOCK BYNEED thread", str(id(thread)), "on var", str(w_var))
+ w(".. we BLOCK BYNEED thread", str(id(thread)), "on var", str(w_var))
assert isinstance(w_var, W_Var)
assert isinstance(thread, ClonableCoroutine)
if w_var in self._blocked_byneed:
@@ -213,12 +134,10 @@
self._blocked_byneed[w_var] = blocked
blocked.append(thread)
self._blocked[thread] = True
- # cspace accounting
- if thread._cspace is not None:
- self.dec_live_thread_count(thread._cspace)
+ self._post_blocking(thread)
def unblock_byneed_on(self, w_var):
- #v(".. we UNBLOCK BYNEED dependants of var", str(w_var))
+ v(".. we UNBLOCK BYNEED dependants of var", str(w_var))
assert isinstance(w_var, W_Var)
blocked = []
for w_alias in aliases(self.space, w_var):
@@ -226,43 +145,62 @@
blocked += self._blocked_byneed[w_alias]
del self._blocked_byneed[w_alias]
w_alias.needed = True
- #w(str([id(thr) for thr in blocked]))
+ w(str([id(thr) for thr in blocked]))
for thr in blocked:
del self._blocked[thr]
- # cspace accounting
- if thr._cspace is not None:
- self.inc_live_thread_count(thr._cspace)
-
+ thr._cspace.blocked_count -= 1
+
+ def _post_blocking(self, thread):
+ # check that those asking for stability in the home space
+ # of the thread can be unblocked
+ home = thread._cspace
+ home.blocked_count += 1
+ if home.is_stable():
+ for th in sched.uler._asking[home].keys():
+ # these asking threads must be unblocked, in their
+ # respective home spaces
+ del sched.uler._blocked[th]
+ th._cspace.blocked_count -= 1
+ sched.uler._asking[home] = {}
+
+ # delegated to thread group
+ def add_new_thread(self, thread):
+ tg = get_current_cspace(self.space)
+ tg.add_new_thread(thread)
+
+ def remove_thread(self, thread):
+ tg = get_current_cspace(self.space)
+ tg.remove_thread(thread)
- # Logic Variables tracing, helps exception propagation
- # amongst threads
def trace_vars(self, thread, lvars):
- assert isinstance(thread, ClonableCoroutine)
- assert isinstance(lvars, list)
- #w(".. TRACING logic vars.", str(lvars), "for", str(id(thread)))
- #assert not self._traced.has_key(thread) doesn't translate
- self._traced[thread] = lvars
+ tg = get_current_cspace(self.space)
+ tg.trace_vars(thread, lvars)
def dirty_traced_vars(self, thread, failed_value):
- assert isinstance(thread, ClonableCoroutine)
- assert isinstance(failed_value, W_FailedValue)
- #w(".. DIRTYING traced vars")
- for w_var in self._traced[thread]:
- if self.space.is_true(self.space.is_free(w_var)):
- self.space.bind(w_var, failed_value)
+ tg = get_current_cspace(self.space)
+ tg.dirty_traced_vars(thread, failed_value)
- def w_threads(self):
+ def wait_stable(self):
+ tg = get_current_cspace(self.space)
+ tg.wait_stable()
+
+ # statistics
+ def sched_info(self):
s = self.space
- thl = [s.newint(id(self._head))]
- assert isinstance(self._head, ClonableCoroutine)
+ si = self.space.setitem
+ w_all = s.newdict()
+ si(w_all, s.newint(id(self._head)), self._head.group_info())
+ assert isinstance(self._head, W_ThreadGroupScheduler)
curr = self._head._next
while curr != self._head:
- assert isinstance(curr, ClonableCoroutine)
- thl.append(s.newint(id(curr)))
+ assert isinstance(curr, W_ThreadGroupScheduler)
+ si(w_all, s.newint(id(curr)), curr.group_info())
curr = curr._next
- w_t = W_ListObject(thl)
- return w_t
-
+ si(w_all, s.wrap('blocked'), self.w_blocked())
+ si(w_all, s.wrap('blocked_on'), self.w_blocked_on())
+ si(w_all, s.wrap('blocked_byneed'), self.w_blocked_byneed())
+ return w_all
+
def w_blocked(self):
s = self.space
w_b = W_ListObject([s.newint(id(th))
@@ -289,44 +227,186 @@
si(w_bb, s.wrap(str(var)), w_l)
return w_bb
- def w_space_accounting(self):
- s = self.space
- si = s.setitem
- w_a = s.newdict()
- for sp, thc in self._per_space_live_threads.items():
- si(w_a, s.newint(id(sp)), s.newint(thc))
- return w_a
- def w_asking(self):
+#-- Thread Group scheduler --------------------------------------
+
+class W_ThreadGroupScheduler(baseobjspace.Wrappable):
+
+ def __init__(self, space):
+ self.space = space
+ self._switch_count = 0
+ self._traced = {} # thread -> vars
+ self.thread_count = 1
+ self.blocked_count = 0
+
+ def _init_head(self, thread):
+ assert isinstance(thread, ClonableCoroutine)
+ self._head = thread
+ thread._next = thread._prev = thread
+ assert self._head._next == self._head
+ w("HEAD (main) THREAD = ", str(id(self._head)))
+
+ def _chain_insert(self, thread):
+ assert thread._next is thread
+ assert thread._prev is thread
+ assert isinstance(thread, ClonableCoroutine)
+ assert isinstance(thread._next, ClonableCoroutine)
+ assert isinstance(thread._prev, ClonableCoroutine)
+ r = self._head
+ l = r._prev
+ l._next = thread
+ r._prev = thread
+ thread._prev = l
+ thread._next = r
+
+ def is_blocked(self):
+ return self.thread_count == self.blocked_count
+
+ def is_stable(self):
+ # first approx.
+ return self.is_blocked()
+
+ def is_runnable(self):
+ if not self.is_stable():
+ return True
+ asking_from_within = [th for th in sched.uler._asking[self]
+ if th._cspace == self]
+ return len(asking_from_within)
+
+ def wait_stable(self):
+ w("WAIT_STABLE on space", id(self), "from space",
+ id(str(get_current_cspace(self.space))))
+ if self.is_stable():
+ return
+ curr = ClonableCoroutine.w_getcurrent(self.space)
+ assert isinstance(curr, ClonableCoroutine)
+ asking = sched.uler._asking
+ if asking.has_key(self):
+ asking[self][curr] = True
+ else:
+ asking[self] = {curr:True}
+ sched.uler._blocked[curr] = True
+ curr._cspace.blocked_count += 1
+ sched.uler.schedule()
+
+ def schedule(self):
+ if not self.is_runnable():
+ raise OperationError(self.space.w_AllBlockedError,
+ self.space.wrap("ouch, that's a BUG"))
+ to_be_run = self._select_next()
+ if to_be_run == ClonableCoroutine.w_getcurrent(self.space):
+ return
+ assert isinstance(to_be_run, ClonableCoroutine)
+ w(".. SWITCHING (treads)", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
+ self._switch_count += 1
+ to_be_run.w_switch()
+
+ def _select_next(self):
+ to_be_run = self._head._next
+ sentinel = to_be_run
+ while to_be_run in sched.uler._blocked:
+ if to_be_run in sched.uler._asking[self]:
+ for th in sched.uler._asking[self]:
+ del sched.uler._blocked[th]
+ th._cspace.blocked_count -= 1
+ sched.uler._asking[self] = {}
+ break
+ assert isinstance(to_be_run, ClonableCoroutine)
+ to_be_run = to_be_run._next
+ if to_be_run == sentinel:
+ if not we_are_translated():
+ import pdb
+ pdb.set_trace()
+ self._head = to_be_run
+ return to_be_run
+
+ def add_new_thread(self, thread):
+ "insert 'thread' at end of running queue"
+ w(".. ADDING thread", str(id(thread)), "to group", str(id(self)), "count ==", str(self.thread_count))
+ assert isinstance(thread, ClonableCoroutine)
+ self._chain_insert(thread)
+ self.thread_count += 1
+
+ def remove_thread(self, thread):
+ assert isinstance(thread, ClonableCoroutine)
+ w(".. REMOVING thread", str(id(thread)))
+ assert thread not in sched.uler._blocked
+ try:
+ del self._traced[thread]
+ except KeyError:
+ w(".. removing non-traced thread")
+ l = thread._prev
+ r = thread._next
+ l._next = r
+ r._prev = l
+ self._head = r
+ if r == thread:
+ # that means thread was the last one
+ # the group is about to die
+ pass
+ thread._next = thread._prev = None
+ self.thread_count -= 1
+ if self.thread_count == 0:
+ sched.uler.remove_group(self)
+
+ # Logic Variables tracing, "accelerates" exception propagation
+ # amongst threads
+ def trace_vars(self, thread, lvars):
+ assert isinstance(thread, ClonableCoroutine)
+ assert isinstance(lvars, list)
+ #w(".. TRACING logic vars.", str(lvars), "for", str(id(thread)))
+ #assert not self._traced.has_key(thread) doesn't translate
+ self._traced[thread] = lvars
+
+ def dirty_traced_vars(self, thread, failed_value):
+ assert isinstance(thread, ClonableCoroutine)
+ assert isinstance(failed_value, W_FailedValue)
+ #w(".. DIRTYING traced vars")
+ for w_var in self._traced[thread]:
+ if self.space.is_true(self.space.is_free(w_var)):
+ self.space.bind(w_var, failed_value)
+
+ def w_threads(self):
s = self.space
- si = s.setitem
- w_a = s.newdict()
- for th, sp in self._asking.items():
- si(w_a, s.newint(id(th)), s.newint(id(sp)))
- return w_a
+ thl = [s.newint(id(self._head))]
+ assert isinstance(self._head, ClonableCoroutine)
+ curr = self._head._next
+ while curr != self._head:
+ assert isinstance(curr, ClonableCoroutine)
+ thl.append(s.newint(id(curr)))
+ curr = curr._next
+ w_t = W_ListObject(thl)
+ return w_t
+ def w_asking(self):
+ asking = sched.uler._asking.get(self, None)
+ if not asking:
+ return self.space.w_None
+ return W_ListObject([self.space.newint(id(th))
+ for th in asking.keys()])
+
+ def group_info(self):
+ s = self
+ si = self.space.setitem
+ sw = self.space.wrap
+ w_ret = self.space.newdict()
+ si(w_ret, sw('switches'), self.space.newint(s._switch_count))
+ si(w_ret, sw('threads'), s.w_threads())
+ si(w_ret, sw('asking'), s.w_asking())
+ return w_ret
#-- Misc --------------------------------------------------
def reset_scheduler(space):
- sched.uler = Scheduler(space)
+ tg = W_ThreadGroupScheduler(space)
+ tg._init_head(sched.main_thread)
+ sched.uler = TopLevelScheduler(space, tg)
app_reset_scheduler = gateway.interp2app(reset_scheduler)
def sched_info(space):
- s = sched.uler
- si = space.setitem
- sw = space.wrap
- w_ret = space.newdict()
- si(w_ret, sw('switches'), space.newint(s._switch_count))
- si(w_ret, sw('threads'), s.w_threads())
- si(w_ret, sw('blocked'), s.w_blocked())
- si(w_ret, sw('blocked_on'), s.w_blocked_on())
- si(w_ret, sw('blocked_byneed'), s.w_blocked_byneed())
- si(w_ret, sw('space_accounting'), s.w_space_accounting())
- si(w_ret, sw('asking'), s.w_asking())
- return w_ret
+ return sched.uler.sched_info()
app_sched_info = gateway.interp2app(sched_info)
def schedule(space):
"useful til we get preemtive scheduling deep into the vm"
- sched.uler.schedule_or_pass()
+ sched.uler.schedule()
app_schedule = gateway.interp2app(schedule)
Modified: pypy/dist/pypy/objspace/cclp/space.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/space.py (original)
+++ pypy/dist/pypy/objspace/cclp/space.py Thu Oct 12 19:20:18 2006
@@ -18,21 +18,11 @@
W_CVar, W_AbstractDomain, W_AbstractDistributor
from pypy.objspace.cclp.interp_var import interp_bind, interp_free
from pypy.objspace.cclp.constraint.distributor import distribute
+from pypy.objspace.cclp.scheduler import W_ThreadGroupScheduler
-def _newspace(space, w_callable, __args__):
- args = __args__.normalize()
- w_coro = ClonableCoroutine(space)
- #w_callable : a logic or constraint script
- thunk = CSpaceThunk(space, w_callable, args, w_coro)
- w_coro.bind(thunk)
- if not we_are_translated():
- w("NEWSPACE, (distributor) thread", str(id(w_coro)), "for", str(w_callable.name))
- w_space = W_CSpace(space)
- w_coro._cspace = w_space
- sched.uler.add_new_thread(w_coro)
- return w_space
class NewSpaceThunk(AbstractThunk):
+ "container thread for one comp. space"
def __init__(self, space, w_callable, __args__, thread):
self.space = space
self.thread = thread
@@ -49,14 +39,26 @@
sched.uler.remove_thread(self.thread)
sched.uler.schedule()
+def _newspace(space, w_callable, __args__):
+ args = __args__.normalize()
+ dist_thread = ClonableCoroutine(space)
+ thunk = CSpaceThunk(space, w_callable, args, dist_thread)
+ dist_thread.bind(thunk)
+ if not we_are_translated():
+ w("NEWSPACE, (distributor) thread", str(id(dist_thread)), "for", str(w_callable.name))
+ w_space = W_CSpace(space, dist_thread)
+ return w_space
+
def newspace(space, w_callable, __args__):
- thread = ClonableCoroutine(space)
- thunk = NewSpaceThunk(space, w_callable, __args__, thread)
- thread.bind(thunk)
- sched.uler.add_new_thread(thread)
+ "application level creation of a new computation space"
+ outer_thread = ClonableCoroutine(space)
+ outer_thread._cspace = get_current_cspace(space)
+ thunk = NewSpaceThunk(space, w_callable, __args__, outer_thread)
+ outer_thread.bind(thunk)
+ sched.uler.add_new_thread(outer_thread)
sched.uler.schedule() # XXX assumption: thread will be executed before we get back there
cspace = thunk.cspace
- cspace._container = thread
+ cspace._container = outer_thread
return cspace
app_newspace = gateway.interp2app(newspace, unwrap_spec=[baseobjspace.ObjSpace,
baseobjspace.W_Root,
@@ -94,11 +96,17 @@
w_dist.w_distribute(choice)
app_fresh_distributor = gateway.interp2app(fresh_distributor)
-class W_CSpace(baseobjspace.Wrappable):
- def __init__(self, space):
- self.space = space # the object space ;-)
- self.distributor = None
+class W_CSpace(W_ThreadGroupScheduler):
+
+ def __init__(self, space, dist_thread):
+ W_ThreadGroupScheduler.__init__(self, space)
+ dist_thread._cspace = self
+ self._init_head(dist_thread)
+ self._next = self._prev = self
+ sched.uler.add_new_group(self)
+
+ self.distributor = None # dist instance != thread
self._container = None # thread that 'contains' us
# choice mgmt
self._choice = newvar(space)
@@ -149,7 +157,7 @@
for const in self._constraints:
ccopy = const.copy()
new_cspace.tell(ccopy)
- sched.uler.wait_stable(new_cspace)
+ new_cspace.wait_stable()
return new_cspace
else:
# the theory is that
@@ -166,7 +174,7 @@
return everything.cspace
def w_ask(self):
- sched.uler.wait_stable(self)
+ self.wait_stable()
self.space.wait(self._choice)
choice = self._choice.w_bound_to
self._choice = newvar(self.space)
@@ -176,7 +184,7 @@
def choose(self, n):
assert n > 1
- sched.uler.wait_stable(self)
+ self.wait_stable()
if self._failed: #XXX set by any propagator
raise ConsistencyError
assert interp_free(self._choice)
Modified: pypy/dist/pypy/objspace/cclp/thunk.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/thunk.py (original)
+++ pypy/dist/pypy/objspace/cclp/thunk.py Thu Oct 12 19:20:18 2006
@@ -27,6 +27,7 @@
class ProcedureThunk(_AppThunk):
+ "used by thread.stacklet"
def __init__(self, space, w_callable, args, coro):
_AppThunk.__init__(self, space, coro.costate, w_callable, args)
self._coro = coro
@@ -108,7 +109,6 @@
return
assert interp_free(cspace._choice)
interp_bind(cspace._choice, self.space.newint(1))
- assert sched.uler._per_space_live_threads[cspace] == 0, "there are still threads living in this space"
finally:
interp_bind(cspace._finished, self.space.w_True)
sched.uler.remove_thread(self._coro)
Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py (original)
+++ pypy/dist/pypy/objspace/logic.py Thu Oct 12 19:20:18 2006
@@ -16,14 +16,15 @@
from pypy.objspace.cclp.thread import app_future, app_stacklet, app_this_thread
-from pypy.objspace.cclp.scheduler import Scheduler, app_sched_info, \
+from pypy.objspace.cclp.scheduler import TopLevelScheduler, app_sched_info, \
app_schedule, app_reset_scheduler
from pypy.objspace.cclp.global_state import sched
#-- COMP. SPACE --------------------------------------------
-from pypy.objspace.cclp.space import app_newspace, app_choose, W_CSpace, app_tell
+from pypy.objspace.cclp.space import app_newspace, app_choose, W_ThreadGroupScheduler, \
+ W_CSpace, app_tell
#-- VARIABLE ------------------------------------------------
@@ -33,7 +34,7 @@
from pypy.objspace.cclp.constraint.variable import app_domain
-from pypy.objspace.cclp.types import app_domain_of
+from pypy.objspace.cclp.types import app_domain_of, ClonableCoroutine
all_mms.update(variable_mms)
@@ -189,6 +190,7 @@
space.model.typeorder[W_Future] = [(W_Future, None), (W_Var, None)]
space.model.typeorder[W_CVar] = [(W_CVar, None), (W_Var, None)]
space.model.typeorder[W_CSpace] = [(W_CSpace, None), (baseobjspace.Wrappable, None)]
+ space.model.typeorder[W_ThreadGroupScheduler] = [(W_ThreadGroupScheduler, None), (W_CSpace, None)]
space.model.typeorder[W_FiniteDomain] = [(W_FiniteDomain, None), (W_Root, None)]
@@ -296,6 +298,9 @@
# do the magic
patch_space_in_place(space, 'logic', proxymaker)
# instantiate singleton scheduler
- sched.uler = Scheduler(space)
+ sched.main_thread = ClonableCoroutine.w_getcurrent(space)
+ tg = W_ThreadGroupScheduler(space)
+ tg._init_head(sched.main_thread)
+ sched.uler = TopLevelScheduler(space, tg)
return space
More information about the Pypy-commit
mailing list