[pypy-commit] pypy default: A quick writing and quick testing of MultibyteStream{Reader, Writer}.
arigo
noreply at buildbot.pypy.org
Mon Aug 1 16:18:41 CEST 2011
Author: Armin Rigo <arigo at tunes.org>
Branch:
Changeset: r46150:69fabe6fa63e
Date: 2011-08-01 14:15 +0200
http://bitbucket.org/pypy/pypy/changeset/69fabe6fa63e/
Log: A quick writing and quick testing of MultibyteStream{Reader,Writer}.
diff --git a/pypy/module/_multibytecodec/app_multibytecodec.py b/pypy/module/_multibytecodec/app_multibytecodec.py
--- a/pypy/module/_multibytecodec/app_multibytecodec.py
+++ b/pypy/module/_multibytecodec/app_multibytecodec.py
@@ -1,34 +1,48 @@
# NOT_RPYTHON
#
-# These classes are not supported so far.
-#
-# My theory is that they are not widely used on CPython either, because
-# I found two bugs just by looking at their .c source: they always call
-# encreset() after a piece of data, even though I think it's wrong ---
-# it should be called only once at the end; and mbiencoder_reset() calls
-# decreset() instead of encreset().
-#
+# The interface here may be a little bit on the lightweight side.
-class MultibyteIncrementalEncoder(object):
- def __init__(self, *args, **kwds):
- raise LookupError(
- "MultibyteIncrementalEncoder not implemented; "
- "see pypy/module/_multibytecodec/app_multibytecodec.py")
+from _multibytecodec import MultibyteIncrementalDecoder
+from _multibytecodec import MultibyteIncrementalEncoder
-class MultibyteIncrementalDecoder(object):
- def __init__(self, *args, **kwds):
- raise LookupError(
- "MultibyteIncrementalDecoder not implemented; "
- "see pypy/module/_multibytecodec/app_multibytecodec.py")
-class MultibyteStreamReader(object):
- def __init__(self, *args, **kwds):
- raise LookupError(
- "MultibyteStreamReader not implemented; "
- "see pypy/module/_multibytecodec/app_multibytecodec.py")
+class MultibyteStreamReader(MultibyteIncrementalDecoder):
+ def __new__(cls, stream, errors=None):
+ self = MultibyteIncrementalDecoder.__new__(cls, errors)
+ self.stream = stream
+ return self
-class MultibyteStreamWriter(object):
- def __init__(self, *args, **kwds):
- raise LookupError(
- "MultibyteStreamWriter not implemented; "
- "see pypy/module/_multibytecodec/app_multibytecodec.py")
+ def __read(self, read, size):
+ while True:
+ if size is None:
+ data = read()
+ else:
+ data = read(size)
+ final = not data
+ output = self.decode(data, final)
+ if output or final:
+ return output
+ size = 1 # read 1 more byte and retry
+
+ def read(self, size=None):
+ return self.__read(self.stream.read, size)
+
+ def readline(self, size=None):
+ return self.__read(self.stream.readline, size)
+
+ def readlines(self, sizehint=None):
+ return self.__read(self.stream.read, sizehint).splitlines(True)
+
+
+class MultibyteStreamWriter(MultibyteIncrementalEncoder):
+ def __new__(cls, stream, errors=None):
+ self = MultibyteIncrementalEncoder.__new__(cls, errors)
+ self.stream = stream
+ return self
+
+ def write(self, data):
+ self.stream.write(self.encode(data))
+
+ def writelines(self, lines):
+ for data in lines:
+ self.write(data)
diff --git a/pypy/module/_multibytecodec/test/test_app_stream.py b/pypy/module/_multibytecodec/test/test_app_stream.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_multibytecodec/test/test_app_stream.py
@@ -0,0 +1,54 @@
+from pypy.conftest import gettestobjspace
+
+
+class AppTestStreams:
+ def setup_class(cls):
+ cls.space = gettestobjspace(usemodules=['_multibytecodec'])
+ cls.w_HzStreamReader = cls.space.appexec([], """():
+ import _codecs_cn
+ from _multibytecodec import MultibyteStreamReader
+
+ class HzStreamReader(MultibyteStreamReader):
+ codec = _codecs_cn.getcodec('hz')
+
+ return HzStreamReader
+ """)
+ cls.w_HzStreamWriter = cls.space.appexec([], """():
+ import _codecs_cn
+ from _multibytecodec import MultibyteStreamWriter
+
+ class HzStreamWriter(MultibyteStreamWriter):
+ codec = _codecs_cn.getcodec('hz')
+
+ return HzStreamWriter
+ """)
+
+ def test_reader(self):
+ class FakeFile:
+ def __init__(self, data):
+ self.data = data
+ self.pos = 0
+ def read(self, size):
+ res = self.data[self.pos : self.pos + size]
+ self.pos += size
+ return res
+ #
+ r = self.HzStreamReader(FakeFile("!~{abcd~}xyz~{efgh"))
+ for expected in u'!\u5f95\u6c85xyz\u5f50\u73b7':
+ c = r.read(1)
+ assert c == expected
+ c = r.read(1)
+ assert c == ''
+
+ def test_writer(self):
+ class FakeFile:
+ def __init__(self):
+ self.output = []
+ def write(self, data):
+ self.output.append(data)
+ #
+ w = self.HzStreamWriter(FakeFile())
+ for input in u'!\u5f95\u6c85xyz\u5f50\u73b7':
+ w.write(input)
+ assert w.stream.output == ['!', '~{ab~}', '~{cd~}', 'x', 'y', 'z',
+ '~{ef~}', '~{gh~}']
More information about the pypy-commit
mailing list