[Python-checkins] r68377 - sandbox/trunk/iobench/iobench.py

antoine.pitrou python-checkins at python.org
Wed Jan 7 16:03:16 CET 2009


Author: antoine.pitrou
Date: Wed Jan  7 16:03:14 2009
New Revision: 68377

Log:
Add an option (-E/--encoding) to run the text I/O tests with an encoding other than the default, utf8



Modified:
   sandbox/trunk/iobench/iobench.py

Modified: sandbox/trunk/iobench/iobench.py
==============================================================================
--- sandbox/trunk/iobench/iobench.py	(original)
+++ sandbox/trunk/iobench/iobench.py	Wed Jan  7 16:03:14 2009
@@ -11,6 +11,7 @@
 
 out = sys.stdout
 
+TEXT_ENCODING = 'utf8'
 
 # Compatibility
 try:
@@ -18,9 +19,9 @@
 except NameError:
     xrange = range
 
-def text_open(fn, mode):
+def text_open(fn, mode, encoding=None):
     try:
-        return open(fn, mode, encoding="utf-8")
+        return open(fn, mode, encoding=encoding or TEXT_ENCODING)
     except TypeError:
         return open(fn, mode)
 
@@ -30,6 +31,13 @@
         size = int(size) * {'KB': 1024, 'MB': 1024 ** 2}[unit]
         yield s.replace(' ', ''), size
 
+def get_binary_files():
+    return ((name + ".bin", size) for name, size in get_file_sizes())
+
+def get_text_files():
+    return ((name + "-" + TEXT_ENCODING + ".txt", size)
+        for name, size in get_file_sizes())
+
 def with_open_mode(mode):
     def decorate(f):
         f.file_open_mode = mode
@@ -88,7 +96,7 @@
 @with_open_mode("r")
 @with_sizes("medium")
 def seek_forward_bytewise(f):
-    """ seek forward one byte at a time """
+    """ seek forward one unit at a time """
     f.seek(0, 2)
     size = f.tell()
     f.seek(0, 0)
@@ -98,7 +106,7 @@
 @with_open_mode("r")
 @with_sizes("medium")
 def seek_forward_blockwise(f):
-    """ seek forward 1000 bytes at a time """
+    """ seek forward 1000 units at a time """
     f.seek(0, 2)
     size = f.tell()
     f.seek(0, 0)
@@ -252,10 +260,11 @@
 
 
 def run_all_tests(options):
-    def print_label(name, func):
+    def print_label(filename, func):
+        name = re.split(r'[-.]', filename)[0]
         out.write(
             ("[%s] %s... "
-                % (name.center(9), func.__doc__.strip())
+                % (name.center(7), func.__doc__.strip())
             ).ljust(52))
         out.flush()
 
@@ -276,7 +285,7 @@
             n, real, cpu = run_during(1.5, lambda: test_func(f, *args))
         print_results(size, n, real, cpu)
 
-    def run_test_family(tests, mode_filter, file_ext, open_func, *make_args):
+    def run_test_family(tests, mode_filter, files, open_func, *make_args):
         for test_func in tests:
             if test_func is None:
                 out.write("\n")
@@ -284,8 +293,8 @@
             if mode_filter in test_func.file_open_mode:
                 continue
             for s in test_func.file_sizes:
-                name, size = file_sizes[size_names[s]]
-                name += file_ext
+                name, size = files[size_names[s]]
+                #name += file_ext
                 args = tuple(f(name, size) for f in make_args)
                 run_one_test(name, size,
                     open_func, test_func, *args)
@@ -295,21 +304,23 @@
         "medium": 1,
         "large": 2,
     }
-    file_sizes = list(get_file_sizes())
 
+    binary_files = list(get_binary_files())
+    text_files = list(get_text_files())
     if "b" in options:
         print("Binary unit = one byte")
     if "t" in options:
-        print("Text unit = one character")
+        print("Text unit = one character (%s-decoded)" % TEXT_ENCODING)
+
     # Binary reads
     if "b" in options and "r" in options:
         print("\n** Binary input **\n")
-        run_test_family(read_tests, "t", ".bin", lambda fn: open(fn, "rb"))
+        run_test_family(read_tests, "t", binary_files, lambda fn: open(fn, "rb"))
 
     # Text reads
     if "t" in options and "r" in options:
         print("\n** Text input **\n")
