[pypy-commit] pypy py3.5: merge bz2 and lzma changes

plan_rich pypy.commits at gmail.com
Thu Sep 29 05:01:02 EDT 2016


Author: Richard Plangger <planrichi at gmail.com>
Branch: py3.5
Changeset: r87440:43ca4b14e86e
Date: 2016-09-29 11:00 +0200
http://bitbucket.org/pypy/pypy/changeset/43ca4b14e86e/

Log:	merge bz2 and lzma changes

diff --git a/lib_pypy/_lzma.py b/lib_pypy/_lzma.py
--- a/lib_pypy/_lzma.py
+++ b/lib_pypy/_lzma.py
@@ -445,6 +445,9 @@
         self.eof = False
         self.lzs = _new_lzma_stream()
         self._bufsiz = max(8192, io.DEFAULT_BUFFER_SIZE)
+        self.needs_input = True
+        self._input_buffer = ffi.NULL
+        self._input_buffer_size = 0
 
         if format == FORMAT_AUTO:
             catch_lzma_error(m.lzma_auto_decoder, self.lzs, memlimit, decoder_flags)
@@ -473,9 +476,67 @@
         else:
             raise ValueError("invalid...")
 
-    def decompress(self, data):
+    def pre_decompress_left_data(self, buf, buf_size):
+        # in this case there is data left that needs to be processed before the first
+        # argument can be processed
+
+        lzs = self.lzs
+
+        addr_input_buffer = int(ffi.cast('uintptr_t', self._input_buffer))
+        addr_next_in = int(ffi.cast('uintptr_t', lzs.next_in))
+        avail_now = (addr_input_buffer + self._input_buffer_size) - \
+                    (addr_next_in + lzs.avail_in)
+        avail_total = self._input_buffer_size - lzs.avail_in
+        if avail_total < buf_size:
+            # resize the buffer, it is too small!
+            offset = addr_next_in - addr_input_buffer
+            new_size = self._input_buffer_size + buf_size - avail_now
+            # there is no realloc?
+            tmp = ffi.cast("uint8_t*",m.malloc(new_size))
+            if tmp == ffi.NULL:
+                raise MemoryError
+            ffi.memmove(tmp, lzs.next_in, lzs.avail_in)
+            lzs.next_in = tmp
+            m.free(self._input_buffer)
+            self._input_buffer = tmp
+            self._input_buffer_size = new_size
+        elif avail_now < buf_size:
+            # the buffer is not too small, but we cannot append it!
+            # move all data to the front
+            ffi.memmove(self._input_buffer, lzs.next_in, lzs.avail_in)
+            lzs.next_in = self._input_buffer
+        ffi.memmove(lzs.next_in+lzs.avail_in, buf, buf_size)
+        lzs.avail_in += buf_size
+        return lzs.next_in, lzs.avail_in
+
+    def post_decompress_avail_data(self):
+        lzs = self.lzs
+        # free buffer it is to small
+        if self._input_buffer is not ffi.NULL and \
+           self._input_buffer_size < lzs.avail_in:
+            m.free(self._input_buffer)
+            self._input_buffer = ffi.NONE
+
+        # allocate if necessary
+        if self._input_buffer is ffi.NULL:
+            self._input_buffer = ffi.cast("uint8_t*",m.malloc(lzs.avail_in))
+            if self._input_buffer == ffi.NULL:
+                raise MemoryError
+            self._input_buffer_size = lzs.avail_in
+
+        ffi.memmove(self._input_buffer, lzs.next_in, lzs.avail_in)
+        lzs.next_in = self._input_buffer
+
+    def clear_input_buffer(self):
+        # clean the buffer
+        if self._input_buffer is not ffi.NULL:
+            m.free(self._input_buffer)
+            self._input_buffer = ffi.NULL
+            self._input_buffer_size = 0
+
+    def decompress(self, data, max_length=-1):
         """
-        decompress(data) -> bytes
+        decompress(data, max_length=-1) -> bytes
 
         Provide data to the decompressor object. Returns a chunk of
         decompressed data if possible, or b"" otherwise.
@@ -484,21 +545,53 @@
         reached raises an EOFError. Any data found after the end of the
         stream is ignored, and saved in the unused_data attribute.
         """
