[Python-3000-checkins] r54740 - in python/branches/p3yk: Lib/io.py Lib/test/test_io.py Modules/_fileio.c

guido.van.rossum python-3000-checkins at python.org
Tue Apr 10 23:07:04 CEST 2007


Author: guido.van.rossum
Date: Tue Apr 10 23:06:59 2007
New Revision: 54740

Modified:
   python/branches/p3yk/Lib/io.py
   python/branches/p3yk/Lib/test/test_io.py
   python/branches/p3yk/Modules/_fileio.c
Log:
truncate() returns the new size and position.
write() returns the number of bytes/characters written/buffered.
FileIO.close() calls self.flush().
Implement readinto() for buffered readers.
Tests th check all these.
Test proper behavior of __enter__/__exit__.


Modified: python/branches/p3yk/Lib/io.py
==============================================================================
--- python/branches/p3yk/Lib/io.py	(original)
+++ python/branches/p3yk/Lib/io.py	Tue Apr 10 23:06:59 2007
@@ -177,10 +177,11 @@
         """tell() -> int.  Return current stream position."""
         return self.seek(0, 1)
 
-    def truncate(self, pos: int = None) -> None:
-        """truncate(size: int = None) -> None. Truncate file to size bytes.
+    def truncate(self, pos: int = None) -> int:
+        """truncate(size: int = None) -> int. Truncate file to size bytes.
 
         Size defaults to the current IO position as reported by tell().
+        Returns the new size.
         """
         self._unsupported("truncate")
 
@@ -329,6 +330,10 @@
     would be hard to do since _fileio.c is written in C).
     """
 
+    def close(self):
+        _fileio._FileIO.close(self)
+        RawIOBase.close(self)
+
 
 class SocketIO(RawIOBase):
 
@@ -413,7 +418,10 @@
         Raises BlockingIOError if the underlying raw stream has no
         data at the moment.
         """
-        self._unsupported("readinto")
+        data = self.read(len(b))
+        n = len(data)
+        b[:n] = data
+        return n
 
     def write(self, b: bytes) -> int:
         """write(b: bytes) -> int.  Write the given buffer to the IO stream.
@@ -448,7 +456,7 @@
         return self.raw.tell()
 
     def truncate(self, pos=None):
-        self.raw.truncate(pos)
+        return self.raw.truncate(pos)
 
     ### Flush and close ###
 
@@ -503,12 +511,6 @@
         self._pos = newpos
         return b
 
-    def readinto(self, b):
-        tmp = self.read(len(b))
-        n = len(tmp)
-        b[:n] = tmp
-        return n
-
     def write(self, b):
         n = len(b)
         newpos = self._pos + n
@@ -536,6 +538,7 @@
         else:
             self._pos = max(0, pos)
         del self._buffer[pos:]
+        return pos
 
     def readable(self):
         return True
@@ -652,7 +655,6 @@
 
     def write(self, b):
         # XXX we can implement some more tricks to try and avoid partial writes
-        ##assert issubclass(type(b), bytes)
         if len(self._write_buf) > self.buffer_size:
             # We're full, so let's pre-flush the buffer
             try:
@@ -672,6 +674,7 @@
                     overage = len(self._write_buf) - self.max_buffer_size
                     self._write_buf = self._write_buf[:self.max_buffer_size]
                     raise BlockingIOError(e.errno, e.strerror, overage)
+        return len(b)
 
     def flush(self):
         written = 0

Modified: python/branches/p3yk/Lib/test/test_io.py
==============================================================================
--- python/branches/p3yk/Lib/test/test_io.py	(original)
+++ python/branches/p3yk/Lib/test/test_io.py	Tue Apr 10 23:06:59 2007
@@ -79,17 +79,19 @@
         test_support.unlink(test_support.TESTFN)
 
     def write_ops(self, f):
-        f.write(b"blah.")
-        f.seek(0)
-        f.write(b"Hello.")
+        self.assertEqual(f.write(b"blah."), 5)
+        self.assertEqual(f.seek(0), 0)
+        self.assertEqual(f.write(b"Hello."), 6)
         self.assertEqual(f.tell(), 6)
-        f.seek(-1, 1)
+        self.assertEqual(f.seek(-1, 1), 5)
         self.assertEqual(f.tell(), 5)
-        f.write(" world\n\n\n")
-        f.seek(0)
-        f.write("h")
-        f.seek(-2, 2)
-        f.truncate()
+        self.assertEqual(f.write(" world\n\n\n"), 9)
+        self.assertEqual(f.seek(0), 0)
+        self.assertEqual(f.write("h"), 1)
+        self.assertEqual(f.seek(-1, 2), 13)
+        self.assertEqual(f.tell(), 13)
+        self.assertEqual(f.truncate(12), 12)
+        self.assertEqual(f.tell(), 12)
 
     LARGE = 2**31
 
