[Python-checkins] cpython (3.5): Issue #28664: test_bz2 now works on non-Windows platforms without bunzip2

serhiy.storchaka python-checkins at python.org
Fri Nov 11 10:12:21 EST 2016


https://hg.python.org/cpython/rev/648cd8450f4f
changeset:   105059:648cd8450f4f
branch:      3.5
parent:      105052:db220f2df5a9
user:        Serhiy Storchaka <storchaka at gmail.com>
date:        Fri Nov 11 17:10:24 2016 +0200
summary:
  Issue #28664: test_bz2 now works on non-Windows platforms without bunzip2
(e.g. on Android).

files:
  Lib/test/test_bz2.py |  59 ++++++++++++++-----------------
  1 files changed, 26 insertions(+), 33 deletions(-)


diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py
--- a/Lib/test/test_bz2.py
+++ b/Lib/test/test_bz2.py
@@ -7,6 +7,7 @@
 import pickle
 import glob
 import random
+import shutil
 import subprocess
 import sys
 from test.support import unlink
@@ -21,6 +22,16 @@
 bz2 = support.import_module('bz2')
 from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
 
+has_cmdline_bunzip2 = None
+
+def ext_decompress(data):
+    global has_cmdline_bunzip2
+    if has_cmdline_bunzip2 is None:
+        has_cmdline_bunzip2 = bool(shutil.which('bunzip2'))
+    if has_cmdline_bunzip2:
+        return subprocess.check_output(['bunzip2'], input=data)
+    else:
+        return bz2.decompress(data)
 
 class BaseTest(unittest.TestCase):
     "Base for other testcases."
@@ -73,24 +84,6 @@
         if os.path.isfile(self.filename):
             os.unlink(self.filename)
 
-    if sys.platform == "win32":
-        # bunzip2 isn't available to run on Windows.
-        def decompress(self, data):
-            return bz2.decompress(data)
-    else:
-        def decompress(self, data):
-            pop = subprocess.Popen("bunzip2", shell=True,
-                                   stdin=subprocess.PIPE,
-                                   stdout=subprocess.PIPE,
-                                   stderr=subprocess.STDOUT)
-            pop.stdin.write(data)
-            pop.stdin.close()
-            ret = pop.stdout.read()
-            pop.stdout.close()
-            if pop.wait() != 0:
-                ret = bz2.decompress(data)
-            return ret
-
 
 class BZ2FileTest(BaseTest):
     "Test the BZ2File class."
@@ -251,7 +244,7 @@
             self.assertRaises(TypeError, bz2f.write)
             bz2f.write(self.TEXT)
         with open(self.filename, 'rb') as f:
-            self.assertEqual(self.decompress(f.read()), self.TEXT)
+            self.assertEqual(ext_decompress(f.read()), self.TEXT)
 
     def testWriteChunks10(self):
         with BZ2File(self.filename, "w") as bz2f:
@@ -263,7 +256,7 @@
                 bz2f.write(str)
                 n += 1
         with open(self.filename, 'rb') as f:
-            self.assertEqual(self.decompress(f.read()), self.TEXT)
+            self.assertEqual(ext_decompress(f.read()), self.TEXT)
 
     def testWriteNonDefaultCompressLevel(self):
         expected = bz2.compress(self.TEXT, compresslevel=5)
@@ -280,7 +273,7 @@
         # should raise an exception.
         self.assertRaises(ValueError, bz2f.writelines, ["a"])
         with open(self.filename, 'rb') as f:
-            self.assertEqual(self.decompress(f.read()), self.TEXT)
+            self.assertEqual(ext_decompress(f.read()), self.TEXT)
 
     def testWriteMethodsOnReadOnlyFile(self):
         with BZ2File(self.filename, "w") as bz2f:
@@ -298,7 +291,7 @@
             self.assertRaises(TypeError, bz2f.write)
             bz2f.write(self.TEXT)
         with open(self.filename, 'rb') as f:
-            self.assertEqual(self.decompress(f.read()), self.TEXT * 2)
+            self.assertEqual(ext_decompress(f.read()), self.TEXT * 2)
 
     def testSeekForward(self):
         self.createTempFile()
