[pypy-svn] r33235 - in pypy/dist/pypy/objspace: . cclp

auc at codespeak.net auc at codespeak.net
Thu Oct 12 19:20:21 CEST 2006


Author: auc
Date: Thu Oct 12 19:20:18 2006
New Revision: 33235

Modified:
   pypy/dist/pypy/objspace/cclp/misc.py
   pypy/dist/pypy/objspace/cclp/scheduler.py
   pypy/dist/pypy/objspace/cclp/space.py
   pypy/dist/pypy/objspace/cclp/thunk.py
   pypy/dist/pypy/objspace/logic.py
Log:
(auc, ale) scheduler refactoring that should help a bit with some other tasks


Modified: pypy/dist/pypy/objspace/cclp/misc.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/misc.py	(original)
+++ pypy/dist/pypy/objspace/cclp/misc.py	Thu Oct 12 19:20:18 2006
@@ -27,6 +27,10 @@
 def get_current_cspace(space):
     curr = ClonableCoroutine.w_getcurrent(space)
     assert isinstance(curr, ClonableCoroutine)
+    if curr._cspace is None:
+        if not we_are_translated():
+            import pdb
+            pdb.set_trace()
     return curr._cspace
 
 def interp_id(space, w_obj):

Modified: pypy/dist/pypy/objspace/cclp/scheduler.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/scheduler.py	(original)
+++ pypy/dist/pypy/objspace/cclp/scheduler.py	Thu Oct 12 19:20:18 2006
@@ -1,179 +1,101 @@
 from pypy.rpython.objectmodel import we_are_translated
 from pypy.interpreter.error import OperationError
-from pypy.interpreter import gateway
+from pypy.interpreter import gateway, baseobjspace
 from pypy.objspace.std.listobject import W_ListObject
 
 from pypy.objspace.cclp.types import W_Var, W_FailedValue, aliases
-from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
-from pypy.objspace.cclp.space import W_CSpace
+from pypy.objspace.cclp.misc import w, v, ClonableCoroutine, get_current_cspace
 from pypy.objspace.cclp.global_state import sched
 
 #-- Singleton scheduler ------------------------------------------------
 
-class Scheduler(object):
+class TopLevelScheduler(object):
 
-    def __init__(self, space):
+    # we are dealing with cspaces
+
+    def __init__(self, space, top_level_space):
+        w("NEW TOPLEVEL SCHEDULER", str(id(self)), "with", str(id(top_level_space)))
         self.space = space
-        self._main = ClonableCoroutine.w_getcurrent(space)
-        assert isinstance(self._main, ClonableCoroutine)
+        sched.main_thread._cspace = top_level_space
         self._switch_count = 0
-        self._init_head(self._main)
-        self._blocked = {} # thread set
+        self._head = top_level_space
+        self._head._next = self._head._prev = self._head
+        # asking for stability
+        self._asking = {} # cspace -> thread set
+        self._asking[top_level_space] = {}
         # variables suspension lists
+        self._blocked = {}
         self._blocked_on = {} # var -> threads
         self._blocked_byneed = {} # var -> threads
-        self._asking = {} # thread -> cspace
-        # more accounting
-        self._per_space_live_threads = {} # space -> nb runnable threads
-        self._traced = {} # thread -> vars
-        w("MAIN THREAD = ", str(id(self._main)))
-
-    def _init_head(self, thread):
-        assert isinstance(thread, ClonableCoroutine)
-        self._head = thread
-        # for the reset case
-        self._head._next = self._head._prev = self._head
-            
-    def _chain_insert(self, thread):
-        assert thread._next is thread
-        assert thread._prev is thread
-        assert isinstance(thread, ClonableCoroutine)
-        assert isinstance(thread._next, ClonableCoroutine)
-        assert isinstance(thread._prev, ClonableCoroutine)
+        
+    def _chain_insert(self, group):
+        assert group._next is group
+        assert group._prev is group
+        assert isinstance(group, W_ThreadGroupScheduler)
+        assert isinstance(group._next, W_ThreadGroupScheduler)
+        assert isinstance(group._prev, W_ThreadGroupScheduler)
         r = self._head
         l = r._prev
