[Python-checkins] r75442 - in sandbox/trunk/ccbench: ccbench.py

antoine.pitrou python-checkins at python.org
Fri Oct 16 00:06:01 CEST 2009


Author: antoine.pitrou
Date: Fri Oct 16 00:06:01 2009
New Revision: 75442

Log:
Draft of a concurrency benchmark



Added:
   sandbox/trunk/ccbench/
   sandbox/trunk/ccbench/ccbench.py   (contents, props changed)

Added: sandbox/trunk/ccbench/ccbench.py
==============================================================================
--- (empty file)
+++ sandbox/trunk/ccbench/ccbench.py	Fri Oct 16 00:06:01 2009
@@ -0,0 +1,215 @@
+# -*- coding: utf-8 -*-
+# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
+
+"""
+ccbench, a Python concurrency benchmark.
+"""
+
+from __future__ import division
+from __future__ import print_function
+
+import time
+import os
+import sys
+import functools
+import itertools
+import zlib
+import threading
+import subprocess
+import copy
+from optparse import OptionParser
+
+# Compatibility
+# Where the name `xrange` is not defined (the NameError branch), alias it
+# to `range` so the rest of the module can refer to `xrange` either way.
+try:
+    xrange
+except NameError:
+    xrange = range
+
+# Where itertools provides `imap`, rebind the module-level `map` to it;
+# otherwise (the AttributeError branch) the builtin `map` is left untouched.
+try:
+    map = itertools.imap
+except AttributeError:
+    pass
+
+
+# Minimum duration of one throughput measurement.  It is compared against
+# differences of time.time() values, i.e. it is expressed in seconds.
+THROUGHPUT_DURATION = 2.0
+
+
+def task_pidigits():
+    """Pi calculation (Python)"""
+    # The docstring above is also the title printed for this task, so it
+    # must stay short.  Returns a (callable, args) pair; the callable
+    # computes the first `n` decimal digits of pi in pure Python.
+
+    # Bind the helpers to locals once so the inner functions use closures.
+    lazy_map = map
+    count_from = itertools.count
+    take = itertools.islice
+
+    def calc_ndigits(n):
+        # From http://shootout.alioth.debian.org/
+        def term_stream():
+            # Endless stream of 4-tuples (k, 4k+2, 0, 2k+1) for k = 1, 2, ...
+            return lazy_map(lambda k: (k, 4*k + 2, 0, 2*k + 1),
+                            count_from(1))
+
+        def compose(left, right):
+            # Combine two 4-tuples like a 2x2 matrix product.
+            lq, lr, ls, lt = left
+            rq, rr, rs, rt = right
+            return (lq * rq,
+                    lq * rr + lr * rt,
+                    ls * rq + lt * rs,
+                    ls * rr + lt * rt)
+
+        def extract(state, j):
+            # Integer part of the transformation `state` evaluated at j.
+            q, r, s, t = state
+            return (q*j + r) // (s*j + t)
+
+        def pi_digits():
+            state = (1, 0, 0, 1)
+            terms = term_stream()
+            while 1:
+                digit = extract(state, 3)
+                # Consume terms until the next digit is the same at 3 and 4.
+                while digit != extract(state, 4):
+                    state = compose(state, next(terms))
+                    digit = extract(state, 3)
+                # Drop the digit about to be produced from the state.
+                state = compose((10, -10*digit, 0, 1), state)
+                yield digit
+
+        return list(take(pi_digits(), n))
+
+    return calc_ndigits, (20, )
+
+def task_compress():
+    """zlib compression (C)"""
+    # The docstring above is also the title printed for this task.
+    # The payload is the first 5000 bytes of this very file, three times over.
+    with open(__file__, "rb") as src:
+        payload = 3 * src.read(5000)
+
+    def compress(s):
+        # One compress/decompress round trip; the result is thrown away.
+        packed = zlib.compress(s)
+        zlib.decompress(packed)
+    return compress, (payload, )
+
+
+# Task factories exercised by run_throughput_tests().  Each one is called
+# to get a (callable, args) pair, and its docstring is printed as the title.
+throughput_tasks = [task_pidigits, task_compress]
+
+
+class TimedLoop:
+    # Callable that invokes `func(*args)` repeatedly until a minimum
+    # duration has elapsed or another loop sharing the same event finishes.
+
+    def __init__(self, func, args):
+        self.func, self.args = func, args
+
+    def __call__(self, start_time, min_duration, end_event, do_yield=True):
+        # Returns (iterations, duration), where duration is measured from
+        # `start_time` using time.time().
+        batch = 100
+        completed = 0
+        elapsed = 0.0
+        # Bind everything used in the loop to locals.
+        now = time.time
+        nap = time.sleep
+        call = self.func
+        call_args = self.args
+        last_check = start_time
+        while True:
+            for _ in range(batch):
+                call(*call_args)
+            stamp = now()
+            # If another thread terminated, the batch just run must not be
+            # counted: report the totals recorded before it.
+            # (note: Event.is_set() is cheap)
+            if end_event.is_set():
+                return completed, elapsed
+            completed += batch
+            elapsed = stamp - start_time
+            if elapsed >= min_duration:
+                # First loop to finish: tell the others to stop.
+                end_event.set()
+                return completed, elapsed
+            if stamp - last_check < 0.02:
+                # Batch was too short; double it so that the measurement
+                # interferes less with the overall runtime.
+                batch *= 2
+            elif do_yield:
+                # OS scheduling of Python threads is sometimes so bad that
+                # we have to force thread switching ourselves, otherwise we
+                # get completely useless results.
+                nap(0.0001)
+            last_check = stamp
+
+
+def run_throughput_test(func, args, nthreads):
+    # Run `func(*args)` in timed loops on `nthreads` threads at once.
+    #
+    # Returns a list of (iterations, duration) tuples, one per thread, in
+    # the order in which the threads finished.  With nthreads == 1 the loop
+    # runs directly in the calling thread.
+    assert nthreads >= 1
+
+    results = []
+    loop = TimedLoop(func, args)
+    end_event = threading.Event()
+
+    if nthreads == 1:
+        # Pure single-threaded performance, without any switching or
+        # synchronization overhead.
+        start_time = time.time()
+        results.append(loop(start_time, THROUGHPUT_DURATION,
+                            end_event, do_yield=False))
+        return results
+
+    start_cond = threading.Condition()
+    ready = []
+
+    def run():
+        # Register as ready *while holding the condition's lock*.  The lock
+        # is only released inside wait(), so once the main thread has seen
+        # every registration and acquired the lock itself, every worker is
+        # guaranteed to be waiting.  Registering before taking the lock
+        # would let the notification fire before a worker reached wait(),
+        # leaving that worker blocked forever.
+        with start_cond:
+            ready.append(None)
+            start_cond.wait()
+        # `start_time` is assigned by the main thread just before notifying.
+        results.append(loop(start_time, THROUGHPUT_DURATION,
+                            end_event))
+
+    threads = [threading.Thread(target=run) for _ in range(nthreads)]
+    for t in threads:
+        t.start()
+    # We don't want measurements to include thread startup overhead,
+    # so we arrange for timing to start after all threads are ready.
+    while len(ready) < nthreads:
+        time.sleep(0.1)
+    with start_cond:
+        start_time = time.time()
+        start_cond.notify(nthreads)
+    for t in threads:
+        t.join()
+
+    return results
+
+def run_throughput_tests(max_threads=4):
+    # Run every task in `throughput_tasks` with 1..max_threads threads and
+    # print the speed of each run; runs after the first also show their
+    # speed as a percentage of the single-threaded speed.
+    for make_task in throughput_tasks:
+        print(make_task.__doc__)
+        print()
+        func, args = make_task()
+        reference = None
+        for nthreads in range(1, max_threads + 1):
+            outcome = run_throughput_test(func, args, nthreads)
+            total_iters = sum(n for n, _ in outcome)
+            # The max duration is assumed to be more representative of
+            # the total duration than the average would, especially with
+            # imperfect scheduling.
+            longest = max(d for _, d in outcome)
+            speed = total_iters / longest
+            print("threads=%d: %d" % (nthreads, speed), end="")
+            if reference is not None:
+                print(" ( %d %%)" % (speed / reference * 100))
+                continue
+            # First run of this task: it becomes the baseline.
+            print(" iterations/s.")
+            reference = speed
+        print()
+
+
+def main():
+    # Command-line entry point: parse the options and run the selected
+    # benchmarks.  With neither -t nor -l given, both flags are turned on.
+    parser = OptionParser(usage="usage: %prog [-h|--help] [options]")
+    for flags, dest, text in (
+            (("-t", "--throughput"), "throughput", "run throughput tests"),
+            (("-l", "--latency"), "latency", "run latency tests")):
+        parser.add_option(*flags, action="store_true", dest=dest,
+                          default=False, help=text)
+    options, extra = parser.parse_args()
+    if extra:
+        parser.error("unexpected arguments")
+
+    if not (options.throughput or options.latency):
+        options.throughput = options.latency = True
+
+    # NOTE: options.latency is parsed but nothing below acts on it.
+    if options.throughput:
+        print("--- Throughput ---")
+        print()
+        run_throughput_tests()
+
+if __name__ == "__main__":
+    main()


More information about the Python-checkins mailing list