[pypy-commit] benchmarks default: add some multithreaded benchmarks and a script to run them (optional)
Remi Meier
noreply at buildbot.pypy.org
Tue Mar 25 09:33:08 CET 2014
Author: Remi Meier
Branch:
Changeset: r240:06ac9ee33205
Date: 2014-03-25 09:33 +0100
http://bitbucket.org/pypy/benchmarks/changeset/06ac9ee33205/
Log: add some multithreaded benchmarks and a script to run them
(optional)
diff --git a/multithread/bench.py b/multithread/bench.py
new file mode 100644
--- /dev/null
+++ b/multithread/bench.py
@@ -0,0 +1,116 @@
+#!/usr/bin/python
+
+import time
+import math
+import imp, os, sys
+import json
+import contextlib
+
+def import_file(filepath):
+ mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1])
+ return imp.load_source(mod_name, filepath)
+
+
+class DummyFile(object):
+ def write(self, x): pass
+
+ at contextlib.contextmanager
+def nostdout():
+ save_stdout = sys.stdout
+ sys.stdout = DummyFile()
+ yield
+ sys.stdout = save_stdout
+
+
+def avg(xs):
+ return sum(xs) / len(xs)
+
+def std_dev(xs):
+ N = len(xs)
+ mu = avg(xs)
+ var = sum([(x - mu)**2 for x in xs]) / N
+ return math.sqrt(var)
+
+def get_error(times):
+ ts = sorted(times)[:args.k]
+ best = float(ts[0])
+
+ return max((t / best) - 1.0 for t in ts)
+
+def within_error(args, times):
+ return get_error(times) < args.error
+
+def main(args):
+ basedir = os.path.abspath(os.path.dirname(__file__))
+ sys.path.insert(0, basedir+'/')
+ import common
+ print __file__
+ folder = os.path.dirname(args.file)
+ os.chdir(folder)
+ sys.path.insert(0, os.path.abspath('.'))
+ test = import_file(os.path.basename(args.file))
+
+ times = []
+ k = 1
+ try:
+ while True:
+ time.sleep(0.2)
+ if not args.q:
+ print "Run {}/{}:".format(k, args.k)
+
+ test_time = time.time()
+ if args.p:
+ test.run(*args.more)
+ else:
+ with nostdout():
+ test.run(*args.more)
+ times.append(time.time() - test_time)
+
+ if not args.q:
+ print "took {} s".format(times[-1])
+
+ if k >= args.k:
+ if within_error(args, times):
+ break
+ elif not args.q:
+ print "error was not within", args.error
+
+ if k > 2 * args.k:
+ if not args.q:
+ print "max number of iterations reached", \
+ "error still too great, finish anyway"
+ break
+ k += 1
+ finally:
+ if not args.q:
+ print "times:", times
+
+ if times:
+ times = sorted(times)[:args.k]
+ result = {'best':min(times),
+ 'error':get_error(times),
+ 'std_dev(k)':std_dev(times)}
+ print json.dumps(result)
+
+
+
+if __name__ == '__main__':
+ import argparse
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-k', default=3, help='K-best K', type=int)
+ parser.add_argument('-e', '--error', default=0.05, type=float,
+ help='relative allowed error [0.05]')
+ parser.add_argument('-q', action='store_const',
+ const=True, default=False,
+ help='mute except for best run')
+ parser.add_argument('-p', action='store_const',
+ const=True, default=False,
+ help='print to stdout what the benchmark prints')
+ parser.add_argument('file', help='file to run')
+ parser.add_argument('more', nargs="*", help='file.run() arguments')
+
+ args = parser.parse_args()
+ if not args.q:
+ print args
+ main(args)
diff --git a/multithread/common/__init__.py b/multithread/common/__init__.py
new file mode 100644
diff --git a/multithread/common/abstract_threading.py b/multithread/common/abstract_threading.py
new file mode 100644
--- /dev/null
+++ b/multithread/common/abstract_threading.py
@@ -0,0 +1,119 @@
+from Queue import Queue, Empty, Full
+from threading import Thread, Condition, Lock
+import thread
+
+try:
+ from __pypy__.thread import atomic
+except ImportError:
+ atomic = Lock()
+
+class Worker(Thread):
+ """Thread executing tasks from a given tasks queue"""
+ def __init__(self, queue):
+ Thread.__init__(self)
+ self.daemon = True
+ self.next_task = None
+ self.cond = Condition()
+ self.queue = queue
+ self.start()
+
+ def run(self):
+ # the next line registers the at_commit_cb on interpreter
+ # level for this thread. This should be fixed in the
+ # interpreter (it causes a conflict in stmgcintf.register_at_commit_cb).
+ # thread.at_commit(lambda : 0, ())
+
+ while True:
+ with self.cond:
+ while self.next_task is None:
+ self.cond.wait()
+
+ func, args, kargs = self.next_task
+ self.next_task = None
+
+ try:
+ func(*args, **kargs)
+ except Exception as e:
+ print e
+
+ # first time put in queue by threadpool on creation
+ try:
+ self.queue.put_nowait(self)
+ except Full:
+ # thread limit reached, I'll show myself out..
+ return
+
+
+class ThreadPool(object):
+ def __init__(self, thread_queue_size=12):
+ self.threads = Queue(thread_queue_size)
+
+ def add_task(self, func, *args, **kargs):
+ try:
+ worker = self.threads.get_nowait()
+ except Empty:
+ worker = Worker(self.threads)
+
+ with worker.cond:
+ worker.next_task = (func, args, kargs)
+ worker.cond.notify_all()
+
+
+
+
+import multiprocessing
+_thread_pool = ThreadPool(3 * multiprocessing.cpu_count())
+
+
+
+
+class Future(object):
+ def __init__(self, func, *args, **kwargs):
+ self._done = False
+ self._result = None
+ self._exception = None
+ self._cond = Condition()
+
+ assert hasattr(func, "__call__")
+
+ _thread_pool.add_task(self._task, func, *args, **kwargs)
+
+
+ def _task(self, func, *args, **kwargs):
+ with self._cond:
+ try:
+ self._result = func(*args, **kwargs)
+ except Exception as e:
+ self._exception = e
+ finally:
+ self._done = True
+ # several points/threads in the program
+ # may wait for the result (notify_all):
+ self._cond.notify_all()
+
+
+ def __call__(self):
+ with self._cond:
+ while not self._done:
+ self._cond.wait()
+
+ if self._exception:
+ raise self._exception
+
+ return self._result
+
+
+
+class AtomicFuture(Future):
+ def _task(self, func, *args, **kwargs):
+ with self._cond:
+ try:
+ with atomic:
+ self._result = func(*args, **kwargs)
+ except Exception as e:
+ self._exception = e
+ finally:
+ self._done = True
+ # several points/threads in the program
+ # may wait for the result (notify_all):
+ self._cond.notify_all()
diff --git a/multithread/mandelbrot/mandelbrot.py b/multithread/mandelbrot/mandelbrot.py
new file mode 100644
--- /dev/null
+++ b/multithread/mandelbrot/mandelbrot.py
@@ -0,0 +1,80 @@
+from common.abstract_threading import Future, atomic
+import Image, sys
+
+
+def calculate(a, b, im_size, max_iter=255):
+ print "a:%s, b:%s, im_size:%s" % (a, b, im_size)
+ ar, ai = a
+ br, bi = b
+ width, height = im_size
+ imag_step = (bi - ai) / (height - 1)
+ real_step = (br - ar) / (width - 1)
+ print "real/width:%s, imag/height:%s" % (real_step, imag_step)
+
+ with atomic:
+ result = [[0] * width for y in xrange(height)]
+ for y in xrange(height):
+ zi = ai + y * imag_step
+ for x in xrange(width):
+ zr = ar + x * real_step
+ z = complex(zr, zi)
+ c = z
+ for i in xrange(max_iter):
+ if abs(z) > 2.0:
+ break
+ z = z * z + c
+ result[y][x] = i
+
+ return result
+
+def save_img(image, file_name='out.png'):
+ im = Image.new("RGB", (len(image[0]), len(image)))
+ out = im.load()
+
+ for y in xrange(len(image)):
+ for x in xrange(len(image[0])):
+ c = image[y][x]
+ out[x,y] = c, c, c
+ im.save(file_name, 'PNG')
+
+def save_to_file(image, file_name='out.txt'):
+ with atomic:
+ s = "\n".join(map(str, image))
+ with open(file_name, 'w') as f:
+ f.write(s)
+
+
+def merge_imgs(imgs):
+ res = []
+ for img in imgs:
+ for y in img:
+ res.append(y)
+ return res
+
+
+def run(threads=2):
+ threads = int(threads)
+ ar, ai = -2.0, -1.5
+ br, bi = 1.0, 1.5
+ width, height = 4096, 4096
+
+ step = (bi - ai) / threads
+ res = []
+ ai = -1.5
+ bi = ai + step
+ for i in xrange(threads):
+ res.append(Future(calculate,
+ a=(ar, ai + i * step),
+ b=(br, bi + i * step),
+ im_size=(width, int(height / threads))
+ ))
+
+ res = [f() for f in res]
+ return merge_imgs(res)
+
+
+
+if __name__ == '__main__':
+ image = run(int(sys.argv[1]))
+ save_to_file(image)
+ # save_img(image) don't run on STM, allocates 4000GB of memory
diff --git a/multithread/raytrace/raytrace.py b/multithread/raytrace/raytrace.py
new file mode 100644
--- /dev/null
+++ b/multithread/raytrace/raytrace.py
@@ -0,0 +1,190 @@
+# From http://www.reddit.com/r/tinycode/comments/169ri9/ray_tracer_in_140_sloc_of_python_with_picture/
+# Date: 14.03.2013
+
+from math import sqrt, pow, pi
+from common.abstract_threading import atomic, Future
+import time
+
+AMBIENT = 0.1
+
+
+
+class Vector(object):
+ def __init__(self,x,y,z):
+ self.x = x
+ self.y = y
+ self.z = z
+
+ def dot(self, b):
+ return self.x*b.x + self.y*b.y + self.z*b.z
+
+ def cross(self, b):
+ return (self.y*b.z-self.z*b.y, self.z*b.x-self.x*b.z, self.x*b.y-self.y*b.x)
+
+ def magnitude(self):
+ return sqrt(self.x*self.x+self.y*self.y+self.z*self.z)
+
+ def normal(self):
+ mag = self.magnitude()
+ return Vector(self.x/mag,self.y/mag,self.z/mag)
+
+ def __add__(self, b):
+ return Vector(self.x + b.x, self.y+b.y, self.z+b.z)
+
+ def __sub__(self, b):
+ return Vector(self.x-b.x, self.y-b.y, self.z-b.z)
+
+ def __mul__(self, b):
+ #assert type(b) == float or type(b) == int
+ return Vector(self.x*b, self.y*b, self.z*b)
+
+
+class Sphere(object):
+ def __init__(self, center, radius, color):
+ self.c = center
+ self.r = radius
+ self.col = color
+
+ def intersection(self, l):
+ q = l.d.dot(l.o - self.c)**2 - (l.o - self.c).dot(l.o - self.c) + self.r**2
+ if q < 0:
+ return Intersection( Vector(0,0,0), -1, Vector(0,0,0), self)
+ else:
+ d = -l.d.dot(l.o - self.c)
+ d1 = d - sqrt(q)
+ d2 = d + sqrt(q)
+ if 0 < d1 and ( d1 < d2 or d2 < 0):
+ return Intersection(l.o+l.d*d1, d1, self.normal(l.o+l.d*d1), self)
+ elif 0 < d2 and ( d2 < d1 or d1 < 0):
+ return Intersection(l.o+l.d*d2, d2, self.normal(l.o+l.d*d2), self)
+ else:
+ return Intersection( Vector(0,0,0), -1, Vector(0,0,0), self)
+
+ def normal(self, b):
+ return (b - self.c).normal()
+
+
+class Plane(object):
+ def __init__(self, point, normal, color):
+ self.n = normal
+ self.p = point
+ self.col = color
+
+ def intersection(self, l):
+ d = l.d.dot(self.n)
+ if d == 0:
+ return Intersection( Vector(0,0,0), -1, Vector(0,0,0), self)
+ else:
+ d = (self.p - l.o).dot(self.n) / d
+ return Intersection(l.o+l.d*d, d, self.n, self)
+
+
+class Ray(object):
+ def __init__(self, origin, direction):
+ self.o = origin
+ self.d = direction
+
+
+class Intersection(object):
+ def __init__(self, point, distance, normal, obj):
+ self.p = point
+ self.d = distance
+ self.n = normal
+ self.obj = obj
+
+
+def testRay(ray, objects, ignore=None):
+ intersect = Intersection( Vector(0,0,0), -1, Vector(0,0,0), None)
+
+ for obj in objects:
+ if obj is not ignore:
+ currentIntersect = obj.intersection(ray)
+ if currentIntersect.d > 0 and intersect.d < 0:
+ intersect = currentIntersect
+ elif 0 < currentIntersect.d < intersect.d:
+ intersect = currentIntersect
+ return intersect
+
+
+def trace(ray, objects, light, maxRecur):
+ if maxRecur < 0:
+ return (0,0,0)
+ intersect = testRay(ray, objects)
+ if intersect.d == -1:
+ col = Vector(AMBIENT,AMBIENT,AMBIENT)
+ elif intersect.n.dot(light - intersect.p) < 0:
+ col = intersect.obj.col * AMBIENT
+ else:
+ lightRay = Ray(intersect.p, (light-intersect.p).normal())
+ if testRay(lightRay, objects, intersect.obj).d == -1:
+ lightIntensity = 1000.0/(4*pi*(light-intersect.p).magnitude()**2)
+ col = intersect.obj.col * max(intersect.n.normal().dot((light - intersect.p).normal()*lightIntensity), AMBIENT)
+ else:
+ col = intersect.obj.col * AMBIENT
+ return col
+
+
+
+tasks = 0
+def task(x, h, cameraPos, objs, lightSource):
+ # force a transaction break here (STM not yet smart enough
+ # to figure out that it should break here)
+ time.sleep(0)
+
+ with atomic:
+ for y in range(h):
+ ray = Ray(cameraPos,
+ (Vector(x/50.0-5,y/50.0-5,0)-cameraPos).normal())
+ trace(ray, objs, lightSource, 10)
+
+ # force a transaction break. updating a global var should
+ # be done in a separate transaction:
+ time.sleep(0)
+
+ global tasks
+ with atomic:
+ tasks -= 1
+ time.sleep(0)
+
+futures = []
+def future_dispatcher(ths, *args):
+ global tasks
+
+ while tasks >= ths:
+ time.sleep(0)
+
+ with atomic:
+ tasks += 1
+
+ futures.append(Future(task, *args))
+ time.sleep(0)
+
+
+
+
+def run(ths=8, w=1024, h=1024):
+ ths = int(ths)
+ w = int(w)
+ h = int(h)
+
+ objs = []
+ objs.append(Sphere( Vector(-2,0,-10), 2, Vector(0,255,0)))
+ objs.append(Sphere( Vector(2,0,-10), 3.5, Vector(255,0,0)))
+ objs.append(Sphere( Vector(0,-4,-10), 3, Vector(0,0,255)))
+ objs.append(Plane( Vector(0,0,-12), Vector(0,0,1), Vector(255,255,255)))
+ lightSource = Vector(0,10,0)
+
+ cameraPos = Vector(0,0,20)
+
+ for x in range(w):
+ print x
+ future_dispatcher(ths, x, h, cameraPos, objs, lightSource)
+
+ for f in futures:
+ f()
+ del futures[:]
+ assert tasks == 0
+
+
+if __name__ == '__main__':
+ run()
More information about the pypy-commit
mailing list