-        run_test_family(read_tests, "b", ".txt", lambda fn: text_open(fn, "r"))
+        run_test_family(read_tests, "b", text_files, lambda fn: text_open(fn, "r"))
 
     # Binary writes
     if "b" in options and "w" in options:
@@ -317,7 +328,7 @@
         def make_test_source(name, size):
             with open(name, "rb") as f:
                 return f.read()
-        run_test_family(write_tests, "t", ".bin",
+        run_test_family(write_tests, "t", binary_files,
             lambda fn: open(os.devnull, "wb"), make_test_source)
 
     # Text writes
@@ -326,7 +337,7 @@
         def make_test_source(name, size):
             with text_open(name, "r") as f:
                 return f.read()
-        run_test_family(write_tests, "b", ".txt",
+        run_test_family(write_tests, "b", text_files,
             lambda fn: text_open(os.devnull, "w"), make_test_source)
 
     # Binary overwrites
@@ -335,7 +346,7 @@
         def make_test_source(name, size):
             with open(name, "rb") as f:
                 return f.read()
-        run_test_family(modify_tests, "t", ".bin",
+        run_test_family(modify_tests, "t", binary_files,
             lambda fn: open(fn, "r+b"), make_test_source)
 
     # Text overwrites
@@ -344,22 +355,21 @@
         def make_test_source(name, size):
             with text_open(name, "r") as f:
                 return f.read()
-        run_test_family(modify_tests, "b", ".txt",
+        run_test_family(modify_tests, "b", text_files,
             lambda fn: open(fn, "r+"), make_test_source)
 
 
 def prepare_files():
     print("Preparing files...")
     # Binary files
-    for name, size in get_file_sizes():
-        name += ".bin"
+    for name, size in get_binary_files():
         if os.path.isfile(name) and os.path.getsize(name) == size:
             continue
         with open(name, "wb") as f:
             f.write(os.urandom(size))
     # Text files
     chunk = []
-    with text_open(__file__, "r") as f:
+    with text_open(__file__, "r", encoding='utf8') as f:
         for line in f:
             if line.startswith("# <iobench text chunk marker>"):
                 break
@@ -367,16 +377,10 @@
             raise RuntimeError(
                 "Couldn't find chunk marker in %s !" % __file__)
         chunk = "".join(f)
-        if not isinstance(chunk, bytes):
-            chunk = chunk.encode('utf-8')
-        h = hashlib.sha1(chunk).hexdigest()
-        expected = "6a8b36cf5e86905965d3bd0e67bd647c92eaa285"
-        if h != expected:
-            raise RuntimeError(
-                "Wrong hash for text content, expected %s, got %s"
-                % (expected, h))
-    for name, size in get_file_sizes():
-        name += ".txt"
+        if isinstance(chunk, bytes):
+            chunk = chunk.decode('utf8')
+        chunk = chunk.encode(TEXT_ENCODING)
+    for name, size in get_text_files():
         if os.path.isfile(name) and os.path.getsize(name) == size:
             continue
         head = chunk * (size // len(chunk))
@@ -384,7 +388,7 @@
         # Adjust tail to end on a character boundary
         while True:
             try:
-                tail.decode('utf-8')
+                tail.decode(TEXT_ENCODING)
                 break
             except UnicodeDecodeError:
                 tail = tail[:-1]
@@ -393,6 +397,8 @@
             f.write(tail)
 
 def main():
+    global TEXT_ENCODING
+
     usage = "usage: %prog [-h|--help] [options]"
     parser = OptionParser(usage=usage)
     parser.add_option("-b", "--binary",
@@ -407,10 +413,13 @@
     parser.add_option("-w", "--write",
                       action="store_true", dest="write", default=False,
                       help="run write & modify tests")
+    parser.add_option("-E", "--encoding",
+                      action="store", dest="encoding", default=None,
+                      help="encoding for text tests (default: %s)" % TEXT_ENCODING)
     options, args = parser.parse_args()
     if args:
         parser.error("unexpected arguments")
-    
+
     test_options = ""
     if options.read:
         test_options += "r"
@@ -425,6 +434,9 @@
     elif not options.text:
         test_options += "tb"
 
+    if options.encoding:
+        TEXT_ENCODING = options.encoding
+
     prepare_files()
     run_all_tests(test_options)
 


More information about the Python-checkins mailing list