+        if not isinstance(max_length, int):
+            raise TypeError("max_length parameter object cannot be interpreted as an integer")
         with self.lock:
             if self.eof:
                 raise EOFError("Already...")
-            return self._decompress(data)
+            lzs = self.lzs
+            data = to_bytes(data)
+            buf = ffi.new('uint8_t[]', data)
+            buf_size = len(data)
 
-    def _decompress(self, data):
+            if lzs.next_in:
+                buf, buf_size = self.pre_decompress_left_data(buf, buf_size)
+                used__input_buffer = True
+            else:
+                lzs.avail_in = buf_size
+                lzs.next_in = ffi.cast("uint8_t*",buf)
+                used__input_buffer = False
+
+            # actual decompression
+            result = self._decompress(buf, buf_size, max_length)
+
+            if self.eof:
+                self.needs_input = False
+                if lzs.avail_in > 0:
+                    self.unused_data = ffi.buffer(lzs.next_in, lzs.avail_in)[:]
+                self.clear_input_buffer()
+            elif lzs.avail_in == 0:
+                # completed successfully!
+                self.needs_input = True
+                lzs.next_in = ffi.NULL
+                self.clear_input_buffer()
+            else:
+                self.needs_input = False
+                if not used__input_buffer:
+                    self.post_decompress_avail_data()
+
+            return result
+
+    def _decompress(self, buf, buf_len, max_length):
         lzs = self.lzs
 
-        # we need in_ so that lzs.next_in doesn't get garbage collected until
-        # in_ goes out of scope
-        data = to_bytes(data)
-        lzs.next_in = in_ = ffi.new('char[]', data)
-        lzs.avail_in = len(data)
+        lzs.next_in = buf
+        lzs.avail_in = buf_len
 
         bufsiz = self._bufsiz
+        if not (max_length < 0 or max_length > io.DEFAULT_BUFFER_SIZE):
+            bufsiz = max_length
 
         lzs.next_out = orig_out = m.malloc(bufsiz)
         if orig_out == ffi.NULL:
@@ -519,13 +612,13 @@
 
                 if ret == m.LZMA_STREAM_END:
                     self.eof = True
-                    if lzs.avail_in > 0:
-                        self.unused_data = ffi.buffer(lzs.next_in, lzs.avail_in)[:]
                     break
                 elif lzs.avail_in == 0:
                     # it ate everything
                     break
                 elif lzs.avail_out == 0:
+                    if data_size == max_length:
+                        break
                     # ran out of space in the output buffer, let's grow it
                     bufsiz += (bufsiz >> 3) + 6
                     next_out = m.realloc(orig_out, bufsiz)
@@ -639,9 +732,9 @@
 
         lzs = self.lzs
 
-        lzs.next_in = input_ = ffi.new('char[]', to_bytes(data))
+        lzs.next_in = input_ = ffi.new('uint8_t[]', to_bytes(data))
         lzs.avail_in = len(data)
-        outs = [ffi.new('char[]', BUFSIZ)]
+        outs = [ffi.new('uint8_t[]', BUFSIZ)]
         lzs.next_out, = outs
         lzs.avail_out = BUFSIZ
 
@@ -658,7 +751,7 @@
                 # ran out of space in the output buffer
                 #siz = (BUFSIZ << 1) + 6
                 siz = 512
-                outs.append(ffi.new('char[]', siz))
+                outs.append(ffi.new('uint8_t[]', siz))
                 lzs.next_out = outs[-1]
                 lzs.avail_out = siz
         last_out = outs.pop()
diff --git a/pypy/module/bz2/interp_bz2.py b/pypy/module/bz2/interp_bz2.py
--- a/pypy/module/bz2/interp_bz2.py
+++ b/pypy/module/bz2/interp_bz2.py
@@ -96,9 +96,11 @@
 BZ_SEQUENCE_ERROR = cConfig.BZ_SEQUENCE_ERROR
 
 if BUFSIZ < 8192:
-    SMALLCHUNK = 8192
+    INITIAL_BUFFER_SIZE = 8192
 else:
-    SMALLCHUNK = BUFSIZ
+    INITIAL_BUFFER_SIZE = 8192
+
+UINT_MAX = 2**32-1
 
 if rffi.sizeof(rffi.INT) > 4:
     BIGCHUNK = 512 * 32
@@ -187,12 +189,21 @@
     encapsulate the logic of setting up the fields of 'bzs' and
     allocating raw memory as needed.
     """
-    def __init__(self, bzs, initial_size=SMALLCHUNK):
+    def __init__(self, bzs, initial_size=INITIAL_BUFFER_SIZE, max_length=-1):
         # when the constructor is called, allocate a piece of memory
         # of length 'piece_size' and make bzs ready to dump there.
         self.temp = []
         self.bzs = bzs
-        self._allocate_chunk(initial_size)
+        self.max_length = max_length
+        if max_length < 0 or max_length >= initial_size:
+            size = initial_size
+        else:
+            size = max_length
+        self._allocate_chunk(size)
+        self.left = 0
+
+    def get_data_size(self):
+        return self.current_size - rffi.getintfield(self.bzs, 'c_avail_out')
 
     def _allocate_chunk(self, size):
         self.raw_buf, self.gc_buf, self.case_num = rffi.alloc_buffer(size)
@@ -214,7 +225,10 @@
     def prepare_next_chunk(self):
         size = self.current_size
         self.temp.append(self._get_chunk(size))
-        self._allocate_chunk(_new_buffer_size(size))
+        newsize = size
+        if self.max_length == -1:
+            newsize = _new_buffer_size(size)
+        self._allocate_chunk(newsize)
 
     def make_result_string(self):
         count_unoccupied = rffi.getintfield(self.bzs, 'c_avail_out')
@@ -357,7 +371,6 @@
     W_BZ2Decompressor.__init__(x, space)
     return space.wrap(x)
 
-
 class W_BZ2Decompressor(W_Root):
     """BZ2Decompressor() -> decompressor object
 
@@ -372,6 +385,9 @@
         try:
             self.running = False
             self.unused_data = ""
+            self.needs_input = True
+            self.input_buffer = ""
+            self.left_to_process = 0
 
             self._init_bz2decomp()
         except:
@@ -397,15 +413,56 @@
     def descr_getstate(self):
         raise oefmt(self.space.w_TypeError, "cannot serialize '%T' object", self)
 
+    def needs_input_w(self, space):
+        """ True if more input is needed before more decompressed
+            data can be produced. """
+        return space.wrap(self.needs_input)
+
     def eof_w(self, space):
         if self.running:
             return space.w_False
         else:
             return space.w_True
 
-    @unwrap_spec(data='bufferstr')
-    def decompress(self, data):
-        """decompress(data) -> string
+    def _decompress_buf(self, data, max_length):
+        in_bufsize = len(data)
+        with rffi.scoped_nonmovingbuffer(data) as in_buf:
+            # setup the input and the size it can consume
+            self.bzs.c_next_in = in_buf
+            rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize)
+
+            with OutBuffer(self.bzs, max_length=max_length) as out:
+                while True:
+                    bzreturn = BZ2_bzDecompress(self.bzs)
+                    # add up the size that has not been processed
+                    avail_in = rffi.getintfield(self.bzs, 'c_avail_in')
+                    self.left_to_process = avail_in
+                    if bzreturn == BZ_STREAM_END:
+                        self.running = False
+                        break
+                    if bzreturn != BZ_OK:
+                        _catch_bz2_error(self.space, bzreturn)
+
+                    if self.left_to_process == 0:
+                        break
+                    elif rffi.getintfield(self.bzs, 'c_avail_out') == 0:
+                        if out.get_data_size() == max_length:
+                            break
+                        out.prepare_next_chunk()
+
+                if not self.running:
+                    self.needs_input = False
+                    if self.left_to_process != 0:
+                        end = len(data)
+                        start = end - self.left_to_process
+                        assert start > 0
+                        self.unused_data = data[start:]
+                res = out.make_result_string()
+                return self.space.newbytes(res)
+
+    @unwrap_spec(data='bufferstr', max_length=int)
+    def decompress(self, data, max_length=-1):
+        """decompress(data, max_length=-1) -> bytes
 
         Provide more data to the decompressor object. It will return chunks
         of decompressed data whenever possible. If you try to decompress data