@@ -594,7 +587,7 @@
             with BZ2File(bio, "w") as bz2f:
                 self.assertRaises(TypeError, bz2f.write)
                 bz2f.write(self.TEXT)
-            self.assertEqual(self.decompress(bio.getvalue()), self.TEXT)
+            self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT)
             self.assertFalse(bio.closed)
 
     def testSeekForwardBytesIO(self):
@@ -631,7 +624,7 @@
         self.assertRaises(TypeError, bz2c.compress)
         data = bz2c.compress(self.TEXT)
         data += bz2c.flush()
-        self.assertEqual(self.decompress(data), self.TEXT)
+        self.assertEqual(ext_decompress(data), self.TEXT)
 
     def testCompressEmptyString(self):
         bz2c = BZ2Compressor()
@@ -650,7 +643,7 @@
             data += bz2c.compress(str)
             n += 1
         data += bz2c.flush()
-        self.assertEqual(self.decompress(data), self.TEXT)
+        self.assertEqual(ext_decompress(data), self.TEXT)
 
     @bigmemtest(size=_4G + 100, memuse=2)
     def testCompress4G(self, size):
@@ -830,7 +823,7 @@
 class CompressDecompressTest(BaseTest):
     def testCompress(self):
         data = bz2.compress(self.TEXT)
-        self.assertEqual(self.decompress(data), self.TEXT)
+        self.assertEqual(ext_decompress(data), self.TEXT)
 
     def testCompressEmptyString(self):
         text = bz2.compress(b'')
@@ -880,14 +873,14 @@
             with self.open(self.filename, mode) as f:
                 f.write(self.TEXT)
             with open(self.filename, "rb") as f:
-                file_data = self.decompress(f.read())
+                file_data = ext_decompress(f.read())
                 self.assertEqual(file_data, self.TEXT)
             with self.open(self.filename, "rb") as f:
                 self.assertEqual(f.read(), self.TEXT)
             with self.open(self.filename, "ab") as f:
                 f.write(self.TEXT)
             with open(self.filename, "rb") as f:
-                file_data = self.decompress(f.read())
+                file_data = ext_decompress(f.read())
                 self.assertEqual(file_data, self.TEXT * 2)
 
     def test_implicit_binary_modes(self):
@@ -898,14 +891,14 @@
             with self.open(self.filename, mode) as f:
                 f.write(self.TEXT)
             with open(self.filename, "rb") as f:
-                file_data = self.decompress(f.read())
+                file_data = ext_decompress(f.read())
                 self.assertEqual(file_data, self.TEXT)
             with self.open(self.filename, "r") as f:
                 self.assertEqual(f.read(), self.TEXT)
             with self.open(self.filename, "a") as f:
                 f.write(self.TEXT)
             with open(self.filename, "rb") as f:
-                file_data = self.decompress(f.read())
+                file_data = ext_decompress(f.read())
                 self.assertEqual(file_data, self.TEXT * 2)
 
     def test_text_modes(self):
@@ -917,14 +910,14 @@
             with self.open(self.filename, mode) as f:
                 f.write(text)
             with open(self.filename, "rb") as f:
-                file_data = self.decompress(f.read()).decode("ascii")
+                file_data = ext_decompress(f.read()).decode("ascii")
                 self.assertEqual(file_data, text_native_eol)
             with self.open(self.filename, "rt") as f:
                 self.assertEqual(f.read(), text)
             with self.open(self.filename, "at") as f:
                 f.write(text)
             with open(self.filename, "rb") as f:
-                file_data = self.decompress(f.read()).decode("ascii")
+                file_data = ext_decompress(f.read()).decode("ascii")
                 self.assertEqual(file_data, text_native_eol * 2)
 
     def test_x_mode(self):
@@ -965,7 +958,7 @@
         with self.open(self.filename, "wt", encoding="utf-16-le") as f:
             f.write(text)
         with open(self.filename, "rb") as f:
-            file_data = self.decompress(f.read()).decode("utf-16-le")
+            file_data = ext_decompress(f.read()).decode("utf-16-le")
             self.assertEqual(file_data, text_native_eol)
         with self.open(self.filename, "rt", encoding="utf-16-le") as f:
             self.assertEqual(f.read(), text)

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list