-        l._next = thread
-        r._prev = thread
-        thread._prev = l
-        thread._next = r
-
-    #-- cspace helper
-
-    def is_stable(self, cspace):
-        assert isinstance(cspace, W_CSpace)
-        if cspace not in self._per_space_live_threads.keys():
-            #XXX meaning ?
-            return True
-        return self._per_space_live_threads[cspace] == 0
-
-    def wait_stable(self, cspace):
-        assert isinstance(cspace, W_CSpace)
-        if self.is_stable(cspace):
-            return
-        curr = ClonableCoroutine.w_getcurrent(self.space)
-        assert isinstance(curr, ClonableCoroutine)
-        self._asking[curr] = cspace
-        self._blocked[curr] = True
-        # either we choose() from inside
-        if curr._cspace == cspace:
-            self.dec_live_thread_count(cspace)
-            self.schedule()
-            self.inc_live_thread_count(cspace)
-        else: # or we ask(), or clone() from outside
-            self.schedule()
-
-    #-- cspace -> thread_count helpers
-    
-    def inc_live_thread_count(self, cspace):
-        assert isinstance(cspace, W_CSpace)
-        count = self._per_space_live_threads.get(cspace, 0) + 1
-        self._per_space_live_threads[cspace]  = count
-        return count
-
-    def dec_live_thread_count(self, cspace):
-        assert isinstance(cspace, W_CSpace)
-        count = self._per_space_live_threads[cspace] - 1
-        assert count >= 0
-        self._per_space_live_threads[cspace] = count
-        return count 
-    #-- /
-
-    #-- to be used by logic objspace
+        l._next = group
+        r._prev = group
+        group._prev = l
+        group._next = r
 
     def schedule(self):
         to_be_run = self._select_next()
-        assert isinstance(to_be_run, ClonableCoroutine)
-        #w(".. SWITCHING", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
+        assert isinstance(to_be_run, W_ThreadGroupScheduler)
+        w(".. SWITCHING (spaces)", str(id(get_current_cspace(self.space))), "=>", str(id(to_be_run)))
         self._switch_count += 1
-        to_be_run.w_switch() 
+        to_be_run.schedule() 
 
-    def schedule_or_pass(self):
-        to_be_run = self._select_next(dont_pass=False)
-        assert isinstance(to_be_run, ClonableCoroutine)
-        curr = ClonableCoroutine.w_getcurrent(self.space)
-        if to_be_run == curr:
-            w(".. PASS")
-            return
-        #w(".. SWITCHING", str(id(curr)), "=>", str(id(to_be_run)))
-        self._switch_count += 1
-        to_be_run.w_switch() 
-        
-    def _select_next(self, dont_pass=True):
+    def _select_next(self):
         to_be_run = self._head
         sentinel = to_be_run
-        current = ClonableCoroutine.w_getcurrent(self.space)
-        assert isinstance(current, ClonableCoroutine)
-        while (to_be_run in self._blocked) \
-                  or (to_be_run == current):
-            
+        while to_be_run.is_blocked():
+            # check stability + asking status, give a chance to run
+            if to_be_run.is_runnable():
+                break
             to_be_run = to_be_run._next
-            assert isinstance(to_be_run, ClonableCoroutine)
-            # asking threads
-            if to_be_run in self._asking:
-                if self.is_stable(self._asking[to_be_run]):
-                    del self._asking[to_be_run]
-                    del self._blocked[to_be_run]
-                    break
+            assert isinstance(to_be_run, W_ThreadGroupScheduler)
             if to_be_run == sentinel:
-                if not dont_pass:
-                    return current
-                w(str(sched_info(self.space)))
-                ## we RESET sched state so as to keep being usable beyond that
                 reset_scheduler(self.space)
