[pypy-svn] r74980 - in pypy/trunk/pypy/module/bz2: . test
arigo at codespeak.net
Mon May 31 20:37:47 CEST 2010
Author: arigo
Date: Mon May 31 20:37:43 2010
New Revision: 74980
Modified:
pypy/trunk/pypy/module/bz2/interp_bz2.py
pypy/trunk/pypy/module/bz2/test/test_large.py
Log:
issue541 resolved
Kill a lot of "approximate code duplication" in interp_bz2,
i.e. a lot of code that all does more or less the same thing
with more or less bugs. Replace it with a single separate
implementation. Fixes the crashes.
Modified: pypy/trunk/pypy/module/bz2/interp_bz2.py
==============================================================================
--- pypy/trunk/pypy/module/bz2/interp_bz2.py (original)
+++ pypy/trunk/pypy/module/bz2/interp_bz2.py Mon May 31 20:37:43 2010
@@ -172,14 +172,58 @@
space.wrap("wrong sequence of bz2 library commands used"))
def _new_buffer_size(current_size):
- if current_size > SMALLCHUNK:
- # keep doubling until we reach BIGCHUNK
- # then keep adding BIGCHUNK
- if current_size <= BIGCHUNK:
- return current_size + current_size
+ # keep doubling until we reach BIGCHUNK; then the buffer size is no
+ # longer increased
+ if current_size < BIGCHUNK:
+ return current_size + current_size
+ return current_size
+
+# ____________________________________________________________
+
+class OutBuffer(object):
+ """Handler for the output buffer. A bit custom code trying to
+ encapsulate the logic of setting up the fields of 'bzs' and
+ allocating raw memory as needed.
+ """
+ def __init__(self, bzs, initial_size=SMALLCHUNK):
+ # when the constructor is called, allocate a piece of memory
+ # of length 'piece_size' and make bzs ready to dump there.
+ self.temp = []
+ self.bzs = bzs
+ self._allocate_chunk(initial_size)
+
+ def _allocate_chunk(self, size):
+ self.raw_buf, self.gc_buf = rffi.alloc_buffer(size)
+ self.current_size = size
+ self.bzs.c_next_out = self.raw_buf
+ rffi.setintfield(self.bzs, 'c_avail_out', size)
+
+ def _get_chunk(self, chunksize):
+ assert 0 <= chunksize <= self.current_size
+ raw_buf = self.raw_buf
+ gc_buf = self.gc_buf
+ s = rffi.str_from_buffer(raw_buf, gc_buf, self.current_size, chunksize)
+ rffi.keep_buffer_alive_until_here(raw_buf, gc_buf)
+ self.current_size = 0
+ return s
+
+ def prepare_next_chunk(self):
+ size = self.current_size
+ self.temp.append(self._get_chunk(size))
+ self._allocate_chunk(_new_buffer_size(size))
+
+ def make_result_string(self):
+ count_unoccupied = rffi.getintfield(self.bzs, 'c_avail_out')
+ s = self._get_chunk(self.current_size - count_unoccupied)
+ if self.temp:
+ self.temp.append(s)
+ return ''.join(self.temp)
else:
- return current_size + BIGCHUNK
- return current_size + SMALLCHUNK
+ return s
+
+ def free(self):
+ if self.current_size > 0:
+ rffi.keep_buffer_alive_until_here(self.raw_buf, self.gc_buf)
# ____________________________________________________________
#
@@ -488,10 +532,7 @@
raise OperationError(self.space.w_ValueError,
self.space.wrap("this object was already flushed"))
- out_bufsize = SMALLCHUNK
- out_buf = lltype.malloc(rffi.CCHARP.TO, out_bufsize, flavor='raw',
- zero=True)
-
+ out = OutBuffer(self.bzs)
in_bufsize = datasize
in_buf = lltype.malloc(rffi.CCHARP.TO, in_bufsize, flavor='raw')
for i in range(datasize):
@@ -501,10 +542,7 @@
self.bzs.c_next_in = in_buf
rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize)
- self.bzs.c_next_out = out_buf
- rffi.setintfield(self.bzs, 'c_avail_out', out_bufsize)
-
- temp = []
+
while True:
bzerror = BZ2_bzCompress(self.bzs, BZ_RUN)
if bzerror != BZ_RUN_OK:
@@ -513,29 +551,13 @@
if rffi.getintfield(self.bzs, 'c_avail_in') == 0:
break
elif rffi.getintfield(self.bzs, 'c_avail_out') == 0:
- total_out = _bzs_total_out(self.bzs)
- data = "".join([out_buf[i] for i in range(total_out)])
- temp.append(data)
-
- out_bufsize = _new_buffer_size(out_bufsize)
- lltype.free(out_buf, flavor='raw')
- out_buf = lltype.malloc(rffi.CCHARP.TO, out_bufsize,
- flavor='raw', zero=True)
- self.bzs.c_next_out = out_buf
- rffi.setintfield(self.bzs, 'c_avail_out', out_bufsize)
-
- if temp:
- total_out = _bzs_total_out(self.bzs)
- data = "".join([out_buf[i] for i in range(total_out - len(temp[0]))])
- temp.append(data)
- return self.space.wrap("".join(temp))
+ out.prepare_next_chunk()
- total_out = _bzs_total_out(self.bzs)
- res = "".join([out_buf[i] for i in range(total_out)])
+ res = out.make_result_string()
return self.space.wrap(res)
finally:
lltype.free(in_buf, flavor='raw')
- lltype.free(out_buf, flavor='raw')
+ out.free()
compress.unwrap_spec = ['self', 'bufferstr']
@@ -545,18 +567,8 @@
self.space.wrap("this object was already flushed"))
self.running = False
- out_bufsize = SMALLCHUNK
- out_buf = lltype.malloc(rffi.CCHARP.TO, out_bufsize, flavor='raw',
- zero=True)
-
+ out = OutBuffer(self.bzs)
try:
-
- self.bzs.c_next_out = out_buf
- rffi.setintfield(self.bzs, 'c_avail_out', out_bufsize)
-
- total_out = _bzs_total_out(self.bzs)
-
- temp = []
while True:
bzerror = BZ2_bzCompress(self.bzs, BZ_FINISH)
if bzerror == BZ_STREAM_END:
@@ -565,30 +577,12 @@
_catch_bz2_error(self.space, bzerror)
if rffi.getintfield(self.bzs, 'c_avail_out') == 0:
- data = "".join([out_buf[i] for i in range(_bzs_total_out(self.bzs))])
- temp.append(data)
-
- out_bufsize = _new_buffer_size(out_bufsize)
- lltype.free(out_buf, flavor='raw')
- out_buf = lltype.malloc(rffi.CCHARP.TO, out_bufsize,
- flavor='raw', zero=True)
- self.bzs.c_next_out = out_buf
- rffi.setintfield(self.bzs, 'c_avail_out', out_bufsize)
-
+ out.prepare_next_chunk()
- if rffi.getintfield(self.bzs, 'c_avail_out'):
- size = _bzs_total_out(self.bzs) - total_out
- res = "".join([out_buf[i] for i in range(size)])
- else:
- total_out = _bzs_total_out(self.bzs)
- res = "".join([out_buf[i] for i in range(total_out)])
- if not temp:
- return self.space.wrap(res)
- else:
- temp.append(res)
- return self.space.wrap("".join(temp))
+ res = out.make_result_string()
+ return self.space.wrap(res)
finally:
- lltype.free(out_buf, flavor='raw')
+ out.free()
flush.unwrap_spec = ['self']
W_BZ2Compressor.typedef = TypeDef("BZ2Compressor",
@@ -653,20 +647,11 @@
for i in range(in_bufsize):
in_buf[i] = data[i]
- out_bufsize = SMALLCHUNK
- out_buf = lltype.malloc(rffi.CCHARP.TO, out_bufsize, flavor='raw',
- zero=True)
-
+ out = OutBuffer(self.bzs)
try:
-
self.bzs.c_next_in = in_buf
rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize)
- self.bzs.c_next_out = out_buf
- rffi.setintfield(self.bzs, 'c_avail_out', out_bufsize)
- total_out = _bzs_total_out(self.bzs)
-
- temp = []
while True:
bzerror = BZ2_bzDecompress(self.bzs)
if bzerror == BZ_STREAM_END:
@@ -681,31 +666,13 @@
if rffi.getintfield(self.bzs, 'c_avail_in') == 0:
break
elif rffi.getintfield(self.bzs, 'c_avail_out') == 0:
- new_total_out = _bzs_total_out(self.bzs)
- extra_out = new_total_out - total_out
- assert out_bufsize >= extra_out
- total_out = new_total_out
- data = "".join([out_buf[i] for i in range(extra_out)])
- temp.append(data)
- lltype.free(out_buf, flavor='raw')
- out_bufsize = _new_buffer_size(out_bufsize)
- out_buf = lltype.malloc(rffi.CCHARP.TO, out_bufsize, flavor='raw')
- self.bzs.c_next_out = out_buf
- rffi.setintfield(self.bzs, 'c_avail_out', out_bufsize)
-
- new_total_out = _bzs_total_out(self.bzs)
- extra_out = new_total_out - total_out
-
- if temp:
- data = "".join([out_buf[i] for i in range(extra_out)])
- temp.append(data)
- return self.space.wrap("".join(temp))
+ out.prepare_next_chunk()
- res = "".join([out_buf[i] for i in range(extra_out)])
+ res = out.make_result_string()
return self.space.wrap(res)
finally:
lltype.free(in_buf, flavor='raw')
- lltype.free(out_buf, flavor='raw')
+ out.free()
decompress.unwrap_spec = ['self', 'bufferstr']
@@ -733,10 +700,8 @@
in_bufsize = len(data)
# conforming to bz2 manual, this is large enough to fit compressed
# data in one shot. We will check it later anyway.
- out_bufsize = in_bufsize + (in_bufsize / 100 + 1) + 600
-
- out_buf = lltype.malloc(rffi.CCHARP.TO, out_bufsize, flavor='raw',
- zero=True)
+ out = OutBuffer(bzs, in_bufsize + (in_bufsize / 100 + 1) + 600)
+
in_buf = lltype.malloc(rffi.CCHARP.TO, in_bufsize, flavor='raw')
for i in range(in_bufsize):
in_buf[i] = data[i]
@@ -744,15 +709,11 @@
try:
bzs.c_next_in = in_buf
rffi.setintfield(bzs, 'c_avail_in', in_bufsize)
- bzs.c_next_out = out_buf
- rffi.setintfield(bzs, 'c_avail_out', out_bufsize)
bzerror = BZ2_bzCompressInit(bzs, compresslevel, 0, 0)
if bzerror != BZ_OK:
_catch_bz2_error(space, bzerror)
-
- total_out = _bzs_total_out(bzs)
- temp = []
+
while True:
bzerror = BZ2_bzCompress(bzs, BZ_FINISH)
if bzerror == BZ_STREAM_END:
@@ -762,32 +723,15 @@
_catch_bz2_error(space, bzerror)
if rffi.getintfield(bzs, 'c_avail_out') == 0:
- data = "".join([out_buf[i] for i in range(_bzs_total_out(bzs))])
- temp.append(data)
+ out.prepare_next_chunk()
- lltype.free(out_buf, flavor='raw')
- out_bufsize = _new_buffer_size(out_bufsize)
- out_buf = lltype.malloc(rffi.CCHARP.TO, out_bufsize,
- flavor='raw', zero=True)
- bzs.c_next_out = out_buf
- rffi.setintfield(bzs, 'c_avail_out', out_bufsize)
-
- if temp:
- res = "".join(temp)
-
- if rffi.getintfield(bzs, 'c_avail_out'):
- size = _bzs_total_out(bzs) - total_out
- res = "".join([out_buf[i] for i in range(size)])
- else:
- total_out = _bzs_total_out(bzs)
- res = "".join([out_buf[i] for i in range(total_out)])
-
+ res = out.make_result_string()
BZ2_bzCompressEnd(bzs)
return space.wrap(res)
finally:
lltype.free(bzs, flavor='raw')
lltype.free(in_buf, flavor='raw')
- lltype.free(out_buf, flavor='raw')
+ out.free()
compress.unwrap_spec = [ObjSpace, 'bufferstr', int]
def decompress(space, data):
@@ -805,22 +749,15 @@
for i in range(in_bufsize):
in_buf[i] = data[i]
- out_bufsize = SMALLCHUNK
- out_buf = lltype.malloc(rffi.CCHARP.TO, out_bufsize, flavor='raw',
- zero=True)
+ out = OutBuffer(bzs)
try:
-
-
bzs.c_next_in = in_buf
rffi.setintfield(bzs, 'c_avail_in', in_bufsize)
- bzs.c_next_out = out_buf
- rffi.setintfield(bzs, 'c_avail_out', out_bufsize)
bzerror = BZ2_bzDecompressInit(bzs, 0, 0)
if bzerror != BZ_OK:
_catch_bz2_error(space, bzerror)
- temp = []
while True:
bzerror = BZ2_bzDecompress(bzs)
if bzerror == BZ_STREAM_END:
@@ -834,29 +771,13 @@
raise OperationError(space.w_ValueError,
space.wrap("couldn't find end of stream"))
elif rffi.getintfield(bzs, 'c_avail_out') == 0:
- total_out = _bzs_total_out(bzs)
- data = "".join([out_buf[i] for i in range(total_out)])
- temp.append(data)
-
- lltype.free(out_buf, flavor='raw')
- out_bufsize = _new_buffer_size(out_bufsize)
- out_buf = lltype.malloc(rffi.CCHARP.TO, out_bufsize,
- flavor='raw', zero=True)
- bzs.c_next_out = out_buf
- rffi.setintfield(bzs, 'c_avail_out', out_bufsize)
-
- total_out = _bzs_total_out(bzs)
- if temp:
- data = "".join([out_buf[i] for i in range(total_out - len(temp[0]))])
- temp.append(data)
- res = "".join(temp)
- else:
- res = "".join([out_buf[i] for i in range(total_out) if out_buf[i] != '\x00'])
-
+ out.prepare_next_chunk()
+
+ res = out.make_result_string()
BZ2_bzDecompressEnd(bzs)
return space.wrap(res)
finally:
lltype.free(bzs, flavor='raw')
- lltype.free(out_buf, flavor='raw')
lltype.free(in_buf, flavor='raw')
+ out.free()
decompress.unwrap_spec = [ObjSpace, 'bufferstr']
Modified: pypy/trunk/pypy/module/bz2/test/test_large.py
==============================================================================
--- pypy/trunk/pypy/module/bz2/test/test_large.py (original)
+++ pypy/trunk/pypy/module/bz2/test/test_large.py Mon May 31 20:37:43 2010
@@ -1,9 +1,11 @@
import py
-from pypy.conftest import gettestobjspace
+from pypy.conftest import gettestobjspace, option
class AppTestBZ2File:
def setup_class(cls):
+ if not option.runappdirect:
+ py.test.skip("skipping this very slow test; try 'pypy-c -A'")
cls.space = gettestobjspace(usemodules=('bz2',))
largetest_bz2 = py.path.local(__file__).dirpath().join("largetest.bz2")
cls.w_compressed_data = cls.space.wrap(largetest_bz2.read())
More information about the Pypy-commit
mailing list