[pypy-svn] r29973 - in pypy/dist/pypy/module/mmap: . test
rhymes at codespeak.net
rhymes at codespeak.net
Wed Jul 12 12:15:14 CEST 2006
Author: rhymes
Date: Wed Jul 12 12:15:10 2006
New Revision: 29973
Added:
pypy/dist/pypy/module/mmap/
pypy/dist/pypy/module/mmap/__init__.py
pypy/dist/pypy/module/mmap/app_mmap.py
pypy/dist/pypy/module/mmap/interp_mmap.py
pypy/dist/pypy/module/mmap/test/
pypy/dist/pypy/module/mmap/test/test_mmap.py
Log:
scaffolding for mmap module. exported all the constants also.
Added: pypy/dist/pypy/module/mmap/__init__.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/mmap/__init__.py Wed Jul 12 12:15:10 2006
@@ -0,0 +1,26 @@
+from pypy.interpreter.mixedmodule import MixedModule
+
+class Module(MixedModule):
+ interpleveldefs = {
+ 'PAGESIZE': 'interp_mmap.PAGESIZE',
+ }
+
+ appleveldefs = {
+ 'ACCESS_READ': 'app_mmap.ACCESS_READ',
+ 'ACCESS_WRITE': 'app_mmap.ACCESS_WRITE',
+ 'ACCESS_COPY': 'app_mmap.ACCESS_COPY',
+ 'error': 'app_mmap.error'
+ }
+
+ def buildloaders(cls):
+ from pypy.module.mmap import interp_mmap
+
+ Module.interpleveldefs["PAGESIZE"] = 'space.wrap(%r)' %\
+ interp_mmap._get_page_size()
+
+ for constant, value in interp_mmap.constants.iteritems():
+ Module.interpleveldefs[constant] = "space.wrap(%r)" % value
+
+ super(Module, cls).buildloaders()
+ buildloaders = classmethod(buildloaders)
+
Added: pypy/dist/pypy/module/mmap/app_mmap.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/mmap/app_mmap.py Wed Jul 12 12:15:10 2006
@@ -0,0 +1,5 @@
+ACCESS_READ = 1
+ACCESS_WRITE = 2
+ACCESS_COPY = 3
+
+error = EnvironmentError
Added: pypy/dist/pypy/module/mmap/interp_mmap.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/mmap/interp_mmap.py Wed Jul 12 12:15:10 2006
@@ -0,0 +1,58 @@
+from pypy.rpython.rctypes.tool import ctypes_platform
+from pypy.rpython.rctypes.tool.libc import libc
+import pypy.rpython.rctypes.implementation # this defines rctypes magic
+from pypy.rpython.rctypes.aerrno import geterrno
+from pypy.interpreter.error import OperationError
+from pypy.interpreter.baseobjspace import W_Root, ObjSpace
+from ctypes import *
+import sys
+import os
+import platform
+
+_POSIX = os.name == "posix"
+_MS_WINDOWS = os.name == "nt"
+_FREEBSD = "freebsd" in sys.platform
+_64BIT = "64bit" in platform.architecture()[0]
+
+class CConfig:
+ _header_ = """
+ #include <sys/mman.h>
+ """
+
+# constants, look in sys/mman.h and platform docs for the meaning
+# some constants are linux only so they will be correctly exposed outside
+# depending on the OS
+constants = {}
+constant_names = ['MAP_SHARED', 'MAP_PRIVATE', 'MAP_ANON', 'MAP_ANONYMOUS',
+ 'PROT_READ', 'PROT_WRITE', 'PROT_EXEC', 'MAP_DENYWRITE', 'MAP_EXECUTABLE']
+for name in constant_names:
+ setattr(CConfig, name, ctypes_platform.DefinedConstantInteger(name))
+
+class cConfig:
+ pass
+
+cConfig.__dict__.update(ctypes_platform.configure(CConfig))
+
+# needed to export the constants inside and outside. see __init__.py
+for name in constant_names:
+ value = getattr(cConfig, name)
+ if value is not None:
+ constants[name] = value
+
+# MAP_ANONYMOUS is not always present but it's always available at CPython level
+if cConfig.MAP_ANONYMOUS is None:
+ cConfig.MAP_ANONYMOUS = cConfig.MAP_ANON
+ constants["MAP_ANONYMOUS"] = cConfig.MAP_ANON
+
+locals().update(constants)
+
+_MS_SYNC = ctypes_platform.DefinedConstantInteger("MS_SYNC")
+_ACCESS_DEFAULT = 0
+
+if _POSIX:
+ def _get_page_size():
+ return libc.getpagesize()
+
+ def _get_error_msg():
+ errno = geterrno()
+ return libc.strerror(errno)
Added: pypy/dist/pypy/module/mmap/test/test_mmap.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/mmap/test/test_mmap.py Wed Jul 12 12:15:10 2006
@@ -0,0 +1,484 @@
+from py.test import raises, skip
+from pypy.conftest import gettestobjspace
+
+class AppTestMMap:
+ def setup_class(cls):
+ space = gettestobjspace(usemodules=('mmap',))
+ cls.space = space
+
+ def test_page_size(self):
+ import mmap
+ assert mmap.PAGESIZE > 0
+ assert isinstance(mmap.PAGESIZE, int)
+
+ def test_attributes(self):
+ import mmap
+ assert isinstance(mmap.ACCESS_READ, int)
+ assert isinstance(mmap.ACCESS_WRITE, int)
+ assert isinstance(mmap.ACCESS_COPY, int)
+ assert isinstance(mmap.MAP_ANON, int)
+ assert isinstance(mmap.MAP_ANONYMOUS, int)
+ assert isinstance(mmap.MAP_PRIVATE, int)
+ assert isinstance(mmap.MAP_SHARED, int)
+ assert isinstance(mmap.PROT_EXEC, int)
+ assert isinstance(mmap.PROT_READ, int)
+ assert isinstance(mmap.PROT_WRITE, int)
+
+ assert mmap.error is EnvironmentError
+
+ # def test_args(self):
+ # raises(TypeError, mmap, "foo")
+ # raises(TypeError, mmap, 0, "foo")
+ #
+ # if _POSIX:
+ # py.test.raises(ValueError, mmap, 0, 1, 2, 3, 4, 5)
+ # py.test.raises(TypeError, mmap, 0, 1, 2, 3, "foo", 5)
+ # py.test.raises(TypeError, mmap, 0, 1, foo="foo")
+ # py.test.raises(TypeError, mmap, 0, -1)
+ # py.test.raises(OverflowError, mmap, 0, sys.maxint)
+ # py.test.raises(ValueError, mmap, 0, 1, flags=2, access=3)
+ # py.test.raises(ValueError, mmap, 0, 1, access=123)
+ # elif _MS_WINDOWS:
+ # py.test.raises(TypeError, mmap, 0, 1, 2, 3, 4)
+ # py.test.raises(TypeError, mmap, 0, 1, tagname=123)
+ # py.test.raises(TypeError, mmap, 0, 1, access="foo")
+ # py.test.raises(ValueError, mmap, 0, 1, access=-1)
+#
+#
+# def test_file_size(self):
+# if _MS_WINDOWS:
+# py.test.skip("Only Unix checks file size")
+# self.f.write("c")
+# self.f.flush()
+# py.test.raises(ValueError, mmap, self.f.fileno(), 123)
+#
+# def test_mmap_creation(self):
+# self.f.write("c")
+# self.f.flush()
+# m = mmap(self.f.fileno(), 1)
+# assert m._data[0] == "c"
+# m.close()
+#
+# def test_close(self):
+# self.f.write("c")
+# self.f.flush()
+# m = mmap(self.f.fileno(), 1)
+# m.close()
+# assert m._data == None
+# if _MS_WINDOWS:
+# assert m._map_handle.value == cmmap._INVALID_HANDLE_VALUE
+# assert m._file_handle.value == cmmap._INVALID_HANDLE_VALUE
+# elif _POSIX:
+# assert m._fd == -1
+# py.test.raises(ValueError, m._check_valid)
+#
+# def test_read_byte(self):
+# self.f.write("c")
+# self.f.flush()
+# m = mmap(self.f.fileno(), 1)
+# assert m.read_byte() == "c"
+# py.test.raises(ValueError, m.read_byte)
+# m.close()
+#
+# def test_readline(self):
+# self.f.seek(0)
+# self.f.write("foo\n")
+# self.f.flush()
+# m = mmap(self.f.fileno(), 4)
+# if _MS_WINDOWS:
+# # windows replaces \n with \r. it's time to change to \n only MS!
+# assert m.readline() == "foo\r"
+# elif _POSIX:
+# assert m.readline() == "foo\n"
+# assert m.readline() == ""
+# m.close()
+#
+# def test_read(self):
+# self.f.seek(0)
+# self.f.write("foobar")
+# self.f.flush()
+# m = mmap(self.f.fileno(), 6)
+# py.test.raises(TypeError, m.read, "foo")
+# assert m.read(1) == "f"
+# assert m.read(6) == "oobar"
+# assert m.read(1) == ""
+# m.close()
+#
+# def test_find(self):
+# self.f.seek(0)
+# self.f.write("foobar\0")
+# self.f.flush()
+# m = mmap(self.f.fileno(), 7)
+# py.test.raises(TypeError, m.find, 123)
+# py.test.raises(TypeError, m.find, "foo", "baz")
+# assert m.find("b") == 3
+# assert m.find("z") == -1
+# assert m.find("o", 5) == -1
+# assert m.find("ob") == 2
+# assert m.find("\0") == 6
+# m.close()
+#
+# def test_is_modifiable(self):
+# self.f.seek(0)
+# self.f.write("foobar")
+# self.f.flush()
+# m = mmap(self.f.fileno(), 6, access=cmmap.ACCESS_READ)
+# py.test.raises(TypeError, m._check_writeable)
+# py.test.raises(TypeError, m._check_resizeable)
+# m.close()
+#
+# def test_seek(self):
+# self.f.seek(0)
+# self.f.write("foobar")
+# self.f.flush()
+# m = mmap(self.f.fileno(), 6)
+# py.test.raises(TypeError, m.seek, "foo")
+# py.test.raises(TypeError, m.seek, 0, "foo")
+# py.test.raises(ValueError, m.seek, -1, 0)
+# py.test.raises(ValueError, m.seek, -1, 1)
+# py.test.raises(ValueError, m.seek, -7, 2)
+# py.test.raises(ValueError, m.seek, 1, 3)
+# py.test.raises(ValueError, m.seek, 10)
+# m.seek(0)
+# assert m._pos == 0
+# m.read(1)
+# m.seek(1, 1)
+# assert m._pos == 2
+# m.seek(0)
+# m.seek(-1, 2)
+# assert m._pos == 5
+#
+# def test_write(self):
+# self.f.seek(0)
+# self.f.write("foobar")
+# self.f.flush()
+# m = mmap(self.f.fileno(), 6, access=cmmap.ACCESS_READ)
+# py.test.raises(TypeError, m.write, "foo")
+# m = mmap(self.f.fileno(), 6, access=cmmap.ACCESS_WRITE)
+# py.test.raises(TypeError, m.write, 123)
+# py.test.raises(ValueError, m.write, "c"*10)
+# m.write("ciao\n")
+# m.seek(0)
+# assert m.read(6) == "ciao\nr"
+# m.close()
+#
+# def test_write_byte(self):
+# self.f.seek(0)
+# self.f.write("foobar")
+# self.f.flush()
+# m = mmap(self.f.fileno(), 6, access=cmmap.ACCESS_READ)
+# py.test.raises(TypeError, m.write_byte, "foo")
+# m = mmap(self.f.fileno(), 6, access=cmmap.ACCESS_WRITE)
+# py.test.raises(TypeError, m.write_byte, 123)
+# py.test.raises(TypeError, m.write_byte, "ab")
+# m.write_byte("x")
+# m.seek(0)
+# assert m.read(6) == "xoobar"
+# m.close()
+#
+# def test_size(self):
+# self.f.seek(0)
+# self.f.write("foobar")
+# self.f.flush()
+# m = mmap(self.f.fileno(), 5)
+# assert m.size() > m._size
+# m.close()
+#
+# def test_tell(self):
+# m = mmap(self.f.fileno(), 1)
+# assert m.tell() >= 0
+# m.close()
+#
+# def test_flush(self):
+# self.f.seek(0)
+# self.f.write("foobar")
+# m = mmap(self.f.fileno(), 6)
+# py.test.raises(TypeError, m.flush, 1, 2, 3)
+# py.test.raises(TypeError, m.flush, 1, "a")
+# py.test.raises(ValueError, m.flush, 0, 99)
+# assert m.flush() == 0
+# m.close()
+#
+# def test_move(self):
+# self.f.seek(0)
+# self.f.write("foobar")
+# m = mmap(self.f.fileno(), 6, access=cmmap.ACCESS_READ)
+# py.test.raises(TypeError, m.move, 1)
+# py.test.raises(TypeError, m.move, 1, "foo", 2)
+# m = mmap(self.f.fileno(), 6, access=cmmap.ACCESS_WRITE)
+# py.test.raises(ValueError, m.move, 7, 1, 2)
+# py.test.raises(ValueError, m.move, 1, 7, 2)
+# m.move(1, 3, 3)
+# assert m.read(6) == "fbarar"
+# m.seek(0)
+# m.move(1, 3, 2)
+# a = m.read(6)
+# assert a == "frarar"
+# m.close()
+#
+# def test_resize(self):
+# if "darwin" in sys.platform or _FREEBSD:
+# py.test.skip("resize does not work under OSX or FreeBSD")
+# self.f = open(filename, "w+")
+# self.f.write("foobar")
+# m = mmap(self.f.fileno(), 6, access=cmmap.ACCESS_READ)
+# py.test.raises(TypeError, m.resize, 1)
+# m = mmap(self.f.fileno(), 6, access=cmmap.ACCESS_COPY)
+# py.test.raises(TypeError, m.resize, 1)
+# m = mmap(self.f.fileno(), 6, access=cmmap.ACCESS_WRITE)
+# f_size = os.fstat(self.f.fileno()).st_size
+# assert m.size() == f_size == 6
+# m.resize(10)
+# f_size = os.fstat(self.f.fileno()).st_size
+# assert m.size() == f_size == 10
+#
+# def test_len(self):
+# m = mmap(self.f.fileno(), 6)
+# assert len(m) == 6
+#
+# def test_get_item(self):
+# self.f.seek(0)
+# m = mmap(self.f.fileno(), 6)
+# f = lambda: m["foo"]
+# py.test.raises(TypeError, f)
+# f = lambda: m[-7]
+# py.test.raises(IndexError, f)
+# assert m[0] == 'f'
+# assert m[-1] == 'r'
+# sl = slice(1, 2)
+# assert m[sl] == 'o'
+#
+# def test_set_item(self):
+# self.f.seek(0)
+# m = mmap(self.f.fileno(), 6, access=cmmap.ACCESS_READ)
+# def f(m): m[1] = 'a'
+# py.test.raises(TypeError, f, m)
+# m = mmap(self.f.fileno(), 6, access=cmmap.ACCESS_WRITE)
+# def f(m): m["foo"] = 'a'
+# py.test.raises(TypeError, f, m)
+# def f(m): m[-7] = 'a'
+# py.test.raises(IndexError, f, m)
+# def f(m): m[0] = 'ab'
+# py.test.raises(IndexError, f, m)
+# def f(m): m[1:3] = u'xx'
+# py.test.raises(IndexError, f, m)
+# def f(m): m[1:4] = "zz"
+# py.test.raises(IndexError, f, m)
+# def f(m): m[1:6] = "z" * 6
+# py.test.raises(IndexError, f, m)
+# def f(m): m[:2] = "z" * 5
+# m[1:3] = 'xx'
+# assert m.read(6) == "fxxbar"
+# m.seek(0)
+# m[0] = 'x'
+# assert m[0] == 'x'
+# m[-6] = 'y'
+# assert m.read(6) == "yxxbar"
+#
+# def test_del_item(self):
+# m = mmap(self.f.fileno(), 6)
+# def f(m): del m["foo"]
+# py.test.raises(TypeError, f, m)
+# def f(m): del m[1:3]
+# py.test.raises(TypeError, f, m)
+# def f(m): del m[1]
+# py.test.raises(TypeError, f, m)
+#
+# def test_concatenation(self):
+# m = mmap(self.f.fileno(), 6)
+# f = lambda: m + 1
+# py.test.raises(SystemError, f)
+# def f(m): m += 1
+# py.test.raises(SystemError, f, m)
+# f = lambda: 1 + m
+# py.test.raises(TypeError, f)
+#
+# def test_repeatition(self):
+# m = mmap(self.f.fileno(), 6)
+# f = lambda: m * 1
+# py.test.raises(SystemError, f)
+# def f(m):
+# m *= 1
+# py.test.raises(SystemError, f, m)
+# f = lambda: 1 * m
+# py.test.raises(TypeError, f)
+#
+# def test_slicing(self):
+# self.f.seek(0)
+# m = mmap(self.f.fileno(), 6)
+# assert m[-3:7] == "bar"
+#
+# def test_all():
+# # this is a global test, ported from test_mmap.py
+#
+# f = open(filename, "w+")
+#
+# # write 2 pages worth of data to the file
+# f.write('\0' * PAGESIZE)
+# f.write('foo')
+# f.write('\0' * (PAGESIZE - 3))
+# f.flush()
+# m = mmap(f.fileno(), 2 * PAGESIZE)
+# f.close()
+#
+# # sanity checks
+# assert m.find("foo") == PAGESIZE
+# assert len(m) == 2 * PAGESIZE
+# assert m[0] == '\0'
+# assert m[0:3] == '\0\0\0'
+#
+# # modify the file's content
+# m[0] = '3'
+# m[PAGESIZE+3:PAGESIZE+3+3] = 'bar'
+#
+# # check that the modification worked
+# assert m[0] == '3'
+# assert m[0:3] == '3\0\0'
+# assert m[PAGESIZE-1:PAGESIZE+7] == '\0foobar\0'
+#
+# m.flush()
+#
+# # test seeking around
+# m.seek(0,0)
+# assert m.tell() == 0
+# m.seek(42, 1)
+# assert m.tell() == 42
+# m.seek(0, 2)
+# assert m.tell() == len(m)
+#
+# py.test.raises(ValueError, m.seek, -1)
+# py.test.raises(ValueError, m.seek, 1, 2)
+# py.test.raises(ValueError, m.seek, -len(m) - 1, 2)
+#
+# # try resizing map
+# if not ("darwin" in sys.platform or _FREEBSD):
+# m.resize(512)
+#
+# assert len(m) == 512
+# py.test.raises(ValueError, m.seek, 513, 0)
+#
+# # check that the underlying file is truncated too
+# f = open(filename)
+# f.seek(0, 2)
+# assert f.tell() == 512
+# f.close()
+# assert m.size() == 512
+#
+# m.close()
+# del f, m
+#
+# # test access=ACCESS_READ
+# mapsize = 10
+# open(filename, "wb").write("a" * mapsize)
+# f = open(filename, "rb")
+# m = mmap(f.fileno(), mapsize, access=cmmap.ACCESS_READ)
+# assert m[:] == 'a' * mapsize
+# def f(m): m[:] = 'b' * mapsize
+# py.test.raises(TypeError, f, m)
+# def f(m): m[0] = 'b'
+# py.test.raises(TypeError, f, m)
+# def f(m): m.seek(0, 0); m.write("abc")
+# py.test.raises(TypeError, f, m)
+# def f(m): m.seek(0, 0); m.write_byte("d")
+# py.test.raises(TypeError, f, m)
+# if not ("darwin" in sys.platform or _FREEBSD):
+# py.test.raises(TypeError, m.resize, 2 * mapsize)
+# assert open(filename, "rb").read() == 'a' * mapsize
+#
+# # opening with size too big
+# f = open(filename, "r+b")
+# if not _MS_WINDOWS:
+# # this should work under windows
+# py.test.raises(ValueError, mmap, f.fileno(), mapsize + 1)
+# f.close()
+#
+# if _MS_WINDOWS:
+# # repair damage from the resizing test.
+# f = open(filename, 'r+b')
+# f.truncate(mapsize)
+# f.close()
+# del f, m
+#
+# # test access=ACCESS_WRITE"
+# f = open(filename, "r+b")
+# m = mmap(f.fileno(), mapsize, access=cmmap.ACCESS_WRITE)
+# m[:] = 'c'*mapsize
+# assert m[:] == 'c' * mapsize
+# m.flush()
+# m.close()
+# f.close()
+# f = open(filename, 'rb')
+# stuff = f.read()
+# f.close()
+# stuff == 'c' * mapsize
+# del f, m
+#
+# # test access=ACCESS_COPY
+# f = open(filename, "r+b")
+# m = mmap(f.fileno(), mapsize, access=cmmap.ACCESS_COPY)
+# m[:] = 'd' * mapsize
+# assert m[:] == 'd' * mapsize
+# m.flush()
+# assert open(filename, "rb").read() == 'c'*mapsize
+# if not ("darwin" in sys.platform or _FREEBSD):
+# py.test.raises(TypeError, m.resize, 2 * mapsize)
+# del f, m
+#
+# # test invalid access
+# f = open(filename, "r+b")
+# py.test.raises(ValueError, mmap, f.fileno(), mapsize, access=4)
+# f.close()
+# del f
+#
+# # test incompatible parameters
+# if _POSIX:
+# f = open(filename, "r+b")
+# py.test.raises(ValueError, mmap, f.fileno(), mapsize, flags=cmmap.MAP_PRIVATE,
+# prot=cmmap.PROT_READ, access=cmmap.ACCESS_WRITE)
+# f.close()
+# del f
+#
+# # bad file descriptor
+# py.test.raises(cmmap.error, mmap, -2, 4096)
+#
+# # do a tougher .find() test. SF bug 515943 pointed out that, in 2.2,
+# # searching for data with embedded \0 bytes didn't work.
+# f = open(filename, 'w+')
+# data = 'aabaac\x00deef\x00\x00aa\x00'
+# n = len(data)
+# f.write(data)
+# f.flush()
+# m = mmap(f.fileno(), n)
+# f.close()
+#
+# for start in range(n + 1):
+# for finish in range(start, n + 1):
+# sl = data[start:finish]
+# assert m.find(sl) == data.find(sl)
+# assert m.find(sl + 'x') == -1
+# m.close()
+# del f, m
+#
+# # test mapping of entire file by passing 0 for map length
+# f = open(filename, "w+")
+# f.write(2**16 * 'm')
+# f.close()
+# f = open(filename, "rb+")
+# m = mmap(f.fileno(), 0)
+# assert len(m) == 2**16
+# assert m.read(2**16), 2**16 * "m"
+# m.close()
+# f.close()
+# del f, m
+#
+# # make move works everywhere (64-bit format problem earlier)
+# f = open(filename, 'w+')
+# f.write("ABCDEabcde")
+# f.flush()
+# m = mmap(f.fileno(), 10)
+# m.move(5, 0, 5)
+# assert m[:] == "ABCDEABCDE"
+# m.close()
+# f.close()
+# del f, m
+#
More information about the Pypy-commit
mailing list