-                sched.uler._main = sched.uler._head = self._head
                 w(".. SCHEDULER reinitialized")
                 raise OperationError(self.space.w_AllBlockedError,
                                      self.space.wrap("can't schedule, probable deadlock in sight"))
         self._head = to_be_run
         return to_be_run
 
-    def add_new_thread(self, thread):
-        "insert 'thread' at end of running queue"
-        assert isinstance(thread, ClonableCoroutine)
-        # cspace account mgmt
-        if thread._cspace is not None:
-            self._per_space_live_threads.get(thread._cspace, 0)
-            self.inc_live_thread_count(thread._cspace)
-        self._chain_insert(thread)
 
-    def remove_thread(self, thread):
-        assert isinstance(thread, ClonableCoroutine)
-        w(".. REMOVING", str(id(thread)))
-        assert thread not in self._blocked
-        try:
-            del self._traced[thread]
-        except KeyError:
-            pass
-            #w(".. removing non-traced thread")
-        l = thread._prev
-        r = thread._next
+    def add_new_group(self, group):
+        "insert 'group' at end of running queue"
+        assert isinstance(group, W_ThreadGroupScheduler)
+        w(".. ADDING group", str(id(group)))
+        self._asking[group] = {}
+        self._chain_insert(group)
+
+    def remove_group(self, group):
+        assert isinstance(group, W_ThreadGroupScheduler)
+        w(".. REMOVING group", str(id(group)))
+        l = group._prev
+        r = group._next
         l._next = r
         r._prev = l
         self._head = r
-        if r == thread: #XXX write a test for me !
+        if r == group:
+            # IS AN ERROR
             if not we_are_translated():
                 import traceback
                 traceback.print_exc()
-        thread._next = thread._prev = None
-        # cspace/threads account mgmt
-        if thread._cspace is not None:
-            cspace = thread._cspace
-            live = self.dec_live_thread_count(cspace)
-            if live == 0:
-                del self._per_space_live_threads[cspace]
+        group._next = group._prev = None
+        # unblock all threads asking stability of this group
+        for th in self._asking[group]:
+            del self._blocked[th]
+            th._cspace.blocked_count -= 1
+        del self._asking[group]
+
 
     def add_to_blocked_on(self, w_var, thread):
-        #w(".. we BLOCK thread", str(id(thread)), "on var", str(w_var))
+        w(".. we BLOCK thread", str(id(thread)), "on var", str(w_var))
         assert isinstance(w_var, W_Var)
         assert isinstance(thread, ClonableCoroutine)
         assert thread not in self._blocked
@@ -184,26 +106,25 @@
             self._blocked_on[w_var] = blocked
         blocked.append(thread)
         self._blocked[thread] = True
-        # cspace accounting
-        if thread._cspace is not None:
-            self.dec_live_thread_count(thread._cspace)
+        # stability, accounting, etc
+        self._post_blocking(thread)
 
+            
     def unblock_on(self, w_var):
-        #v(".. we UNBLOCK threads dependants of var", str(w_var))
+        v(".. we UNBLOCK threads dependants of var", str(w_var))
         assert isinstance(w_var, W_Var)
         blocked = []
         if w_var in self._blocked_on:
             blocked = self._blocked_on[w_var]
             del self._blocked_on[w_var]
-        #w(str([id(thr) for thr in blocked]))
+        w(str([id(thr) for thr in blocked]))
         for thr in blocked:
             del self._blocked[thr]
-            # cspace accounting
-            if thr._cspace is not None:
-                self.inc_live_thread_count(thr._cspace)
+            thr._cspace.blocked_count -= 1
 
+    #XXX sync the un/block byneed stuff with above, later
     def add_to_blocked_byneed(self, w_var, thread):
-        #w(".. we BLOCK BYNEED thread", str(id(thread)), "on var", str(w_var))
+        w(".. we BLOCK BYNEED thread", str(id(thread)), "on var", str(w_var))
         assert isinstance(w_var, W_Var)
         assert isinstance(thread, ClonableCoroutine)
         if w_var in self._blocked_byneed:
