[pypy-svn] r29939 - in pypy/dist/pypy/module/fcntl: . test
rhymes at codespeak.net
rhymes at codespeak.net
Tue Jul 11 11:39:44 CEST 2006
Author: rhymes
Date: Tue Jul 11 11:39:39 2006
New Revision: 29939
Added:
pypy/dist/pypy/module/fcntl/
pypy/dist/pypy/module/fcntl/__init__.py
pypy/dist/pypy/module/fcntl/app_fcntl.py
pypy/dist/pypy/module/fcntl/interp_fcntl.py
pypy/dist/pypy/module/fcntl/test/
pypy/dist/pypy/module/fcntl/test/test_fcntl.py
Log:
add fcntl module. initial stuff
Added: pypy/dist/pypy/module/fcntl/__init__.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/fcntl/__init__.py Tue Jul 11 11:39:39 2006
@@ -0,0 +1,9 @@
+from pypy.interpreter.mixedmodule import MixedModule
+
+class Module(MixedModule):
+ interpleveldefs = {
+ }
+
+ appleveldefs = {
+ '_conv_descriptor': 'app_fcntl._conv_descriptor',
+ }
Added: pypy/dist/pypy/module/fcntl/app_fcntl.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/fcntl/app_fcntl.py Tue Jul 11 11:39:39 2006
@@ -0,0 +1,8 @@
+
+def _conv_descriptor(f):
+ if hasattr(f, "fileno"):
+ return f.fileno()
+ elif isinstance(f, (int, long)):
+ return f
+ else:
+ raise TypeError, "argument must be an int, or have a fileno() method."
Added: pypy/dist/pypy/module/fcntl/interp_fcntl.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/fcntl/interp_fcntl.py Tue Jul 11 11:39:39 2006
@@ -0,0 +1,54 @@
+from pypy.rpython.rctypes.tool import ctypes_platform
+from pypy.rpython.rctypes.tool.libc import libc
+import pypy.rpython.rctypes.implementation # this defines rctypes magic
+from pypy.rpython.rctypes.aerrno import geterrno
+from pypy.interpreter.error import OperationError
+from pypy.interpreter.baseobjspace import W_Root, ObjSpace
+from ctypes import *
+
+class CConfig:
+ _header_ = """
+ #include <fcntl.h>
+ """
+ flock = ctypes_platform.Struct("struct flock",
+ [('l_start', c_longlong), ('l_len', c_longlong),
+ ('l_pid', c_long), ('l_type', c_short),
+ ('l_whence', c_short)])
+
+# constants, look in fcntl.h and platform docs for the meaning
+# some constants are linux only so they will be correctly exposed outside
+# depending on the OS
+constant_names = ['LOCK_SH', 'LOCK_EX', 'LOCK_NB', 'LOCK_UN', 'F_DUPFD',
+ 'F_GETFD', 'F_SETFD', 'F_GETFL', 'F_SETFL', 'F_UNLCK', 'FD_CLOEXEC',
+ 'LOCK_MAND', 'LOCK_READ', 'LOCK_WRITE', 'LOCK_RW', 'F_GETSIG', 'F_SETSIG',
+ 'F_GETLK64', 'F_SETLK64', 'F_SETLKW64', 'F_GETLK', 'F_SETLK', 'F_SETLKW',
+ 'F_GETOWN', 'F_SETOWN', 'F_RDLCK', 'F_WRLCK', 'F_SETLEASE', 'F_GETLEASE',
+ 'F_NOTIFY', 'F_EXLCK', 'F_SHLCK', 'DN_ACCESS', 'DN_MODIFY', 'DN_CREATE',
+ 'DN_DELETE', 'DN_RENAME', 'DN_ATTRIB', 'DN_MULTISHOT', 'I_NREAD',
+ 'I_PUSH', 'I_POP', 'I_LOOK', 'I_FLUSH', 'I_SRDOPT', 'I_GRDOPT', 'I_STR',
+ 'I_SETSIG', 'I_GETSIG', 'I_FIND', 'I_LINK', 'I_UNLINK', 'I_PEEK',
+ 'I_FDINSERT', 'I_SENDFD', 'I_RECVFD', 'I_SWROPT', 'I_LIST', 'I_PLINK',
+ 'I_PUNLINK', 'I_FLUSHBAND', 'I_CKBAND', 'I_GETBAND', 'I_ATMARK',
+ 'I_SETCLTIME', 'I_GETCLTIME', 'I_CANPUT']
+for name in constant_names:
+ setattr(CConfig, name, ctypes_platform.DefinedConstantInteger(name))
+
+class cConfig:
+ pass
+
+cConfig.__dict__.update(ctypes_platform.configure(CConfig))
+cConfig.flock.__name__ = "_flock"
+
+_flock = cConfig.flock
+
+def _get_error_msg():
+ errno = geterrno()
+ return libc.strerror(errno)
+
+def _conv_descriptor(space, f):
+ w_conv_descriptor = _get_module_object(space, "_check_float")
+ space.call_function(w_conv_descriptor, space.wrap(f))
+
+
+
+
Added: pypy/dist/pypy/module/fcntl/test/test_fcntl.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/fcntl/test/test_fcntl.py Tue Jul 11 11:39:39 2006
@@ -0,0 +1,122 @@
+from py.test import raises, skip
+from pypy.conftest import gettestobjspace
+
+class AppTestFcntl:
+ def setup_class(cls):
+ space = gettestobjspace(usemodules=('fcntl',))
+ cls.space = space
+
+ def test_conv_descriptor(self):
+ import fcntl
+
+ f = open("/tmp/conv_descr", "w")
+
+ raises(TypeError, fcntl._conv_descriptor, "foo")
+ raises(TypeError, fcntl._conv_descriptor, 2.0)
+ import cStringIO
+ raises(TypeError, fcntl._conv_descriptor, cStringIO.StringIO())
+ res = fcntl._conv_descriptor(10)
+ res_1 = fcntl._conv_descriptor(f)
+ assert res == 10
+ assert res_1 == f.fileno()
+
+# def test_fcntl():
+# assert cfcntl.fcntl(f, 1, 0) == 0
+# assert cfcntl.fcntl(f, 2, "foo") == "foo"
+# py.test.raises(TypeError, cfcntl.fcntl, "foo")
+# py.test.raises(IOError, cfcntl.fcntl, -1, 1, 0)
+#
+# try:
+# os.O_LARGEFILE
+# except AttributeError:
+# start_len = "ll"
+# else:
+# start_len = "qq"
+#
+# if sys.platform in ('netbsd1', 'netbsd2', 'netbsd3',
+# 'Darwin1.2', 'darwin',
+# 'freebsd2', 'freebsd3', 'freebsd4', 'freebsd5',
+# 'freebsd6', 'freebsd7',
+# 'bsdos2', 'bsdos3', 'bsdos4',
+# 'openbsd', 'openbsd2', 'openbsd3'):
+# if struct.calcsize('l') == 8:
+# off_t = 'l'
+# pid_t = 'i'
+# else:
+# off_t = 'lxxxx'
+# pid_t = 'l'
+#
+# format = "%s%s%shh" % (off_t, off_t, pid_t)
+# lockdata = struct.pack(format, 0, 0, 0, cfcntl.F_WRLCK, 0)
+# else:
+# format = "hh%shh" % start_len
+# lockdata = struct.pack(format, cfcntl.F_WRLCK, 0, 0, 0, 0, 0)
+#
+# rv = cfcntl.fcntl(f.fileno(), cfcntl.F_SETLKW, lockdata)
+# assert rv == lockdata
+# assert cfcntl.fcntl(f, cfcntl.F_SETLKW, lockdata) == lockdata
+#
+# # test duplication of file descriptor
+# rv = cfcntl.fcntl(f, cfcntl.F_DUPFD)
+# assert rv > 2 # > (stdin, stdout, stderr) at least
+# assert cfcntl.fcntl(f, cfcntl.F_DUPFD) > rv
+# assert cfcntl.fcntl(f, cfcntl.F_DUPFD, 99) == 99
+#
+# # test descriptor flags
+# assert cfcntl.fcntl(f, cfcntl.F_GETFD) == 0
+# cfcntl.fcntl(f, cfcntl.F_SETFD, 1)
+# assert cfcntl.fcntl(f, cfcntl.F_GETFD, cfcntl.FD_CLOEXEC) == 1
+#
+# # test status flags
+# assert cfcntl.fcntl(f.fileno(), cfcntl.F_SETFL, os.O_NONBLOCK) == 0
+# assert cfcntl.fcntl(f.fileno(), cfcntl.F_SETFL, os.O_NDELAY) == 0
+# assert cfcntl.fcntl(f, cfcntl.F_SETFL, os.O_NONBLOCK) == 0
+# assert cfcntl.fcntl(f, cfcntl.F_SETFL, os.O_NDELAY) == 0
+#
+# if "linux" in sys.platform:
+# # test managing signals
+# assert cfcntl.fcntl(f, cfcntl.F_GETOWN) == 0
+# cfcntl.fcntl(f, cfcntl.F_SETOWN, 20)
+# assert cfcntl.fcntl(f, cfcntl.F_GETOWN) == 20
+# assert cfcntl.fcntl(f, cfcntl.F_GETSIG) == 0
+# cfcntl.fcntl(f, cfcntl.F_SETSIG, 20)
+# assert cfcntl.fcntl(f, cfcntl.F_GETSIG) == 20
+#
+# # test leases
+# assert cfcntl.fcntl(f, cfcntl.F_GETLEASE) == cfcntl.F_UNLCK
+# cfcntl.fcntl(f, cfcntl.F_SETLEASE, cfcntl.F_WRLCK)
+# assert cfcntl.fcntl(f, cfcntl.F_GETLEASE) == cfcntl.F_WRLCK
+# else:
+# # this tests should fail under BSD
+# # with "Inappropriate ioctl for device"
+# py.test.raises(IOError, cfcntl.fcntl, f, cfcntl.F_GETOWN)
+# py.test.raises(IOError, cfcntl.fcntl, f, cfcntl.F_SETOWN, 20)
+#
+#
+# def test_flock():
+# if "linux" in sys.platform:
+# cfcntl.flock(f, cfcntl.LOCK_SH)
+# # this is an error EWOULDBLOCK, man: The file is locked and the
+# # LOCK_NB flag was selected.
+# py.test.raises(IOError, cfcntl.flock, f, cfcntl.LOCK_NB)
+# py.test.raises(IOError, cfcntl.flock, f, 3)
+# py.test.raises(TypeError, cfcntl.flock, f, "foo")
+# cfcntl.flock(f, cfcntl.LOCK_UN)
+#
+# def test_lockf():
+# py.test.raises(TypeError, cfcntl.lockf, f, "foo")
+# py.test.raises(TypeError, cfcntl.lockf, f, cfcntl.LOCK_UN, "foo")
+# py.test.raises(ValueError, cfcntl.lockf, f, 0)
+#
+# def test_ioctl():
+# py.test.raises(TypeError, cfcntl.ioctl, "foo")
+# py.test.raises(TypeError, cfcntl.ioctl, 0, "foo")
+# py.test.raises(TypeError, cfcntl.ioctl, 0, termios.TIOCGPGRP, float(0))
+# py.test.raises(TypeError, cfcntl.ioctl, 0, termios.TIOCGPGRP, 1, "foo")
+#
+# buf = array.array('h', [0])
+# assert cfcntl.ioctl(0, termios.TIOCGPGRP, buf, True) == 0
+# buf = array.array('c', "a"*1025)
+# py.test.raises(ValueError, cfcntl.ioctl, 0, termios.TIOCGPGRP, buf, 0)
+# py.test.raises(ValueError, cfcntl.ioctl, 0, termios.TIOCGPGRP,
+# "a"*1025, 0)
More information about the Pypy-commit
mailing list