[pypy-commit] benchmarks default: Remove even more indirection around abstract_threading. This version should create exactly getsegmentlimit() threads.

arigo noreply at buildbot.pypy.org
Fri Mar 28 14:29:19 CET 2014


Author: Armin Rigo <arigo at tunes.org>
Branch: 
Changeset: r242:1b4cc089a6e7
Date: 2014-03-28 14:29 +0100
http://bitbucket.org/pypy/benchmarks/changeset/1b4cc089a6e7/

Log:	Remove even more indirection around abstract_threading. This
	version should create exactly getsegmentlimit() threads.

diff --git a/multithread/common/abstract_threading.py b/multithread/common/abstract_threading.py
--- a/multithread/common/abstract_threading.py
+++ b/multithread/common/abstract_threading.py
@@ -3,66 +3,40 @@
 import thread
 
 try:
-    from __pypy__.thread import atomic
+    from __pypy__.thread import atomic, getsegmentlimit
 except ImportError:
     atomic = Lock()
+    def getsegmentlimit():
+        return 1
+
 
 class Worker(Thread):
     """Thread executing tasks from a given tasks queue"""
     def __init__(self, queue):
         Thread.__init__(self)
         self.daemon = True
-        self.next_task = None
-        self.cond = Condition()
         self.queue = queue
         self.start()
 
     def run(self):
-        # the next line registers the at_commit_cb on interpreter
-        # level for this thread. This should be fixed in the 
-        # interpreter (it causes a conflict in stmgcintf.register_at_commit_cb).
-        # thread.at_commit(lambda : 0, ())
-
         while True:
-            with self.cond:
-                while self.next_task is None:
-                    self.cond.wait()
-
-                func, args, kargs = self.next_task
-                self.next_task = None
-
-                try:
-                    func(*args, **kargs)
-                except Exception as e:
-                    print e
-
-            # first time put in queue by threadpool on creation
+            func, args, kwds = self.queue.get()
             try:
-                self.queue.put_nowait(self)
-            except Full:
-                # thread limit reached, I'll show myself out..
-                return
+                func(*args, **kwds)
+            except Exception as e:
+                print e
 
 
 class ThreadPool(object):
-    def __init__(self, thread_queue_size=12):
-        self.threads = Queue(thread_queue_size)
+    def __init__(self):
+        self.input_queue = Queue()
+        for n in range(getsegmentlimit()):
+            Worker(self.input_queue)
 
-    def add_task(self, func, *args, **kargs):
-        try:
-            worker = self.threads.get_nowait()
-        except Empty:
-            worker = Worker(self.threads)
+    def add_task(self, func, *args, **kwds):
+        self.input_queue.put((func, args, kwds))
 
-        with worker.cond:
-            worker.next_task = (func, args, kargs)
-            worker.cond.notify_all()
-
-
-
-
-import multiprocessing
-_thread_pool = ThreadPool(1.5 * multiprocessing.cpu_count())
+_thread_pool = ThreadPool()
 
 
 
diff --git a/multithread/raytrace/raytrace.py b/multithread/raytrace/raytrace.py
--- a/multithread/raytrace/raytrace.py
+++ b/multithread/raytrace/raytrace.py
@@ -125,7 +125,6 @@
 
 
 
-tasks = 0
 def task(x, h, cameraPos, objs, lightSource):
     with atomic:
         for y in range(h):
@@ -133,20 +132,8 @@
                       (Vector(x/50.0-5,y/50.0-5,0)-cameraPos).normal())
             trace(ray, objs, lightSource, 10)
 
-    global tasks
-    with atomic:
-        tasks -= 1
-
 futures = []
 def future_dispatcher(ths, *args):
-    global tasks
-    
-    while tasks >= ths:
-        time.sleep(0)
-
-    with atomic:
-        tasks += 1
-    
     futures.append(Future(task, *args))
 
 
@@ -167,13 +154,11 @@
     cameraPos = Vector(0,0,20)
     
     for x in range(w):
-        print x
         future_dispatcher(ths, x, h, cameraPos, objs, lightSource)
 
     for f in futures:
         f()
     del futures[:]
-    assert tasks == 0
 
 
 if __name__ == '__main__':


More information about the pypy-commit mailing list