@@ -213,12 +134,10 @@
             self._blocked_byneed[w_var] = blocked
         blocked.append(thread)
         self._blocked[thread] = True
-        # cspace accounting
-        if thread._cspace is not None:
-            self.dec_live_thread_count(thread._cspace)
+        self._post_blocking(thread)
 
     def unblock_byneed_on(self, w_var):
-        #v(".. we UNBLOCK BYNEED dependants of var", str(w_var))
+        v(".. we UNBLOCK BYNEED dependants of var", str(w_var))
         assert isinstance(w_var, W_Var)
         blocked = []
         for w_alias in aliases(self.space, w_var):
@@ -226,43 +145,62 @@
                 blocked += self._blocked_byneed[w_alias]
                 del self._blocked_byneed[w_alias]
             w_alias.needed = True
-        #w(str([id(thr) for thr in blocked]))
+        w(str([id(thr) for thr in blocked]))
         for thr in blocked:
             del self._blocked[thr]
-            # cspace accounting
-            if thr._cspace is not None:
-                self.inc_live_thread_count(thr._cspace)
-            
+            thr._cspace.blocked_count -= 1
+
+    def _post_blocking(self, thread):
+        # check that those asking for stability in the home space
+        # of the thread can be unblocked
+        home = thread._cspace
+        home.blocked_count += 1
+        if home.is_stable():
+            for th in sched.uler._asking[home].keys():
+                # these asking threads must be unblocked, in their
+                # respective home spaces
+                del sched.uler._blocked[th] 
+                th._cspace.blocked_count -= 1
+            sched.uler._asking[home] = {}
+
+    # delegated to thread group
+    def add_new_thread(self, thread):
+        tg = get_current_cspace(self.space)
+        tg.add_new_thread(thread)
+
+    def remove_thread(self, thread):
+        tg = get_current_cspace(self.space)
+        tg.remove_thread(thread)
 
-    # Logic Variables tracing, helps exception propagation
-    # amongst threads
     def trace_vars(self, thread, lvars):
-        assert isinstance(thread, ClonableCoroutine)
-        assert isinstance(lvars, list)
-        #w(".. TRACING logic vars.", str(lvars), "for", str(id(thread)))
-        #assert not self._traced.has_key(thread) doesn't translate 
-        self._traced[thread] = lvars
+        tg = get_current_cspace(self.space)
+        tg.trace_vars(thread, lvars)
 
     def dirty_traced_vars(self, thread, failed_value):
-        assert isinstance(thread, ClonableCoroutine)
-        assert isinstance(failed_value, W_FailedValue)
-        #w(".. DIRTYING traced vars")
-        for w_var in self._traced[thread]:
-            if self.space.is_true(self.space.is_free(w_var)):
-                self.space.bind(w_var, failed_value)
+        tg = get_current_cspace(self.space)
+        tg.dirty_traced_vars(thread, failed_value)
 
-    def w_threads(self):
+    def wait_stable(self):
+        tg = get_current_cspace(self.space)
+        tg.wait_stable()
+
+    # statistics
+    def sched_info(self):
         s = self.space
-        thl = [s.newint(id(self._head))]
-        assert isinstance(self._head, ClonableCoroutine)
+        si = self.space.setitem
+        w_all = s.newdict()
+        si(w_all, s.newint(id(self._head)), self._head.group_info())
+        assert isinstance(self._head, W_ThreadGroupScheduler)
         curr = self._head._next
         while curr != self._head:
-            assert isinstance(curr, ClonableCoroutine)
-            thl.append(s.newint(id(curr)))
+            assert isinstance(curr, W_ThreadGroupScheduler)
+            si(w_all, s.newint(id(curr)), curr.group_info())
             curr = curr._next