@@ -416,37 +473,30 @@
         if not self.running:
             raise oefmt(self.space.w_EOFError,
                         "end of stream was already found")
-        if data == '':
-            return self.space.newbytes('')
+        datalen = len(data)
+        if len(self.input_buffer) > 0:
+            input_buffer_in_use = True
+            data = self.input_buffer + data
+            datalen = len(data)
+            result = self._decompress_buf(data, max_length)
+        else:
+            input_buffer_in_use = False
+            result = self._decompress_buf(data, max_length)
 
-        in_bufsize = len(data)
+        if self.left_to_process == 0:
+            self.input_buffer = ""
+            self.needs_input = True
+        else:
+            self.needs_input = False
+            if not input_buffer_in_use:
+                start = datalen-self.left_to_process
+                assert start > 0
+                self.input_buffer = data[start:]
 
-        with rffi.scoped_nonmovingbuffer(data) as in_buf:
-            self.bzs.c_next_in = in_buf
-            rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize)
+        return result
 
-            with OutBuffer(self.bzs) as out:
-                while True:
-                    bzerror = BZ2_bzDecompress(self.bzs)
-                    if bzerror == BZ_STREAM_END:
-                        if rffi.getintfield(self.bzs, 'c_avail_in') != 0:
-                            unused = [self.bzs.c_next_in[i]
-                                      for i in range(
-                                          rffi.getintfield(self.bzs,
-                                                           'c_avail_in'))]
-                            self.unused_data = "".join(unused)
-                        self.running = False
-                        break
-                    if bzerror != BZ_OK:
-                        _catch_bz2_error(self.space, bzerror)
 
-                    if rffi.getintfield(self.bzs, 'c_avail_in') == 0:
-                        break
-                    elif rffi.getintfield(self.bzs, 'c_avail_out') == 0:
-                        out.prepare_next_chunk()
 
-                res = out.make_result_string()
-                return self.space.newbytes(res)
 
 
 W_BZ2Decompressor.typedef = TypeDef("_bz2.BZ2Decompressor",
@@ -456,5 +506,6 @@
     unused_data = interp_attrproperty_bytes("unused_data", W_BZ2Decompressor),
     eof = GetSetProperty(W_BZ2Decompressor.eof_w),
     decompress = interp2app(W_BZ2Decompressor.decompress),
+    needs_input = GetSetProperty(W_BZ2Decompressor.needs_input_w),
 )
 W_BZ2Decompressor.typedef.acceptable_as_base_class = False
diff --git a/pypy/module/bz2/test/test_bz2_compdecomp.py b/pypy/module/bz2/test/test_bz2_compdecomp.py
--- a/pypy/module/bz2/test/test_bz2_compdecomp.py
+++ b/pypy/module/bz2/test/test_bz2_compdecomp.py
@@ -1,6 +1,8 @@
 import os
 
 import py
+import glob
+import bz2
 
 from pypy.module.bz2.test.support import CheckAllocation
 from pypy.module.bz2 import interp_bz2
@@ -34,11 +36,11 @@
     mod.decompress = decompress
     #
     # For tests, patch the value of SMALLCHUNK
-    mod.OLD_SMALLCHUNK = interp_bz2.SMALLCHUNK
-    interp_bz2.SMALLCHUNK = 32
+    mod.OLD_SMALLCHUNK = interp_bz2.INITIAL_BUFFER_SIZE
+    interp_bz2.INITIAL_BUFFER_SIZE = 32
 
 def teardown_module(mod):
