[Python-checkins] r68377 - sandbox/trunk/iobench/iobench.py
antoine.pitrou
python-checkins at python.org
Wed Jan 7 16:03:16 CET 2009
Author: antoine.pitrou
Date: Wed Jan 7 16:03:14 2009
New Revision: 68377
Log:
Add an option to run text IO tests with a different encoding than utf8
Modified:
sandbox/trunk/iobench/iobench.py
Modified: sandbox/trunk/iobench/iobench.py
==============================================================================
--- sandbox/trunk/iobench/iobench.py (original)
+++ sandbox/trunk/iobench/iobench.py Wed Jan 7 16:03:14 2009
@@ -11,6 +11,7 @@
out = sys.stdout
+TEXT_ENCODING = 'utf8'
# Compatibility
try:
@@ -18,9 +19,9 @@
 except NameError:
     xrange = range
-def text_open(fn, mode):
+def text_open(fn, mode, encoding=None):
     try:
-        return open(fn, mode, encoding="utf-8")
+        return open(fn, mode, encoding=encoding or TEXT_ENCODING)
     except TypeError:
         return open(fn, mode)
@@ -30,6 +31,13 @@
         size = int(size) * {'KB': 1024, 'MB': 1024 ** 2}[unit]
         yield s.replace(' ', ''), size
+def get_binary_files():
+    return ((name + ".bin", size) for name, size in get_file_sizes())
+
+def get_text_files():
+    return ((name + "-" + TEXT_ENCODING + ".txt", size)
+        for name, size in get_file_sizes())
+
 def with_open_mode(mode):
     def decorate(f):
         f.file_open_mode = mode
@@ -88,7 +96,7 @@
 @with_open_mode("r")
 @with_sizes("medium")
 def seek_forward_bytewise(f):
-    """ seek forward one byte at a time """
+    """ seek forward one unit at a time """
     f.seek(0, 2)
     size = f.tell()
     f.seek(0, 0)
@@ -98,7 +106,7 @@
 @with_open_mode("r")
 @with_sizes("medium")
 def seek_forward_blockwise(f):
-    """ seek forward 1000 bytes at a time """
+    """ seek forward 1000 units at a time """
     f.seek(0, 2)
     size = f.tell()
     f.seek(0, 0)
@@ -252,10 +260,11 @@
 def run_all_tests(options):
-    def print_label(name, func):
+    def print_label(filename, func):
+        name = re.split(r'[-.]', filename)[0]
         out.write(
             ("[%s] %s... "
-                % (name.center(9), func.__doc__.strip())
+                % (name.center(7), func.__doc__.strip())
             ).ljust(52))
         out.flush()
@@ -276,7 +285,7 @@
n, real, cpu = run_during(1.5, lambda: test_func(f, *args))
print_results(size, n, real, cpu)
-    def run_test_family(tests, mode_filter, file_ext, open_func, *make_args):
+    def run_test_family(tests, mode_filter, files, open_func, *make_args):
         for test_func in tests:
             if test_func is None:
                 out.write("\n")
@@ -284,8 +293,8 @@
             if mode_filter in test_func.file_open_mode:
                 continue
             for s in test_func.file_sizes:
-                name, size = file_sizes[size_names[s]]
-                name += file_ext
+                name, size = files[size_names[s]]
+                #name += file_ext
                 args = tuple(f(name, size) for f in make_args)
                 run_one_test(name, size,
                     open_func, test_func, *args)
@@ -295,21 +304,23 @@
         "medium": 1,
         "large": 2,
     }
-    file_sizes = list(get_file_sizes())
+    binary_files = list(get_binary_files())
+    text_files = list(get_text_files())
     if "b" in options:
         print("Binary unit = one byte")
     if "t" in options:
-        print("Text unit = one character")
+        print("Text unit = one character (%s-decoded)" % TEXT_ENCODING)
+
     # Binary reads
     if "b" in options and "r" in options:
         print("\n** Binary input **\n")
-        run_test_family(read_tests, "t", ".bin", lambda fn: open(fn, "rb"))
+        run_test_family(read_tests, "t", binary_files, lambda fn: open(fn, "rb"))
     # Text reads
     if "t" in options and "r" in options:
         print("\n** Text input **\n")
