[Python-checkins] r75442 - in sandbox/trunk/ccbench: ccbench.py
antoine.pitrou
python-checkins at python.org
Fri Oct 16 00:06:01 CEST 2009
Author: antoine.pitrou
Date: Fri Oct 16 00:06:01 2009
New Revision: 75442
Log:
Draft of a concurrency benchmark
Added:
sandbox/trunk/ccbench/
sandbox/trunk/ccbench/ccbench.py (contents, props changed)
Added: sandbox/trunk/ccbench/ccbench.py
==============================================================================
--- (empty file)
+++ sandbox/trunk/ccbench/ccbench.py Fri Oct 16 00:06:01 2009
@@ -0,0 +1,215 @@
+# -*- coding: utf-8 -*-
+# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
+
+from __future__ import division
+from __future__ import print_function
+
+"""
+ccbench, a Python concurrency benchmark.
+"""
+
+import time
+import os
+import sys
+import functools
+import itertools
+import zlib
+import threading
+import subprocess
+import copy
+from optparse import OptionParser
+
+# Compatibility
+try:
+ xrange
+except NameError:
+ xrange = range
+
+try:
+ map = itertools.imap
+except AttributeError:
+ pass
+
+
+THROUGHPUT_DURATION = 2.0
+
+
def task_pidigits():
    """Pi calculation (Python)"""
    # Bind the iteration helpers once: this function body is a benchmark
    # payload, so we keep per-call lookup overhead out of the hot loops.
    _map = map
    _count = itertools.count
    _islice = itertools.islice

    def calc_ndigits(n):
        # Unbounded spigot algorithm, adapted from
        # http://shootout.alioth.debian.org/
        # The state z = (q, r, s, t) encodes a linear fractional transform.
        def gen_x():
            return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))

        def compose(m1, m2):
            # Matrix product of two 2x2 integer transforms.
            q1, r1, s1, t1 = m1
            q2, r2, s2, t2 = m2
            return (q1 * q2,
                    q1 * r2 + r1 * t2,
                    s1 * q2 + t1 * s2,
                    s1 * r2 + t1 * t2)

        def extract(m, j):
            # Apply the transform m to j and take the integer part.
            q, r, s, t = m
            return (q*j + r) // (s*j + t)

        def pi_digits():
            z = (1, 0, 0, 1)
            x = gen_x()
            while True:
                digit = extract(z, 3)
                # A digit is certain once extracting at 3 and 4 agree.
                while digit != extract(z, 4):
                    z = compose(z, next(x))
                    digit = extract(z, 3)
                z = compose((10, -10*digit, 0, 1), z)
                yield digit

        return list(_islice(pi_digits(), n))

    return calc_ndigits, (20, )
+
+def task_compress():
+ """zlib compression (C)"""
+ with open(__file__, "rb") as f:
+ arg = f.read(5000) * 3
+
+ def compress(s):
+ zlib.decompress(zlib.compress(s))
+ return compress, (arg, )
+
+
+throughput_tasks = [task_pidigits, task_compress]
+
+
class TimedLoop:
    """Call a function repeatedly and measure how many iterations fit in a
    given minimum duration.

    An instance wraps a callable and its argument tuple; invoking the
    instance runs the callable in growing batches until either
    *min_duration* has elapsed or *end_event* is set by another thread.
    """

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=True):
        """Run the loop; return a (niters, duration) tuple."""
        batch = 100
        iterations = 0
        elapsed = 0.0
        # Bind hot callables to locals so measurement overhead inside the
        # loop stays small.
        clock = time.time
        pause = time.sleep
        fn = self.func
        fn_args = self.args
        prev_check = start_time
        while True:
            for _ in range(batch):
                fn(*fn_args)
            now = clock()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
            # (note: Event.is_set() is cheap)
            if end_event.is_set():
                return iterations, elapsed
            iterations += batch
            elapsed = now - start_time
            if elapsed >= min_duration:
                end_event.set()
                return iterations, elapsed
            if now - prev_check < 0.02:
                # Checks are coming too often: double the batch size to
                # minimize interference of measurement on overall runtime.
                batch *= 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                pause(0.0001)
            prev_check = now
+
+
def run_throughput_test(func, args, nthreads):
    """Run ``func(*args)`` in a timed loop across *nthreads* threads.

    Returns a list of (niters, duration) tuples, one per thread.
    """
    assert nthreads >= 1

    results = []
    loop = TimedLoop(func, args)
    end_event = threading.Event()

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    # We don't want measurements to include thread startup overhead, so each
    # worker announces itself in `ready` and then blocks until the main
    # thread signals the start.  An Event is used here rather than a bare
    # Condition because Event.set() is sticky: a worker that reaches wait()
    # *after* the start has been signalled proceeds immediately.  With a
    # Condition there was a lost-wakeup race — a worker could append to
    # `ready` but get notify()'d before reaching wait(), then block forever
    # and hang the final join().
    start_event = threading.Event()
    ready = []

    def run():
        ready.append(None)
        start_event.wait()
        # start_time is assigned by the main thread before start_event is
        # set, so it is always visible here.
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event))

    threads = []
    for i in range(nthreads):
        threads.append(threading.Thread(target=run))
    for t in threads:
        t.start()
    # Wait until every worker has checked in before starting the clock.
    while len(ready) < nthreads:
        time.sleep(0.1)
    start_time = time.time()
    start_event.set()
    for t in threads:
        t.join()

    return results
+
def run_throughput_tests(max_threads=4):
    """Run each throughput task with 1..max_threads threads and print the
    speed (iterations/s.) plus, past one thread, the percentage of the
    single-threaded baseline."""
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            # The max duration is assumed to be more representative of
            # the total duration than the average would, especially with
            # imperfect scheduling.
            total_iters = sum(r[0] for r in results)
            worst_duration = max(r[1] for r in results)
            speed = total_iters / worst_duration
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is None:
                baseline_speed = speed
                print(" iterations/s.")
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
        print()
+
+
def main():
    """Entry point: parse the command line and run the selected tests."""
    parser = OptionParser(usage="usage: %prog [-h|--help] [options]")
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # No explicit selection means "run everything".
    if not options.throughput and not options.latency:
        options.throughput = options.latency = True

    # NOTE(review): -l/--latency is accepted but no latency tests exist yet;
    # options.latency is currently ignored below.
    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests()

if __name__ == "__main__":
    main()
More information about the Python-checkins
mailing list