-    interp_bz2.SMALLCHUNK = mod.OLD_SMALLCHUNK
+    interp_bz2.INITIAL_BUFFER_SIZE = mod.OLD_SMALLCHUNK
 
 class AppTestBZ2Compressor(CheckAllocation):
     spaceconfig = dict(usemodules=('bz2', 'time', 'struct'))
@@ -200,6 +202,22 @@
         exc = raises(TypeError, pickle.dumps, BZ2Decompressor())
         assert exc.value.args[0] == "cannot serialize '_bz2.BZ2Decompressor' object"
 
+    def test_decompress_max_length(self):
+        from bz2 import BZ2Decompressor
+
+        bz2d = BZ2Decompressor()
+        decomp= []
+
+        length = len(self.DATA)
+        decomp.append(bz2d.decompress(self.DATA, max_length=100))
+        assert len(decomp[-1]) == 100
+
+        while not bz2d.eof:
+            decomp.append(bz2d.decompress(b"", max_length=50))
+            assert len(decomp[-1]) <= 50
+
+        assert b''.join(decomp) == self.TEXT
+
 
 class AppTestBZ2ModuleFunctions(CheckAllocation):
     spaceconfig = dict(usemodules=('bz2', 'time'))
diff --git a/pypy/module/bz2/test/test_bz2_file.py b/pypy/module/bz2/test/test_bz2_file.py
--- a/pypy/module/bz2/test/test_bz2_file.py
+++ b/pypy/module/bz2/test/test_bz2_file.py
@@ -256,7 +256,6 @@
         self.create_temp_file()
 
         bz2f = BZ2File(self.temppath)
-        raises(TypeError, bz2f.read, None)
         text_read = bz2f.read()
         assert text_read == self.TEXT
         bz2f.close()
diff --git a/pypy/tool/build_cffi_imports.py b/pypy/tool/build_cffi_imports.py
--- a/pypy/tool/build_cffi_imports.py
+++ b/pypy/tool/build_cffi_imports.py
@@ -44,7 +44,10 @@
     return failures
 
 if __name__ == '__main__':
-    import py, os
+    # NOTE: it does not work to execute this file to rebuild the cffi backends
+    # for pypy3. This script is python 2! Thus you can specify
+    # exefile as an argument to still be able to run this script with a pypy2 vm
+    import py, os, argparse
     if '__pypy__' not in sys.builtin_module_names:
         print 'Call with a pypy interpreter'
         sys.exit(-1)
@@ -52,8 +55,15 @@
     class Options(object):
         pass
 
-    exename = py.path.local(sys.executable) 
+    parser = argparse.ArgumentParser(description='Build all cffi backends in lib_pypy')
+    parser.add_argument('--exefile', dest='exefile', default=sys.executable,
+                        help='instead of executing sys.executable' \
+                             ' you can specify an alternative pypy vm here')
+    args = parser.parse_args()
+
+    exename = py.path.local(args.exefile)
     basedir = exename
+
     while not basedir.join('include').exists():
         _basedir = basedir.dirpath()
         if _basedir == basedir:
diff --git a/rpython/tool/runsubprocess.py b/rpython/tool/runsubprocess.py
--- a/rpython/tool/runsubprocess.py
+++ b/rpython/tool/runsubprocess.py
@@ -8,10 +8,15 @@
 import os
 from subprocess import PIPE, Popen
 
+PY3  = sys.version_info[0] >= 3
+
 def run_subprocess(executable, args, env=None, cwd=None):
     if isinstance(args, list):
-        args = [a.encode('latin1') if isinstance(a, unicode) else a
-                for a in args]
+        if PY3:
+            args = [a for a in args]
+        else:
+            args = [a.encode('latin1') if isinstance(a, unicode) else a
+                    for a in args]
     return _run(executable, args, env, cwd)
 
 shell_default = False
@@ -83,7 +88,7 @@
 
     def _run(*args):
         try:
-            _child.stdin.write('%r\n' % (args,))
+            _child.stdin.write(b'%r\n' % (args,))
         except (OSError, IOError):
             # lost the child.  Try again...
             spawn_subprocess()


More information about the pypy-commit mailing list