@@ -101,10 +103,10 @@
         self.assertEqual(f.write(b"xxx"), 3)
         self.assertEqual(f.tell(), self.LARGE + 3)
         self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
-        f.truncate()
+        self.assertEqual(f.truncate(), self.LARGE + 2)
         self.assertEqual(f.tell(), self.LARGE + 2)
         self.assertEqual(f.seek(0, 2), self.LARGE + 2)
-        f.truncate(self.LARGE + 1)
+        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
         self.assertEqual(f.tell(), self.LARGE + 1)
         self.assertEqual(f.seek(0, 2), self.LARGE + 1)
         self.assertEqual(f.seek(-1, 2), self.LARGE)
@@ -142,6 +144,20 @@
         self.read_ops(f)
         f.close()
 
+    def test_buffered_file_io(self):
+        f = io.open(test_support.TESTFN, "wb")
+        self.assertEqual(f.readable(), False)
+        self.assertEqual(f.writable(), True)
+        self.assertEqual(f.seekable(), True)
+        self.write_ops(f)
+        f.close()
+        f = io.open(test_support.TESTFN, "rb")
+        self.assertEqual(f.readable(), True)
+        self.assertEqual(f.writable(), False)
+        self.assertEqual(f.seekable(), True)
+        self.read_ops(f)
+        f.close()
+
     def test_raw_bytes_io(self):
         f = io.BytesIO()
         self.write_ops(f)
@@ -163,9 +179,52 @@
                 print("Use 'regrtest.py -u largefile test_io' to run it.",
                       file=sys.stderr)
                 return
-        f = io.open(test_support.TESTFN, "w+b", buffering=0)
+        f = io.open(test_support.TESTFN, "w+b", 0)
         self.large_file_ops(f)
         f.close()
+        f = io.open(test_support.TESTFN, "w+b")
+        self.large_file_ops(f)
+        f.close()
+
+    def test_with_open(self):
+        for bufsize in (0, 1, 100):
+            f = None
+            with open(test_support.TESTFN, "wb", bufsize) as f:
+                f.write("xxx")
+            self.assertEqual(f.closed, True)
+            f = None
+            try:
+                with open(test_support.TESTFN, "wb", bufsize) as f:
+                    1/0
+            except ZeroDivisionError:
+                self.assertEqual(f.closed, True)
+            else:
+                self.fail("1/0 didn't raise an exception")
+
+    def test_destructor(self):
+        record = []
+        class MyFileIO(io.FileIO):
+            def __del__(self):
+                record.append(1)
+                io.FileIO.__del__(self)
+            def close(self):
+                record.append(2)
+                io.FileIO.close(self)
+            def flush(self):
+                record.append(3)
+                io.FileIO.flush(self)
+        f = MyFileIO(test_support.TESTFN, "w")
+        f.write("xxx")
+        del f
+        self.assertEqual(record, [1, 2, 3])
+
+    def test_close_flushes(self):
+        f = io.open(test_support.TESTFN, "wb")
+        f.write("xxx")
+        f.close()
+        f = io.open(test_support.TESTFN, "rb")
+        self.assertEqual(f.read(), b"xxx")
+        f.close()
 
 
 class MemorySeekTestMixin:

Modified: python/branches/p3yk/Modules/_fileio.c
==============================================================================
--- python/branches/p3yk/Modules/_fileio.c	(original)
+++ python/branches/p3yk/Modules/_fileio.c	Tue Apr 10 23:06:59 2007
@@ -530,6 +530,9 @@
 	if (!PyArg_ParseTuple(args, "|O", &posobj))
 		return NULL;
 
+	if (posobj == Py_None)
+		posobj = NULL;
+
 	if (posobj == NULL)
 		whence = 1;
 	else
@@ -545,19 +548,22 @@
 	pos = PyLong_Check(posobj) ?
 		PyLong_AsLongLong(posobj) : PyInt_AsLong(posobj);
 #endif
-	Py_DECREF(posobj);
-	if (PyErr_Occurred())
+	if (PyErr_Occurred()) {
+		Py_DECREF(posobj);
 		return NULL;
+	}
 
 	Py_BEGIN_ALLOW_THREADS
 	errno = 0;
 	pos = ftruncate(fd, pos);
 	Py_END_ALLOW_THREADS
 
-	if (errno < 0)
+	if (pos < 0) {
+		Py_DECREF(posobj);
 		PyErr_SetFromErrno(PyExc_IOError);
+	}
 
-	Py_RETURN_NONE;
+	return posobj;
 }
 
 static char *


More information about the Python-3000-checkins mailing list