[pypy-commit] benchmarks default: add some multithreaded benchmarks and a script to run them (optional)

Remi Meier noreply at buildbot.pypy.org
Tue Mar 25 09:33:08 CET 2014


Author: Remi Meier
Branch: 
Changeset: r240:06ac9ee33205
Date: 2014-03-25 09:33 +0100
http://bitbucket.org/pypy/benchmarks/changeset/06ac9ee33205/

Log:	add some multithreaded benchmarks and a script to run them
	(optional)

diff --git a/multithread/bench.py b/multithread/bench.py
new file mode 100644
--- /dev/null
+++ b/multithread/bench.py
@@ -0,0 +1,116 @@
+#!/usr/bin/python
+
+import time
+import math
+import imp, os, sys
+import json
+import contextlib
+
+def import_file(filepath):
+    mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1])
+    return imp.load_source(mod_name, filepath)
+
+
+class DummyFile(object):
+    def write(self, x): pass
+
+ at contextlib.contextmanager
+def nostdout():
+    save_stdout = sys.stdout
+    sys.stdout = DummyFile()
+    yield
+    sys.stdout = save_stdout
+
+
+def avg(xs):
+    return sum(xs) / len(xs)
+
+def std_dev(xs):
+    N = len(xs)
+    mu = avg(xs)
+    var = sum([(x - mu)**2 for x in xs]) / N
+    return math.sqrt(var)
+
+def get_error(times):
+    ts = sorted(times)[:args.k]
+    best = float(ts[0])
+    
+    return max((t / best) - 1.0 for t in ts)
+
+def within_error(args, times):
+    return get_error(times) < args.error
+
+def main(args):
+    basedir = os.path.abspath(os.path.dirname(__file__))
+    sys.path.insert(0, basedir+'/')
+    import common
+    print __file__
+    folder = os.path.dirname(args.file)
+    os.chdir(folder)
+    sys.path.insert(0, os.path.abspath('.'))
+    test = import_file(os.path.basename(args.file))
+
+    times = []
+    k = 1
+    try:
+        while True:
+            time.sleep(0.2)
+            if not args.q:
+                print "Run {}/{}:".format(k, args.k)
+
+            test_time = time.time()
+            if args.p:
+                test.run(*args.more)
+            else:
+                with nostdout():
+                    test.run(*args.more)
+            times.append(time.time() - test_time)
+
+            if not args.q:
+                print "took {} s".format(times[-1])
+
+            if k >= args.k:
+                if within_error(args, times):
+                    break
+                elif not args.q:
+                    print "error was not within", args.error
+
+                if k > 2 * args.k:
+                    if not args.q:
+                        print "max number of iterations reached", \
+                            "error still too great, finish anyway"
+                    break
+            k += 1
+    finally:
+        if not args.q:
+            print "times:", times
+
+        if times:
+            times = sorted(times)[:args.k]
+            result = {'best':min(times),
+                      'error':get_error(times),
+                      'std_dev(k)':std_dev(times)}
+            print json.dumps(result)
+
+
+
+if __name__ == '__main__':
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-k', default=3, help='K-best K', type=int)
+    parser.add_argument('-e', '--error', default=0.05, type=float,
+                        help='relative allowed error [0.05]')
+    parser.add_argument('-q', action='store_const',
+                        const=True, default=False,
+                        help='mute except for best run')
+    parser.add_argument('-p', action='store_const',
+                        const=True, default=False,
+                        help='print to stdout what the benchmark prints')
+    parser.add_argument('file', help='file to run')
+    parser.add_argument('more', nargs="*", help='file.run() arguments')
+
+    args = parser.parse_args()
+    if not args.q:
+        print args
+    main(args)
diff --git a/multithread/common/__init__.py b/multithread/common/__init__.py
new file mode 100644
diff --git a/multithread/common/abstract_threading.py b/multithread/common/abstract_threading.py
new file mode 100644
--- /dev/null
+++ b/multithread/common/abstract_threading.py
@@ -0,0 +1,119 @@
+from Queue import Queue, Empty, Full
+from threading import Thread, Condition, Lock
+import thread
+
+try:
+    from __pypy__.thread import atomic
+except ImportError:
+    atomic = Lock()
+
+class Worker(Thread):
+    """Daemon thread that runs one task at a time.
+
+    A task is handed over by storing ``(func, args, kargs)`` in ``next_task``
+    while holding ``cond`` and notifying it.  After each task the worker puts
+    itself into ``queue`` so it can be picked again; if the queue is full the
+    thread ends.
+    """
+    def __init__(self, queue):
+        Thread.__init__(self)
+        self.daemon = True
+        # pending (func, args, kargs) tuple, or None while idle
+        self.next_task = None
+        # guards next_task and wakes the thread when a task arrives
+        self.cond = Condition()
+        # queue of idle workers this thread returns itself to
+        self.queue = queue
+        # the thread starts running as soon as it is constructed
+        self.start()
+
+    def run(self):
+        """Wait for tasks forever, running each one and then re-queueing self."""
+        # the next line registers the at_commit_cb on interpreter
+        # level for this thread. This should be fixed in the
+        # interpreter (it causes a conflict in stmgcintf.register_at_commit_cb).
+        # thread.at_commit(lambda : 0, ())
+
+        while True:
+            with self.cond:
+                # loop guards against wake-ups without a task
+                while self.next_task is None:
+                    self.cond.wait()
+
+                func, args, kargs = self.next_task
+                self.next_task = None
+
+                # NOTE: the task runs while self.cond is still held.
+                try:
+                    func(*args, **kargs)
+                except Exception as e:
+                    # best effort: report and keep the worker alive
+                    print e
+
+            # task finished: offer this worker for reuse via the idle queue
+            try:
+                self.queue.put_nowait(self)
+            except Full:
+                # thread limit reached, I'll show myself out..
+                return
+
+
+class ThreadPool(object):
+    """Hands tasks to idle Worker threads, creating new ones on demand.
+
+    ``thread_queue_size`` bounds the queue of idle workers, not the number of
+    workers running at the same time.
+    """
+    def __init__(self, thread_queue_size=12):
+        # bounded queue holding the workers that are currently idle
+        self.threads = Queue(thread_queue_size)
+
+    def add_task(self, func, *args, **kargs):
+        """Run ``func(*args, **kargs)`` on an idle worker or on a new one."""
+        try:
+            worker = self.threads.get_nowait()
+        except Empty:
+            # nobody idle: start a fresh worker instead of waiting
+            worker = Worker(self.threads)
+
+        # hand the task over and wake the worker waiting on its condition
+        with worker.cond:
+            worker.next_task = (func, args, kargs)
+            worker.cond.notify_all()
+
+
+
+
+import multiprocessing
+# Module-wide pool used by Future; keeps up to three idle workers per CPU.
+_thread_pool = ThreadPool(3 * multiprocessing.cpu_count())
+
+
+
+
+class Future(object):
+    def __init__(self, func, *args, **kwargs):
+        self._done = False
+        self._result = None
+        self._exception = None
+        self._cond = Condition()
+
+        assert hasattr(func, "__call__")
+
+        _thread_pool.add_task(self._task, func, *args, **kwargs)
+
+
+    def _task(self, func, *args, **kwargs):
+        with self._cond:
+            try:
+                self._result = func(*args, **kwargs)
+            except Exception as e:
+                self._exception = e
+            finally:
+                self._done = True
+                # several points/threads in the program
+                # may wait for the result (notify_all):
+                self._cond.notify_all()
+
+
+    def __call__(self):
+        with self._cond:
+            while not self._done:
+                self._cond.wait()
+
+        if self._exception:
+            raise self._exception
+
+        return self._result
+
+
+
+class AtomicFuture(Future):
+    """Future whose function call runs inside a ``with atomic`` block."""
+    def _task(self, func, *args, **kwargs):
+        """Same as Future._task, but *func* is called while ``atomic`` is held."""
+        with self._cond:
+            try:
+                # func may enter ``atomic`` itself, so the object bound to
+                # ``atomic`` must tolerate nested use
+                with atomic:
+                    self._result = func(*args, **kwargs)
+            except Exception as e:
+                self._exception = e
+            finally:
+                self._done = True
+                # several points/threads in the program
+                # may wait for the result (notify_all):
+                self._cond.notify_all()
diff --git a/multithread/mandelbrot/mandelbrot.py b/multithread/mandelbrot/mandelbrot.py
new file mode 100644
--- /dev/null
+++ b/multithread/mandelbrot/mandelbrot.py
@@ -0,0 +1,80 @@
+from common.abstract_threading import Future, atomic
+import Image, sys
+
+
+def calculate(a, b, im_size, max_iter=255):
+    print "a:%s, b:%s, im_size:%s" % (a, b, im_size)
+    ar, ai = a
+    br, bi = b
+    width, height = im_size
+    imag_step = (bi - ai) / (height - 1)
+    real_step = (br - ar) / (width - 1)
+    print "real/width:%s, imag/height:%s" % (real_step, imag_step)
+
+    with atomic:
+        result = [[0] * width for y in xrange(height)]
+    for y in xrange(height):
+        zi = ai + y * imag_step
+        for x in xrange(width):
+            zr = ar + x * real_step
+            z = complex(zr, zi)
+            c = z
+            for i in xrange(max_iter):
+                if abs(z) > 2.0:
+                    break
+                z = z * z + c
+            result[y][x] = i
+
+    return result
+
+def save_img(image, file_name='out.png'):
+    im = Image.new("RGB", (len(image[0]), len(image)))
+    out = im.load()
+
+    for y in xrange(len(image)):
+        for x in xrange(len(image[0])):
+            c = image[y][x]
+            out[x,y] = c, c, c
+    im.save(file_name, 'PNG')
+
+def save_to_file(image, file_name='out.txt'):
+    with atomic:
+        s = "\n".join(map(str, image))
+    with open(file_name, 'w') as f:
+        f.write(s)
+
+
+def merge_imgs(imgs):
+    res = []
+    for img in imgs:
+        for y in img:
+            res.append(y)
+    return res
+
+
+def run(threads=2):
+    threads = int(threads)
+    ar, ai = -2.0, -1.5
+    br, bi = 1.0, 1.5
+    width, height = 4096, 4096
+
+    step = (bi - ai) / threads
+    res = []
+    ai = -1.5
+    bi = ai + step
+    for i in xrange(threads):
+        res.append(Future(calculate,
+                          a=(ar, ai + i * step),
+                          b=(br, bi + i * step),
+                          im_size=(width, int(height / threads))
+            ))
+
+    res = [f() for f in res]
+    return merge_imgs(res)
+
+
+
+# Script entry point: the number of threads is the first command-line argument.
+if __name__ == '__main__':
+    image = run(int(sys.argv[1]))
+    save_to_file(image)
+    # save_img(image) is left disabled: don't run it on STM, it allocates
+    # 4000GB of memory
diff --git a/multithread/raytrace/raytrace.py b/multithread/raytrace/raytrace.py
new file mode 100644
--- /dev/null
+++ b/multithread/raytrace/raytrace.py
@@ -0,0 +1,190 @@
+# From http://www.reddit.com/r/tinycode/comments/169ri9/ray_tracer_in_140_sloc_of_python_with_picture/
+# Date: 14.03.2013
+
+from math import sqrt, pow, pi
+from common.abstract_threading import atomic, Future
+import time
+
+# Ambient light level: the colour of rays that hit nothing, the factor applied
+# to unlit surfaces, and the lower bound of the lighting factor in trace().
+AMBIENT = 0.1
+
+
+
+class Vector(object):
+    def __init__(self,x,y,z):
+        self.x = x
+        self.y = y
+        self.z = z
+
+    def dot(self, b):
+        return self.x*b.x + self.y*b.y + self.z*b.z
+
+    def cross(self, b):
+        return (self.y*b.z-self.z*b.y, self.z*b.x-self.x*b.z, self.x*b.y-self.y*b.x)
+
+    def magnitude(self):
+        return sqrt(self.x*self.x+self.y*self.y+self.z*self.z)
+
+    def normal(self):
+        mag = self.magnitude()
+        return Vector(self.x/mag,self.y/mag,self.z/mag)
+
+    def __add__(self, b):
+        return Vector(self.x + b.x, self.y+b.y, self.z+b.z)
+
+    def __sub__(self, b):
+        return Vector(self.x-b.x, self.y-b.y, self.z-b.z)
+
+    def __mul__(self, b):
+        #assert type(b) == float or type(b) == int
+        return Vector(self.x*b, self.y*b, self.z*b)
+
+
+class Sphere(object):
+    """Sphere described by its centre, radius and colour."""
+    def __init__(self, center, radius, color):
+        self.c = center
+        self.r = radius
+        self.col = color
+
+    def intersection(self, l):
+        """Return the Intersection of ray *l* with this sphere.
+
+        A miss is reported as an Intersection whose distance is -1.
+        """
+        # q is the term under the square root of the two candidate distances
+        # (presumably assumes l.d has unit length -- TODO confirm)
+        q = l.d.dot(l.o - self.c)**2 - (l.o - self.c).dot(l.o - self.c) + self.r**2
+        if q < 0:
+            # no real solution: the ray misses the sphere
+            return Intersection( Vector(0,0,0), -1, Vector(0,0,0), self)
+        else:
+            d = -l.d.dot(l.o - self.c)
+            d1 = d - sqrt(q)
+            d2 = d + sqrt(q)
+            # choose the nearer of the two distances that is positive
+            if 0 < d1 and ( d1 < d2 or d2 < 0):
+                return Intersection(l.o+l.d*d1, d1, self.normal(l.o+l.d*d1), self)
+            elif 0 < d2 and ( d2 < d1 or d1 < 0):
+                return Intersection(l.o+l.d*d2, d2, self.normal(l.o+l.d*d2), self)
+            else:
+                # neither distance qualifies: report a miss
+                return Intersection( Vector(0,0,0), -1, Vector(0,0,0), self)
+
+    def normal(self, b):
+        """Return the normalised vector from the centre to point *b*."""
+        return (b - self.c).normal()
+
+
+class Plane(object):
+    """Plane described by a point on it, its normal and its colour."""
+    def __init__(self, point, normal, color):
+        self.n = normal
+        self.p = point
+        self.col = color
+
+    def intersection(self, l):
+        """Return the Intersection of ray *l* with this plane.
+
+        When the ray direction is perpendicular to the plane normal the result
+        has distance -1.  Otherwise the computed distance is returned as is
+        and may be negative; testRay() only accepts positive distances.
+        """
+        d = l.d.dot(self.n)
+        if d == 0:
+            # direction has no component along the normal: no intersection
+            return Intersection( Vector(0,0,0), -1, Vector(0,0,0), self)
+        else:
+            d = (self.p - l.o).dot(self.n) / d
+            return Intersection(l.o+l.d*d, d, self.n, self)
+
+
+class Ray(object):
+    def __init__(self, origin, direction):
+        self.o = origin
+        self.d = direction
+
+
+class Intersection(object):
+    def __init__(self, point, distance, normal, obj):
+        self.p = point
+        self.d = distance
+        self.n = normal
+        self.obj = obj
+
+
+def testRay(ray, objects, ignore=None):
+    """Return the nearest Intersection of *ray* with *objects*.
+
+    The object that is identical to *ignore* is skipped.  When nothing is hit
+    the returned Intersection has distance -1 and ``obj`` set to None.
+    """
+    # start from a "miss" placeholder (distance -1)
+    intersect = Intersection( Vector(0,0,0), -1, Vector(0,0,0), None)
+
+    for obj in objects:
+        if obj is not ignore:
+            currentIntersect = obj.intersection(ray)
+            # the first positive hit replaces the placeholder
+            if currentIntersect.d > 0 and intersect.d < 0:
+                intersect = currentIntersect
+            # afterwards only a strictly nearer positive hit wins
+            elif 0 < currentIntersect.d < intersect.d:
+                intersect = currentIntersect
+    return intersect
+
+
+def trace(ray, objects, light, maxRecur):
+    if maxRecur < 0:
+        return (0,0,0)
+    intersect = testRay(ray, objects)
+    if intersect.d == -1:
+        col = Vector(AMBIENT,AMBIENT,AMBIENT)
+    elif intersect.n.dot(light - intersect.p) < 0:
+        col = intersect.obj.col * AMBIENT
+    else:
+        lightRay = Ray(intersect.p, (light-intersect.p).normal())
+        if testRay(lightRay, objects, intersect.obj).d == -1:
+            lightIntensity = 1000.0/(4*pi*(light-intersect.p).magnitude()**2)
+            col = intersect.obj.col * max(intersect.n.normal().dot((light - intersect.p).normal()*lightIntensity), AMBIENT)
+        else:
+            col = intersect.obj.col * AMBIENT
+    return col
+
+
+
+tasks = 0
+def task(x, h, cameraPos, objs, lightSource):
+    # force a transaction break here (STM not yet smart enough
+    # to figure out that it should break here)
+    time.sleep(0)
+    
+    with atomic:
+        for y in range(h):
+            ray = Ray(cameraPos,
+                      (Vector(x/50.0-5,y/50.0-5,0)-cameraPos).normal())
+            trace(ray, objs, lightSource, 10)
+
+    # force a transaction break. updating a global var should
+    # be done in a separate transaction:
+    time.sleep(0)
+    
+    global tasks
+    with atomic:
+        tasks -= 1
+    time.sleep(0)
+
+# every Future created by future_dispatcher(); run() waits on and clears it
+futures = []
+def future_dispatcher(ths, *args):
+    """Start ``task(*args)`` as a Future once fewer than *ths* tasks are pending.
+
+    Spins with ``time.sleep(0)`` while the global ``tasks`` counter is at
+    least *ths*, then increments the counter and records the new Future in
+    ``futures``.
+    """
+    global tasks
+    
+    # the counter is read here without entering ``atomic``
+    while tasks >= ths:
+        time.sleep(0)
+
+    with atomic:
+        tasks += 1
+    
+    futures.append(Future(task, *args))
+    time.sleep(0)
+
+
+
+
+def run(ths=8, w=1024, h=1024):
+    ths = int(ths)
+    w = int(w)
+    h = int(h)
+
+    objs = []
+    objs.append(Sphere( Vector(-2,0,-10), 2, Vector(0,255,0)))
+    objs.append(Sphere( Vector(2,0,-10), 3.5, Vector(255,0,0)))
+    objs.append(Sphere( Vector(0,-4,-10), 3, Vector(0,0,255)))
+    objs.append(Plane( Vector(0,0,-12), Vector(0,0,1), Vector(255,255,255)))
+    lightSource = Vector(0,10,0)
+
+    cameraPos = Vector(0,0,20)
+    
+    for x in range(w):
+        print x
+        future_dispatcher(ths, x, h, cameraPos, objs, lightSource)
+
+    for f in futures:
+        f()
+    del futures[:]
+    assert tasks == 0
+
+
+# Script entry point: trace the scene with the default arguments of run().
+if __name__ == '__main__':
+    run()


More information about the pypy-commit mailing list