-        run_test_family(read_tests, "b", ".txt", lambda fn: text_open(fn, "r"))
+        run_test_family(read_tests, "b", text_files, lambda fn: text_open(fn, "r"))
     # Binary writes
     if "b" in options and "w" in options:
@@ -317,7 +328,7 @@
         def make_test_source(name, size):
             with open(name, "rb") as f:
                 return f.read()
-        run_test_family(write_tests, "t", ".bin",
+        run_test_family(write_tests, "t", binary_files,
             lambda fn: open(os.devnull, "wb"), make_test_source)
     # Text writes
@@ -326,7 +337,7 @@
         def make_test_source(name, size):
             with text_open(name, "r") as f:
                 return f.read()
-        run_test_family(write_tests, "b", ".txt",
+        run_test_family(write_tests, "b", text_files,
             lambda fn: text_open(os.devnull, "w"), make_test_source)
     # Binary overwrites
@@ -335,7 +346,7 @@
         def make_test_source(name, size):
             with open(name, "rb") as f:
                 return f.read()
-        run_test_family(modify_tests, "t", ".bin",
+        run_test_family(modify_tests, "t", binary_files,
             lambda fn: open(fn, "r+b"), make_test_source)
     # Text overwrites
@@ -344,22 +355,21 @@
         def make_test_source(name, size):
             with text_open(name, "r") as f:
                 return f.read()
-        run_test_family(modify_tests, "b", ".txt",
+        run_test_family(modify_tests, "b", text_files,
             lambda fn: open(fn, "r+"), make_test_source)
 def prepare_files():
     print("Preparing files...")
     # Binary files
-    for name, size in get_file_sizes():
-        name += ".bin"
+    for name, size in get_binary_files():
         if os.path.isfile(name) and os.path.getsize(name) == size:
             continue
         with open(name, "wb") as f:
             f.write(os.urandom(size))
     # Text files
     chunk = []
-    with text_open(__file__, "r") as f:
+    with text_open(__file__, "r", encoding='utf8') as f:
         for line in f:
             if line.startswith("# <iobench text chunk marker>"):
                 break
@@ -367,16 +377,10 @@
             raise RuntimeError(
                 "Couldn't find chunk marker in %s !" % __file__)
         chunk = "".join(f)
-        if not isinstance(chunk, bytes):
-            chunk = chunk.encode('utf-8')
-        h = hashlib.sha1(chunk).hexdigest()
-        expected = "6a8b36cf5e86905965d3bd0e67bd647c92eaa285"
-        if h != expected:
-            raise RuntimeError(
-                "Wrong hash for text content, expected %s, got %s"
-                % (expected, h))
-    for name, size in get_file_sizes():
-        name += ".txt"
+        if isinstance(chunk, bytes):
+            chunk = chunk.decode('utf8')
+        chunk = chunk.encode(TEXT_ENCODING)
+    for name, size in get_text_files():
         if os.path.isfile(name) and os.path.getsize(name) == size:
             continue
         head = chunk * (size // len(chunk))
@@ -384,7 +388,7 @@
         # Adjust tail to end on a character boundary
         while True:
             try:
-                tail.decode('utf-8')
+                tail.decode(TEXT_ENCODING)
                 break
             except UnicodeDecodeError:
                 tail = tail[:-1]
@@ -393,6 +397,8 @@
             f.write(tail)
 def main():
+    global TEXT_ENCODING
+
     usage = "usage: %prog [-h|--help] [options]"
     parser = OptionParser(usage=usage)
     parser.add_option("-b", "--binary",
@@ -407,10 +413,13 @@
     parser.add_option("-w", "--write",
                       action="store_true", dest="write", default=False,
                       help="run write & modify tests")
+    parser.add_option("-E", "--encoding",
+                      action="store", dest="encoding", default=None,
+                      help="encoding for text tests (default: %s)" % TEXT_ENCODING)
     options, args = parser.parse_args()
     if args:
         parser.error("unexpected arguments")
-
+
     test_options = ""
     if options.read:
         test_options += "r"
@@ -425,6 +434,9 @@
     elif not options.text:
         test_options += "tb"
+    if options.encoding:
+        TEXT_ENCODING = options.encoding
+
     prepare_files()
     run_all_tests(test_options)
More information about the Python-checkins
mailing list