-        w_t = W_ListObject(thl)
-        return w_t
-
+        si(w_all, s.wrap('blocked'), self.w_blocked())
+        si(w_all, s.wrap('blocked_on'), self.w_blocked_on())
+        si(w_all, s.wrap('blocked_byneed'), self.w_blocked_byneed())
+        return w_all
+        
     def w_blocked(self):
         s = self.space
         w_b = W_ListObject([s.newint(id(th))
@@ -289,44 +227,186 @@
             si(w_bb, s.wrap(str(var)), w_l)
         return w_bb
 
-    def w_space_accounting(self):
-        s = self.space
-        si = s.setitem
-        w_a = s.newdict()
-        for sp, thc in self._per_space_live_threads.items():
-            si(w_a, s.newint(id(sp)), s.newint(thc))
-        return w_a
 
-    def w_asking(self):
+#-- Thread Group scheduler --------------------------------------
+
+class W_ThreadGroupScheduler(baseobjspace.Wrappable):
+
+    def __init__(self, space):
+        self.space = space
+        self._switch_count = 0
+        self._traced = {} # thread -> vars
+        self.thread_count = 1
+        self.blocked_count = 0
+
+    def _init_head(self, thread):
+        assert isinstance(thread, ClonableCoroutine)
+        self._head = thread
+        thread._next = thread._prev = thread
+        assert self._head._next == self._head
+        w("HEAD (main) THREAD = ", str(id(self._head)))
+            
+    def _chain_insert(self, thread):
+        assert thread._next is thread
+        assert thread._prev is thread
+        assert isinstance(thread, ClonableCoroutine)
+        assert isinstance(thread._next, ClonableCoroutine)
+        assert isinstance(thread._prev, ClonableCoroutine)
+        r = self._head
+        l = r._prev
+        l._next = thread
+        r._prev = thread
+        thread._prev = l
+        thread._next = r
+
+    def is_blocked(self):
+        return self.thread_count == self.blocked_count
+
+    def is_stable(self):
+        # first approx.
+        return self.is_blocked()
+
+    def is_runnable(self):
+        if not self.is_stable():
+            return True
+        asking_from_within = [th for th in sched.uler._asking[self]
+                              if th._cspace == self]
+        return len(asking_from_within)
+
+    def wait_stable(self):
+        w("WAIT_STABLE on space", id(self), "from space",
+          id(str(get_current_cspace(self.space))))
+        if self.is_stable():
+            return
+        curr = ClonableCoroutine.w_getcurrent(self.space)
+        assert isinstance(curr, ClonableCoroutine)
+        asking = sched.uler._asking
+        if asking.has_key(self):
+            asking[self][curr] = True
+        else:
+            asking[self] = {curr:True}
+        sched.uler._blocked[curr] = True
+        curr._cspace.blocked_count += 1
+        sched.uler.schedule()
+
+    def schedule(self):
+        if not self.is_runnable():
+            raise OperationError(self.space.w_AllBlockedError,
+                                 self.space.wrap("ouch, that's a BUG"))
+        to_be_run = self._select_next()
+        if to_be_run == ClonableCoroutine.w_getcurrent(self.space):
+            return
+        assert isinstance(to_be_run, ClonableCoroutine)
+        w(".. SWITCHING (treads)", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
+        self._switch_count += 1
+        to_be_run.w_switch() 
+        
+    def _select_next(self):
+        to_be_run = self._head._next
+        sentinel = to_be_run
+        while to_be_run in sched.uler._blocked:
+            if to_be_run in sched.uler._asking[self]:
+                for th in sched.uler._asking[self]:
+                    del sched.uler._blocked[th]
+                    th._cspace.blocked_count -= 1
+                sched.uler._asking[self] = {}
+                break
+            assert isinstance(to_be_run, ClonableCoroutine)
+            to_be_run = to_be_run._next
+            if to_be_run == sentinel:
+                if not we_are_translated():
+                    import pdb
+                    pdb.set_trace()
+        self._head = to_be_run
+        return to_be_run
+
+    def add_new_thread(self, thread):
+        "insert 'thread' at end of running queue"
+        w(".. ADDING thread", str(id(thread)), "to group", str(id(self)), "count ==", str(self.thread_count))
+        assert isinstance(thread, ClonableCoroutine)
+        self._chain_insert(thread)
+        self.thread_count += 1
+
+    def remove_thread(self, thread):
+        assert isinstance(thread, ClonableCoroutine)
+        w(".. REMOVING thread", str(id(thread)))
+        assert thread not in sched.uler._blocked
+        try:
+            del self._traced[thread]
+        except KeyError:
+            w(".. removing non-traced thread")
+        l = thread._prev
+        r = thread._next
+        l._next = r
+        r._prev = l
+        self._head = r
+        if r == thread:
+            # that means thread was the last one
+            # the group is about to die
+            pass
+        thread._next = thread._prev = None
+        self.thread_count -= 1
+        if self.thread_count == 0:
+            sched.uler.remove_group(self)
+
+    # Logic Variables tracing, "accelerates" exception propagation
+    # amongst threads
+    def trace_vars(self, thread, lvars):
+        assert isinstance(thread, ClonableCoroutine)
+        assert isinstance(lvars, list)
+        #w(".. TRACING logic vars.", str(lvars), "for", str(id(thread)))
+        #assert not self._traced.has_key(thread) doesn't translate 
+        self._traced[thread] = lvars
+
+    def dirty_traced_vars(self, thread, failed_value):
+        assert isinstance(thread, ClonableCoroutine)
+        assert isinstance(failed_value, W_FailedValue)
+        #w(".. DIRTYING traced vars")
+        for w_var in self._traced[thread]:
+            if self.space.is_true(self.space.is_free(w_var)):
+                self.space.bind(w_var, failed_value)
+
+    def w_threads(self):
         s = self.space
-        si = s.setitem
-        w_a = s.newdict()
-        for th, sp in self._asking.items():
-            si(w_a, s.newint(id(th)), s.newint(id(sp)))
-        return w_a
+        thl = [s.newint(id(self._head))]
+        assert isinstance(self._head, ClonableCoroutine)
+        curr = self._head._next
+        while curr != self._head:
+            assert isinstance(curr, ClonableCoroutine)
+            thl.append(s.newint(id(curr)))
+            curr = curr._next
+        w_t = W_ListObject(thl)
+        return w_t
 
+    def w_asking(self):
+        asking = sched.uler._asking.get(self, None)
+        if not asking:
+            return self.space.w_None
+        return W_ListObject([self.space.newint(id(th))
+                             for th in asking.keys()]) 
+
+    def group_info(self):
+        s = self 
+        si = self.space.setitem
+        sw = self.space.wrap
+        w_ret = self.space.newdict()
+        si(w_ret, sw('switches'), self.space.newint(s._switch_count))
+        si(w_ret, sw('threads'), s.w_threads())
+        si(w_ret, sw('asking'), s.w_asking())
+        return w_ret
         
 #-- Misc --------------------------------------------------
 def reset_scheduler(space):
-    sched.uler = Scheduler(space)
+    tg = W_ThreadGroupScheduler(space)
+    tg._init_head(sched.main_thread)
+    sched.uler = TopLevelScheduler(space, tg)
 app_reset_scheduler = gateway.interp2app(reset_scheduler)
 
 def sched_info(space):
-    s = sched.uler
-    si = space.setitem
-    sw = space.wrap
-    w_ret = space.newdict()
-    si(w_ret, sw('switches'), space.newint(s._switch_count))
-    si(w_ret, sw('threads'), s.w_threads())
-    si(w_ret, sw('blocked'), s.w_blocked())
-    si(w_ret, sw('blocked_on'), s.w_blocked_on())
-    si(w_ret, sw('blocked_byneed'), s.w_blocked_byneed())
-    si(w_ret, sw('space_accounting'), s.w_space_accounting())
-    si(w_ret, sw('asking'), s.w_asking())
-    return w_ret
+    return sched.uler.sched_info()
 app_sched_info = gateway.interp2app(sched_info)        
 
 def schedule(space):
     "useful til we get preemtive scheduling deep into the vm"
-    sched.uler.schedule_or_pass()
+    sched.uler.schedule()
 app_schedule = gateway.interp2app(schedule)

Modified: pypy/dist/pypy/objspace/cclp/space.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/space.py	(original)
+++ pypy/dist/pypy/objspace/cclp/space.py	Thu Oct 12 19:20:18 2006
@@ -18,21 +18,11 @@
      W_CVar, W_AbstractDomain, W_AbstractDistributor
 from pypy.objspace.cclp.interp_var import interp_bind, interp_free
 from pypy.objspace.cclp.constraint.distributor import distribute
+from pypy.objspace.cclp.scheduler import W_ThreadGroupScheduler
 
-def _newspace(space, w_callable, __args__):
-    args = __args__.normalize()
-    w_coro = ClonableCoroutine(space)
-    #w_callable : a logic or constraint script
-    thunk = CSpaceThunk(space, w_callable, args, w_coro)
-    w_coro.bind(thunk)
-    if not we_are_translated():
-        w("NEWSPACE, (distributor) thread", str(id(w_coro)), "for", str(w_callable.name))
-    w_space = W_CSpace(space)
-    w_coro._cspace = w_space
-    sched.uler.add_new_thread(w_coro)
-    return w_space
 
 class NewSpaceThunk(AbstractThunk):
+    "container thread for one comp. space"
     def __init__(self, space, w_callable, __args__, thread):
         self.space = space
         self.thread = thread
@@ -49,14 +39,26 @@
             sched.uler.remove_thread(self.thread)
             sched.uler.schedule()
 
+def _newspace(space, w_callable, __args__):
+    args = __args__.normalize()
+    dist_thread = ClonableCoroutine(space)
+    thunk = CSpaceThunk(space, w_callable, args, dist_thread)
+    dist_thread.bind(thunk)
+    if not we_are_translated():
+        w("NEWSPACE, (distributor) thread", str(id(dist_thread)), "for", str(w_callable.name))
+    w_space = W_CSpace(space, dist_thread)
+    return w_space
+
 def newspace(space, w_callable, __args__):
-    thread = ClonableCoroutine(space)
-    thunk = NewSpaceThunk(space, w_callable, __args__, thread)
-    thread.bind(thunk)
-    sched.uler.add_new_thread(thread)
+    "application level creation of a new computation space"
+    outer_thread = ClonableCoroutine(space)
+    outer_thread._cspace = get_current_cspace(space)
+    thunk = NewSpaceThunk(space, w_callable, __args__, outer_thread)
+    outer_thread.bind(thunk)
+    sched.uler.add_new_thread(outer_thread)
     sched.uler.schedule() # XXX assumption: thread will be executed before we get back there
     cspace = thunk.cspace
-    cspace._container = thread
+    cspace._container = outer_thread
     return cspace
 app_newspace = gateway.interp2app(newspace, unwrap_spec=[baseobjspace.ObjSpace,
                                                          baseobjspace.W_Root,
@@ -94,11 +96,17 @@
             w_dist.w_distribute(choice)
     app_fresh_distributor = gateway.interp2app(fresh_distributor)
 
-class W_CSpace(baseobjspace.Wrappable):
 
-    def __init__(self, space):
-        self.space = space # the object space ;-)
-        self.distributor = None
+class W_CSpace(W_ThreadGroupScheduler):
+
+    def __init__(self, space, dist_thread):
+        W_ThreadGroupScheduler.__init__(self, space)
+        dist_thread._cspace = self
+        self._init_head(dist_thread)
+        self._next = self._prev = self
+        sched.uler.add_new_group(self)
+
+        self.distributor = None # dist instance != thread
         self._container = None # thread that 'contains' us
         # choice mgmt
         self._choice = newvar(space)
@@ -149,7 +157,7 @@
             for const in self._constraints:
                 ccopy = const.copy()
                 new_cspace.tell(ccopy)
-            sched.uler.wait_stable(new_cspace)
+            new_cspace.wait_stable()
             return new_cspace
         else:
             # the theory is that
@@ -166,7 +174,7 @@
             return everything.cspace
 
     def w_ask(self):
-        sched.uler.wait_stable(self)
+        self.wait_stable()
         self.space.wait(self._choice)
         choice = self._choice.w_bound_to
         self._choice = newvar(self.space)
@@ -176,7 +184,7 @@
 
     def choose(self, n):
         assert n > 1
-        sched.uler.wait_stable(self)
+        self.wait_stable()
         if self._failed: #XXX set by any propagator
             raise ConsistencyError
         assert interp_free(self._choice)

Modified: pypy/dist/pypy/objspace/cclp/thunk.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/thunk.py	(original)
+++ pypy/dist/pypy/objspace/cclp/thunk.py	Thu Oct 12 19:20:18 2006
@@ -27,6 +27,7 @@
 
 
 class ProcedureThunk(_AppThunk):
+    "used by thread.stacklet"
     def __init__(self, space, w_callable, args, coro):
         _AppThunk.__init__(self, space, coro.costate, w_callable, args)
         self._coro = coro
@@ -108,7 +109,6 @@
                     return
                 assert interp_free(cspace._choice)
                 interp_bind(cspace._choice, self.space.newint(1))
-                assert sched.uler._per_space_live_threads[cspace] == 0, "there are still threads living in this space"
         finally:
             interp_bind(cspace._finished, self.space.w_True)
             sched.uler.remove_thread(self._coro)

Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py	(original)
+++ pypy/dist/pypy/objspace/logic.py	Thu Oct 12 19:20:18 2006
@@ -16,14 +16,15 @@
 
 from pypy.objspace.cclp.thread import app_future, app_stacklet, app_this_thread
 
-from pypy.objspace.cclp.scheduler import Scheduler,  app_sched_info, \
+from pypy.objspace.cclp.scheduler import TopLevelScheduler,  app_sched_info, \
      app_schedule, app_reset_scheduler
 
 from pypy.objspace.cclp.global_state import sched
 
 #-- COMP. SPACE --------------------------------------------
 
-from pypy.objspace.cclp.space import app_newspace, app_choose, W_CSpace, app_tell
+from pypy.objspace.cclp.space import app_newspace, app_choose, W_ThreadGroupScheduler, \
+     W_CSpace, app_tell
 
 #-- VARIABLE ------------------------------------------------
 
@@ -33,7 +34,7 @@
 
 from pypy.objspace.cclp.constraint.variable import app_domain
 
-from pypy.objspace.cclp.types import app_domain_of
+from pypy.objspace.cclp.types import app_domain_of, ClonableCoroutine
 
 all_mms.update(variable_mms)
 
@@ -189,6 +190,7 @@
     space.model.typeorder[W_Future] = [(W_Future, None), (W_Var, None)]
     space.model.typeorder[W_CVar] = [(W_CVar, None), (W_Var, None)]
     space.model.typeorder[W_CSpace] = [(W_CSpace, None), (baseobjspace.Wrappable, None)]
+    space.model.typeorder[W_ThreadGroupScheduler] = [(W_ThreadGroupScheduler, None), (W_CSpace, None)]
     space.model.typeorder[W_FiniteDomain] = [(W_FiniteDomain, None), (W_Root, None)] 
 
 
@@ -296,6 +298,9 @@
     # do the magic
     patch_space_in_place(space, 'logic', proxymaker)
     # instantiate singleton scheduler
-    sched.uler = Scheduler(space)
+    sched.main_thread = ClonableCoroutine.w_getcurrent(space)
+    tg = W_ThreadGroupScheduler(space)
+    tg._init_head(sched.main_thread)
+    sched.uler = TopLevelScheduler(space, tg)
     
     return space



More information about the Pypy-commit mailing list