[pypy-commit] pypy py3.5: CPython Issue #22982: Improve BOM handling when seeking to multiple positions of a writable text file.
amauryfa
pypy.commits at gmail.com
Tue Apr 18 18:55:36 EDT 2017
Author: Amaury Forgeot d'Arc <amauryfa at gmail.com>
Branch: py3.5
Changeset: r91089:32c611c850d8
Date: 2017-04-18 22:12 +0200
http://bitbucket.org/pypy/pypy/changeset/32c611c850d8/
Log: CPython Issue #22982: Improve BOM handling when seeking to multiple
positions of a writable text file.
diff --git a/pypy/module/_io/interp_textio.py b/pypy/module/_io/interp_textio.py
--- a/pypy/module/_io/interp_textio.py
+++ b/pypy/module/_io/interp_textio.py
@@ -878,14 +878,18 @@
space.newtuple([space.newbytes(""),
space.newint(cookie.dec_flags)]))
- def _encoder_setstate(self, space, cookie):
- if cookie.start_pos == 0 and cookie.dec_flags == 0:
+ def _encoder_reset(self, space, start_of_stream):
+ if start_of_stream:
space.call_method(self.w_encoder, "reset")
self.encoding_start_of_stream = True
else:
space.call_method(self.w_encoder, "setstate", space.newint(0))
self.encoding_start_of_stream = False
+ def _encoder_setstate(self, space, cookie):
+ self._encoder_reset(space,
+ cookie.start_pos == 0 and cookie.dec_flags == 0)
+
@unwrap_spec(whence=int)
def seek_w(self, space, w_pos, whence=0):
self._check_attached(space)
@@ -913,8 +917,13 @@
self.snapshot = None
if self.w_decoder:
space.call_method(self.w_decoder, "reset")
- return space.call_method(self.w_buffer, "seek",
- w_pos, space.newint(whence))
+ w_res = space.call_method(self.w_buffer, "seek",
+ w_pos, space.newint(whence))
+ if self.w_encoder:
+ # If seek() == 0, we are at the start of stream
+ start_of_stream = space.eq_w(w_res, space.newint(0))
+ self._encoder_reset(space, start_of_stream)
+ return w_res
elif whence != 0:
raise oefmt(space.w_ValueError,
diff --git a/pypy/module/_io/test/test_fileio.py b/pypy/module/_io/test/test_fileio.py
--- a/pypy/module/_io/test/test_fileio.py
+++ b/pypy/module/_io/test/test_fileio.py
@@ -302,6 +302,36 @@
return -1
raises(ValueError, _io.FileIO, "foo", 'r', opener=opener)
+ def test_seek_bom(self):
+ # The BOM is not written again when seeking manually
+ import _io
+ filename = self.tmpfile + '_x3'
+ for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
+ with _io.open(filename, 'w', encoding=charset) as f:
+ f.write('aaa')
+ pos = f.tell()
+ with _io.open(filename, 'r+', encoding=charset) as f:
+ f.seek(pos)
+ f.write('zzz')
+ f.seek(0)
+ f.write('bbb')
+ with _io.open(filename, 'rb') as f:
+ assert f.read() == 'bbbzzz'.encode(charset)
+
+ def test_seek_append_bom(self):
+ # Same test, but first seek to the start and then to the end
+ import _io, os
+ filename = self.tmpfile + '_x3'
+ for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
+ with _io.open(filename, 'w', encoding=charset) as f:
+ f.write('aaa')
+ with _io.open(filename, 'a', encoding=charset) as f:
+ f.seek(0)
+ f.seek(0, os.SEEK_END)
+ f.write('xxx')
+ with _io.open(filename, 'rb') as f:
+ assert f.read() == 'aaaxxx'.encode(charset)
+
def test_flush_at_exit():
from pypy import conftest
More information about the pypy-commit mailing list