[pypy-commit] benchmarks default: Remove even more indirection around abstract_threading. This version should create exactly getsegmentlimit() threads.
arigo
noreply at buildbot.pypy.org
Fri Mar 28 14:29:19 CET 2014
Author: Armin Rigo <arigo at tunes.org>
Branch:
Changeset: r242:1b4cc089a6e7
Date: 2014-03-28 14:29 +0100
http://bitbucket.org/pypy/benchmarks/changeset/1b4cc089a6e7/
Log: Remove even more indirection around abstract_threading. This
version should create exactly getsegmentlimit() threads.
diff --git a/multithread/common/abstract_threading.py b/multithread/common/abstract_threading.py
--- a/multithread/common/abstract_threading.py
+++ b/multithread/common/abstract_threading.py
@@ -3,66 +3,40 @@
import thread
try:
- from __pypy__.thread import atomic
+ from __pypy__.thread import atomic, getsegmentlimit
except ImportError:
atomic = Lock()
+ def getsegmentlimit():
+ return 1
+
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, queue):
Thread.__init__(self)
self.daemon = True
- self.next_task = None
- self.cond = Condition()
self.queue = queue
self.start()
def run(self):
- # the next line registers the at_commit_cb on interpreter
- # level for this thread. This should be fixed in the
- # interpreter (it causes a conflict in stmgcintf.register_at_commit_cb).
- # thread.at_commit(lambda : 0, ())
-
while True:
- with self.cond:
- while self.next_task is None:
- self.cond.wait()
-
- func, args, kargs = self.next_task
- self.next_task = None
-
- try:
- func(*args, **kargs)
- except Exception as e:
- print e
-
- # first time put in queue by threadpool on creation
+ func, args, kwds = self.queue.get()
try:
- self.queue.put_nowait(self)
- except Full:
- # thread limit reached, I'll show myself out..
- return
+ func(*args, **kwds)
+ except Exception as e:
+ print e
class ThreadPool(object):
- def __init__(self, thread_queue_size=12):
- self.threads = Queue(thread_queue_size)
+ def __init__(self):
+ self.input_queue = Queue()
+ for n in range(getsegmentlimit()):
+ Worker(self.input_queue)
- def add_task(self, func, *args, **kargs):
- try:
- worker = self.threads.get_nowait()
- except Empty:
- worker = Worker(self.threads)
+ def add_task(self, func, *args, **kwds):
+ self.input_queue.put((func, args, kwds))
- with worker.cond:
- worker.next_task = (func, args, kargs)
- worker.cond.notify_all()
-
-
-
-
-import multiprocessing
-_thread_pool = ThreadPool(1.5 * multiprocessing.cpu_count())
+_thread_pool = ThreadPool()
diff --git a/multithread/raytrace/raytrace.py b/multithread/raytrace/raytrace.py
--- a/multithread/raytrace/raytrace.py
+++ b/multithread/raytrace/raytrace.py
@@ -125,7 +125,6 @@
-tasks = 0
def task(x, h, cameraPos, objs, lightSource):
with atomic:
for y in range(h):
@@ -133,20 +132,8 @@
(Vector(x/50.0-5,y/50.0-5,0)-cameraPos).normal())
trace(ray, objs, lightSource, 10)
- global tasks
- with atomic:
- tasks -= 1
-
futures = []
def future_dispatcher(ths, *args):
- global tasks
-
- while tasks >= ths:
- time.sleep(0)
-
- with atomic:
- tasks += 1
-
futures.append(Future(task, *args))
@@ -167,13 +154,11 @@
cameraPos = Vector(0,0,20)
for x in range(w):
- print x
future_dispatcher(ths, x, h, cameraPos, objs, lightSource)
for f in futures:
f()
del futures[:]
- assert tasks == 0
if __name__ == '__main__':
More information about the pypy-commit mailing list