[pypy-commit] pypy py3.5: CPython Issue #22982: Improve BOM handling when seeking to multiple positions of a writable text file.

amauryfa pypy.commits at gmail.com
Tue Apr 18 18:55:36 EDT 2017


Author: Amaury Forgeot d'Arc <amauryfa at gmail.com>
Branch: py3.5
Changeset: r91089:32c611c850d8
Date: 2017-04-18 22:12 +0200
http://bitbucket.org/pypy/pypy/changeset/32c611c850d8/

Log:	CPython Issue #22982: Improve BOM handling when seeking to multiple
	positions of a writable text file.

diff --git a/pypy/module/_io/interp_textio.py b/pypy/module/_io/interp_textio.py
--- a/pypy/module/_io/interp_textio.py
+++ b/pypy/module/_io/interp_textio.py
@@ -878,14 +878,18 @@
                               space.newtuple([space.newbytes(""),
                                               space.newint(cookie.dec_flags)]))
 
-    def _encoder_setstate(self, space, cookie):
-        if cookie.start_pos == 0 and cookie.dec_flags == 0:
+    def _encoder_reset(self, space, start_of_stream):
+        if start_of_stream:
             space.call_method(self.w_encoder, "reset")
             self.encoding_start_of_stream = True
         else:
             space.call_method(self.w_encoder, "setstate", space.newint(0))
             self.encoding_start_of_stream = False
 
+    def _encoder_setstate(self, space, cookie):
+        self._encoder_reset(space,
+                            cookie.start_pos == 0 and cookie.dec_flags == 0)
+
     @unwrap_spec(whence=int)
     def seek_w(self, space, w_pos, whence=0):
         self._check_attached(space)
@@ -913,8 +917,13 @@
             self.snapshot = None
             if self.w_decoder:
                 space.call_method(self.w_decoder, "reset")
-            return space.call_method(self.w_buffer, "seek",
-                                     w_pos, space.newint(whence))
+            w_res = space.call_method(self.w_buffer, "seek",
+                                      w_pos, space.newint(whence))
+            if self.w_encoder:
+                # If seek() == 0, we are at the start of stream
+                start_of_stream = space.eq_w(w_res, space.newint(0))
+                self._encoder_reset(space, start_of_stream)
+            return w_res
 
         elif whence != 0:
             raise oefmt(space.w_ValueError,
diff --git a/pypy/module/_io/test/test_fileio.py b/pypy/module/_io/test/test_fileio.py
--- a/pypy/module/_io/test/test_fileio.py
+++ b/pypy/module/_io/test/test_fileio.py
@@ -302,6 +302,36 @@
             return -1
         raises(ValueError, _io.FileIO, "foo", 'r', opener=opener)
 
+    def test_seek_bom(self):
+        # The BOM is not written again when seeking manually
+        import _io
+        filename = self.tmpfile + '_x3'
+        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
+            with _io.open(filename, 'w', encoding=charset) as f:
+                f.write('aaa')
+                pos = f.tell()
+            with _io.open(filename, 'r+', encoding=charset) as f:
+                f.seek(pos)
+                f.write('zzz')
+                f.seek(0)
+                f.write('bbb')
+            with _io.open(filename, 'rb') as f:
+                assert f.read() == 'bbbzzz'.encode(charset)
+
+    def test_seek_append_bom(self):
+        # Same test, but first seek to the start and then to the end
+        import _io, os
+        filename = self.tmpfile + '_x3'
+        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
+            with _io.open(filename, 'w', encoding=charset) as f:
+                f.write('aaa')
+            with _io.open(filename, 'a', encoding=charset) as f:
+                f.seek(0)
+                f.seek(0, os.SEEK_END)
+                f.write('xxx')
+            with _io.open(filename, 'rb') as f:
+                assert f.read() == 'aaaxxx'.encode(charset)
+
 
 def test_flush_at_exit():
     from pypy import conftest


More information about the pypy-commit mailing list