[pypy-commit] pypy py3.5: merge bz2 and lzma changes
plan_rich
pypy.commits at gmail.com
Thu Sep 29 05:01:02 EDT 2016
Author: Richard Plangger <planrichi at gmail.com>
Branch: py3.5
Changeset: r87440:43ca4b14e86e
Date: 2016-09-29 11:00 +0200
http://bitbucket.org/pypy/pypy/changeset/43ca4b14e86e/
Log: merge bz2 and lzma changes
diff --git a/lib_pypy/_lzma.py b/lib_pypy/_lzma.py
--- a/lib_pypy/_lzma.py
+++ b/lib_pypy/_lzma.py
@@ -445,6 +445,9 @@
self.eof = False
self.lzs = _new_lzma_stream()
self._bufsiz = max(8192, io.DEFAULT_BUFFER_SIZE)
+ self.needs_input = True
+ self._input_buffer = ffi.NULL
+ self._input_buffer_size = 0
if format == FORMAT_AUTO:
catch_lzma_error(m.lzma_auto_decoder, self.lzs, memlimit, decoder_flags)
@@ -473,9 +476,67 @@
else:
raise ValueError("invalid...")
- def decompress(self, data):
+ def pre_decompress_left_data(self, buf, buf_size):
+ # there is leftover input data from a previous call that must be
+ # processed before the new argument can be consumed
+
+ lzs = self.lzs
+
+ addr_input_buffer = int(ffi.cast('uintptr_t', self._input_buffer))
+ addr_next_in = int(ffi.cast('uintptr_t', lzs.next_in))
+ avail_now = (addr_input_buffer + self._input_buffer_size) - \
+ (addr_next_in + lzs.avail_in)
+ avail_total = self._input_buffer_size - lzs.avail_in
+ if avail_total < buf_size:
+ # resize the buffer, it is too small!
+ offset = addr_next_in - addr_input_buffer
+ new_size = self._input_buffer_size + buf_size - avail_now
+ # there is no realloc?
+ tmp = ffi.cast("uint8_t*",m.malloc(new_size))
+ if tmp == ffi.NULL:
+ raise MemoryError
+ ffi.memmove(tmp, lzs.next_in, lzs.avail_in)
+ lzs.next_in = tmp
+ m.free(self._input_buffer)
+ self._input_buffer = tmp
+ self._input_buffer_size = new_size
+ elif avail_now < buf_size:
+ # the buffer is not too small, but we cannot append it!
+ # move all data to the front
+ ffi.memmove(self._input_buffer, lzs.next_in, lzs.avail_in)
+ lzs.next_in = self._input_buffer
+ ffi.memmove(lzs.next_in+lzs.avail_in, buf, buf_size)
+ lzs.avail_in += buf_size
+ return lzs.next_in, lzs.avail_in
+
+ def post_decompress_avail_data(self):
+ lzs = self.lzs
+ # free the buffer, it is too small
+ if self._input_buffer is not ffi.NULL and \
+ self._input_buffer_size < lzs.avail_in:
+ m.free(self._input_buffer)
+ self._input_buffer = ffi.NONE
+
+ # allocate if necessary
+ if self._input_buffer is ffi.NULL:
+ self._input_buffer = ffi.cast("uint8_t*",m.malloc(lzs.avail_in))
+ if self._input_buffer == ffi.NULL:
+ raise MemoryError
+ self._input_buffer_size = lzs.avail_in
+
+ ffi.memmove(self._input_buffer, lzs.next_in, lzs.avail_in)
+ lzs.next_in = self._input_buffer
+
+ def clear_input_buffer(self):
+ # clean the buffer
+ if self._input_buffer is not ffi.NULL:
+ m.free(self._input_buffer)
+ self._input_buffer = ffi.NULL
+ self._input_buffer_size = 0
+
+ def decompress(self, data, max_length=-1):
"""
- decompress(data) -> bytes
+ decompress(data, max_length=-1) -> bytes
Provide data to the decompressor object. Returns a chunk of
decompressed data if possible, or b"" otherwise.
@@ -484,21 +545,53 @@
reached raises an EOFError. Any data found after the end of the
stream is ignored, and saved in the unused_data attribute.
"""
+ if not isinstance(max_length, int):
+ raise TypeError("max_length parameter object cannot be interpreted as an integer")
with self.lock:
if self.eof:
raise EOFError("Already...")
- return self._decompress(data)
+ lzs = self.lzs
+ data = to_bytes(data)
+ buf = ffi.new('uint8_t[]', data)
+ buf_size = len(data)
- def _decompress(self, data):
+ if lzs.next_in:
+ buf, buf_size = self.pre_decompress_left_data(buf, buf_size)
+ used__input_buffer = True
+ else:
+ lzs.avail_in = buf_size
+ lzs.next_in = ffi.cast("uint8_t*",buf)
+ used__input_buffer = False
+
+ # actual decompression
+ result = self._decompress(buf, buf_size, max_length)
+
+ if self.eof:
+ self.needs_input = False
+ if lzs.avail_in > 0:
+ self.unused_data = ffi.buffer(lzs.next_in, lzs.avail_in)[:]
+ self.clear_input_buffer()
+ elif lzs.avail_in == 0:
+ # completed successfully!
+ self.needs_input = True
+ lzs.next_in = ffi.NULL
+ self.clear_input_buffer()
+ else:
+ self.needs_input = False
+ if not used__input_buffer:
+ self.post_decompress_avail_data()
+
+ return result
+
+ def _decompress(self, buf, buf_len, max_length):
lzs = self.lzs
- # we need in_ so that lzs.next_in doesn't get garbage collected until
- # in_ goes out of scope
- data = to_bytes(data)
- lzs.next_in = in_ = ffi.new('char[]', data)
- lzs.avail_in = len(data)
+ lzs.next_in = buf
+ lzs.avail_in = buf_len
bufsiz = self._bufsiz
+ if not (max_length < 0 or max_length > io.DEFAULT_BUFFER_SIZE):
+ bufsiz = max_length
lzs.next_out = orig_out = m.malloc(bufsiz)
if orig_out == ffi.NULL:
@@ -519,13 +612,13 @@
if ret == m.LZMA_STREAM_END:
self.eof = True
- if lzs.avail_in > 0:
- self.unused_data = ffi.buffer(lzs.next_in, lzs.avail_in)[:]
break
elif lzs.avail_in == 0:
# it ate everything
break
elif lzs.avail_out == 0:
+ if data_size == max_length:
+ break
# ran out of space in the output buffer, let's grow it
bufsiz += (bufsiz >> 3) + 6
next_out = m.realloc(orig_out, bufsiz)
@@ -639,9 +732,9 @@
lzs = self.lzs
- lzs.next_in = input_ = ffi.new('char[]', to_bytes(data))
+ lzs.next_in = input_ = ffi.new('uint8_t[]', to_bytes(data))
lzs.avail_in = len(data)
- outs = [ffi.new('char[]', BUFSIZ)]
+ outs = [ffi.new('uint8_t[]', BUFSIZ)]
lzs.next_out, = outs
lzs.avail_out = BUFSIZ
@@ -658,7 +751,7 @@
# ran out of space in the output buffer
#siz = (BUFSIZ << 1) + 6
siz = 512
- outs.append(ffi.new('char[]', siz))
+ outs.append(ffi.new('uint8_t[]', siz))
lzs.next_out = outs[-1]
lzs.avail_out = siz
last_out = outs.pop()
diff --git a/pypy/module/bz2/interp_bz2.py b/pypy/module/bz2/interp_bz2.py
--- a/pypy/module/bz2/interp_bz2.py
+++ b/pypy/module/bz2/interp_bz2.py
@@ -96,9 +96,11 @@
BZ_SEQUENCE_ERROR = cConfig.BZ_SEQUENCE_ERROR
if BUFSIZ < 8192:
- SMALLCHUNK = 8192
+ INITIAL_BUFFER_SIZE = 8192
else:
- SMALLCHUNK = BUFSIZ
+ INITIAL_BUFFER_SIZE = 8192
+
+UINT_MAX = 2**32-1
if rffi.sizeof(rffi.INT) > 4:
BIGCHUNK = 512 * 32
@@ -187,12 +189,21 @@
encapsulate the logic of setting up the fields of 'bzs' and
allocating raw memory as needed.
"""
- def __init__(self, bzs, initial_size=SMALLCHUNK):
+ def __init__(self, bzs, initial_size=INITIAL_BUFFER_SIZE, max_length=-1):
# when the constructor is called, allocate a piece of memory
# of length 'piece_size' and make bzs ready to dump there.
self.temp = []
self.bzs = bzs
- self._allocate_chunk(initial_size)
+ self.max_length = max_length
+ if max_length < 0 or max_length >= initial_size:
+ size = initial_size
+ else:
+ size = max_length
+ self._allocate_chunk(size)
+ self.left = 0
+
+ def get_data_size(self):
+ return self.current_size - rffi.getintfield(self.bzs, 'c_avail_out')
def _allocate_chunk(self, size):
self.raw_buf, self.gc_buf, self.case_num = rffi.alloc_buffer(size)
@@ -214,7 +225,10 @@
def prepare_next_chunk(self):
size = self.current_size
self.temp.append(self._get_chunk(size))
- self._allocate_chunk(_new_buffer_size(size))
+ newsize = size
+ if self.max_length == -1:
+ newsize = _new_buffer_size(size)
+ self._allocate_chunk(newsize)
def make_result_string(self):
count_unoccupied = rffi.getintfield(self.bzs, 'c_avail_out')
@@ -357,7 +371,6 @@
W_BZ2Decompressor.__init__(x, space)
return space.wrap(x)
-
class W_BZ2Decompressor(W_Root):
"""BZ2Decompressor() -> decompressor object
@@ -372,6 +385,9 @@
try:
self.running = False
self.unused_data = ""
+ self.needs_input = True
+ self.input_buffer = ""
+ self.left_to_process = 0
self._init_bz2decomp()
except:
@@ -397,15 +413,56 @@
def descr_getstate(self):
raise oefmt(self.space.w_TypeError, "cannot serialize '%T' object", self)
+ def needs_input_w(self, space):
+ """ True if more input is needed before more decompressed
+ data can be produced. """
+ return space.wrap(self.needs_input)
+
def eof_w(self, space):
if self.running:
return space.w_False
else:
return space.w_True
- @unwrap_spec(data='bufferstr')
- def decompress(self, data):
- """decompress(data) -> string
+ def _decompress_buf(self, data, max_length):
+ in_bufsize = len(data)
+ with rffi.scoped_nonmovingbuffer(data) as in_buf:
+ # setup the input and the size it can consume
+ self.bzs.c_next_in = in_buf
+ rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize)
+
+ with OutBuffer(self.bzs, max_length=max_length) as out:
+ while True:
+ bzreturn = BZ2_bzDecompress(self.bzs)
+ # add up the size that has not been processed
+ avail_in = rffi.getintfield(self.bzs, 'c_avail_in')
+ self.left_to_process = avail_in
+ if bzreturn == BZ_STREAM_END:
+ self.running = False
+ break
+ if bzreturn != BZ_OK:
+ _catch_bz2_error(self.space, bzreturn)
+
+ if self.left_to_process == 0:
+ break
+ elif rffi.getintfield(self.bzs, 'c_avail_out') == 0:
+ if out.get_data_size() == max_length:
+ break
+ out.prepare_next_chunk()
+
+ if not self.running:
+ self.needs_input = False
+ if self.left_to_process != 0:
+ end = len(data)
+ start = end - self.left_to_process
+ assert start > 0
+ self.unused_data = data[start:]
+ res = out.make_result_string()
+ return self.space.newbytes(res)
+
+ @unwrap_spec(data='bufferstr', max_length=int)
+ def decompress(self, data, max_length=-1):
+ """decompress(data, max_length=-1) -> bytes
Provide more data to the decompressor object. It will return chunks
of decompressed data whenever possible. If you try to decompress data
@@ -416,37 +473,30 @@
if not self.running:
raise oefmt(self.space.w_EOFError,
"end of stream was already found")
- if data == '':
- return self.space.newbytes('')
+ datalen = len(data)
+ if len(self.input_buffer) > 0:
+ input_buffer_in_use = True
+ data = self.input_buffer + data
+ datalen = len(data)
+ result = self._decompress_buf(data, max_length)
+ else:
+ input_buffer_in_use = False
+ result = self._decompress_buf(data, max_length)
- in_bufsize = len(data)
+ if self.left_to_process == 0:
+ self.input_buffer = ""
+ self.needs_input = True
+ else:
+ self.needs_input = False
+ if not input_buffer_in_use:
+ start = datalen-self.left_to_process
+ assert start > 0
+ self.input_buffer = data[start:]
- with rffi.scoped_nonmovingbuffer(data) as in_buf:
- self.bzs.c_next_in = in_buf
- rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize)
+ return result
- with OutBuffer(self.bzs) as out:
- while True:
- bzerror = BZ2_bzDecompress(self.bzs)
- if bzerror == BZ_STREAM_END:
- if rffi.getintfield(self.bzs, 'c_avail_in') != 0:
- unused = [self.bzs.c_next_in[i]
- for i in range(
- rffi.getintfield(self.bzs,
- 'c_avail_in'))]
- self.unused_data = "".join(unused)
- self.running = False
- break
- if bzerror != BZ_OK:
- _catch_bz2_error(self.space, bzerror)
- if rffi.getintfield(self.bzs, 'c_avail_in') == 0:
- break
- elif rffi.getintfield(self.bzs, 'c_avail_out') == 0:
- out.prepare_next_chunk()
- res = out.make_result_string()
- return self.space.newbytes(res)
W_BZ2Decompressor.typedef = TypeDef("_bz2.BZ2Decompressor",
@@ -456,5 +506,6 @@
unused_data = interp_attrproperty_bytes("unused_data", W_BZ2Decompressor),
eof = GetSetProperty(W_BZ2Decompressor.eof_w),
decompress = interp2app(W_BZ2Decompressor.decompress),
+ needs_input = GetSetProperty(W_BZ2Decompressor.needs_input_w),
)
W_BZ2Decompressor.typedef.acceptable_as_base_class = False
diff --git a/pypy/module/bz2/test/test_bz2_compdecomp.py b/pypy/module/bz2/test/test_bz2_compdecomp.py
--- a/pypy/module/bz2/test/test_bz2_compdecomp.py
+++ b/pypy/module/bz2/test/test_bz2_compdecomp.py
@@ -1,6 +1,8 @@
import os
import py
+import glob
+import bz2
from pypy.module.bz2.test.support import CheckAllocation
from pypy.module.bz2 import interp_bz2
@@ -34,11 +36,11 @@
mod.decompress = decompress
#
# For tests, patch the value of SMALLCHUNK
- mod.OLD_SMALLCHUNK = interp_bz2.SMALLCHUNK
- interp_bz2.SMALLCHUNK = 32
+ mod.OLD_SMALLCHUNK = interp_bz2.INITIAL_BUFFER_SIZE
+ interp_bz2.INITIAL_BUFFER_SIZE = 32
def teardown_module(mod):
- interp_bz2.SMALLCHUNK = mod.OLD_SMALLCHUNK
+ interp_bz2.INITIAL_BUFFER_SIZE = mod.OLD_SMALLCHUNK
class AppTestBZ2Compressor(CheckAllocation):
spaceconfig = dict(usemodules=('bz2', 'time', 'struct'))
@@ -200,6 +202,22 @@
exc = raises(TypeError, pickle.dumps, BZ2Decompressor())
assert exc.value.args[0] == "cannot serialize '_bz2.BZ2Decompressor' object"
+ def test_decompress_max_length(self):
+ from bz2 import BZ2Decompressor
+
+ bz2d = BZ2Decompressor()
+ decomp= []
+
+ length = len(self.DATA)
+ decomp.append(bz2d.decompress(self.DATA, max_length=100))
+ assert len(decomp[-1]) == 100
+
+ while not bz2d.eof:
+ decomp.append(bz2d.decompress(b"", max_length=50))
+ assert len(decomp[-1]) <= 50
+
+ assert b''.join(decomp) == self.TEXT
+
class AppTestBZ2ModuleFunctions(CheckAllocation):
spaceconfig = dict(usemodules=('bz2', 'time'))
diff --git a/pypy/module/bz2/test/test_bz2_file.py b/pypy/module/bz2/test/test_bz2_file.py
--- a/pypy/module/bz2/test/test_bz2_file.py
+++ b/pypy/module/bz2/test/test_bz2_file.py
@@ -256,7 +256,6 @@
self.create_temp_file()
bz2f = BZ2File(self.temppath)
- raises(TypeError, bz2f.read, None)
text_read = bz2f.read()
assert text_read == self.TEXT
bz2f.close()
diff --git a/pypy/tool/build_cffi_imports.py b/pypy/tool/build_cffi_imports.py
--- a/pypy/tool/build_cffi_imports.py
+++ b/pypy/tool/build_cffi_imports.py
@@ -44,7 +44,10 @@
return failures
if __name__ == '__main__':
- import py, os
+ # NOTE: executing this file to rebuild the cffi backends does not
+ # work for pypy3, because this script is Python 2! You can pass
+ # exefile as an argument to run this script with a pypy2 vm instead
+ import py, os, argparse
if '__pypy__' not in sys.builtin_module_names:
print 'Call with a pypy interpreter'
sys.exit(-1)
@@ -52,8 +55,15 @@
class Options(object):
pass
- exename = py.path.local(sys.executable)
+ parser = argparse.ArgumentParser(description='Build all cffi backends in lib_pypy')
+ parser.add_argument('--exefile', dest='exefile', default=sys.executable,
+ help='instead of executing sys.executable' \
+ ' you can specify an alternative pypy vm here')
+ args = parser.parse_args()
+
+ exename = py.path.local(args.exefile)
basedir = exename
+
while not basedir.join('include').exists():
_basedir = basedir.dirpath()
if _basedir == basedir:
diff --git a/rpython/tool/runsubprocess.py b/rpython/tool/runsubprocess.py
--- a/rpython/tool/runsubprocess.py
+++ b/rpython/tool/runsubprocess.py
@@ -8,10 +8,15 @@
import os
from subprocess import PIPE, Popen
+PY3 = sys.version_info[0] >= 3
+
def run_subprocess(executable, args, env=None, cwd=None):
if isinstance(args, list):
- args = [a.encode('latin1') if isinstance(a, unicode) else a
- for a in args]
+ if PY3:
+ args = [a for a in args]
+ else:
+ args = [a.encode('latin1') if isinstance(a, unicode) else a
+ for a in args]
return _run(executable, args, env, cwd)
shell_default = False
@@ -83,7 +88,7 @@
def _run(*args):
try:
- _child.stdin.write('%r\n' % (args,))
+ _child.stdin.write(b'%r\n' % (args,))
except (OSError, IOError):
# lost the child. Try again...
spawn_subprocess()
More information about the pypy-commit
mailing list