[pypy-svn] r29957 - in pypy/dist/pypy/module/fcntl: . test
rhymes at codespeak.net
rhymes at codespeak.net
Tue Jul 11 19:55:28 CEST 2006
Author: rhymes
Date: Tue Jul 11 19:55:23 2006
New Revision: 29957
Modified:
pypy/dist/pypy/module/fcntl/__init__.py
pypy/dist/pypy/module/fcntl/interp_fcntl.py
pypy/dist/pypy/module/fcntl/test/test_fcntl.py
Log:
implemented fcntl.fcntl() and exposed all the constants.
Modified: pypy/dist/pypy/module/fcntl/__init__.py
==============================================================================
--- pypy/dist/pypy/module/fcntl/__init__.py (original)
+++ pypy/dist/pypy/module/fcntl/__init__.py Tue Jul 11 19:55:23 2006
@@ -2,8 +2,16 @@
class Module(MixedModule):
interpleveldefs = {
+ 'fcntl': 'interp_fcntl.fcntl'
}
appleveldefs = {
'_conv_descriptor': 'app_fcntl._conv_descriptor',
}
+
+ def buildloaders(cls):
+ from pypy.module.fcntl import interp_fcntl
+ for constant, value in interp_fcntl.constants.iteritems():
+ Module.interpleveldefs[constant] = "space.wrap(%r)" % value
+ super(Module, cls).buildloaders()
+ buildloaders = classmethod(buildloaders)
Modified: pypy/dist/pypy/module/fcntl/interp_fcntl.py
==============================================================================
--- pypy/dist/pypy/module/fcntl/interp_fcntl.py (original)
+++ pypy/dist/pypy/module/fcntl/interp_fcntl.py Tue Jul 11 19:55:23 2006
@@ -18,6 +18,7 @@
# constants, look in fcntl.h and platform docs for the meaning
# some constants are linux only so they will be correctly exposed outside
# depending on the OS
+constants = {}
constant_names = ['LOCK_SH', 'LOCK_EX', 'LOCK_NB', 'LOCK_UN', 'F_DUPFD',
'F_GETFD', 'F_SETFD', 'F_GETFL', 'F_SETFL', 'F_UNLCK', 'FD_CLOEXEC',
'LOCK_MAND', 'LOCK_READ', 'LOCK_WRITE', 'LOCK_RW', 'F_GETSIG', 'F_SETSIG',
@@ -39,16 +40,70 @@
cConfig.__dict__.update(ctypes_platform.configure(CConfig))
cConfig.flock.__name__ = "_flock"
+# needed to export the constants outside. see __init__.py
+for name in constant_names:
+ value = getattr(cConfig, name)
+ if value is not None:
+ constants[name] = value
+
_flock = cConfig.flock
+libc.strerror.restype = c_char_p
+libc.strerror.argtypes = [c_int]
+
+fcntl_int = libc['fcntl']
+fcntl_int.argtypes = [c_int, c_int, c_int]
+fcntl_int.restype = c_int
+
+fcntl_str = libc['fcntl']
+fcntl_str.argtypes = [c_int, c_int, c_char_p]
+fcntl_str.restype = c_int
def _get_error_msg():
errno = geterrno()
return libc.strerror(errno)
-def _conv_descriptor(space, f):
- w_conv_descriptor = _get_module_object(space, "_check_float")
- space.call_function(w_conv_descriptor, space.wrap(f))
-
-
-
-
+def _get_module_object(space, obj_name):
+ w_module = space.getbuiltinmodule('fcntl')
+ w_obj = space.getattr(w_module, space.wrap(obj_name))
+ return w_obj
+
+def _conv_descriptor(space, w_f):
+ w_conv_descriptor = _get_module_object(space, "_conv_descriptor")
+ w_fd = space.call_function(w_conv_descriptor, w_f)
+ return space.int_w(w_fd)
+
+def fcntl(space, w_fd, op, w_arg=0):
+ """fcntl(fd, op, [arg])
+
+ Perform the requested operation on file descriptor fd. The operation
+ is defined by op and is operating system dependent. These constants are
+ available from the fcntl module. The argument arg is optional, and
+ defaults to 0; it may be an int or a string. If arg is given as a string,
+ the return value of fcntl is a string of that length, containing the
+ resulting value put in the arg buffer by the operating system. The length
+ of the arg string is not allowed to exceed 1024 bytes. If the arg given
+ is an integer or if none is specified, the result value is an integer
+ corresponding to the return value of the fcntl call in the C code."""
+
+ fd = _conv_descriptor(space, w_fd)
+
+ if space.is_w(space.type(w_arg), space.w_int):
+ rv = fcntl_int(fd, op, space.int_w(w_arg))
+ if rv < 0:
+ raise OperationError(space.w_IOError,
+ space.wrap(_get_error_msg()))
+ return space.wrap(rv)
+ elif space.is_w(space.type(w_arg), space.w_str):
+ arg = space.str_w(w_arg)
+ if len(arg) > 1024:
+ raise OperationError(space.w_ValueError,
+ space.wrap("fcntl string arg too long"))
+ rv = fcntl_str(fd, op, arg)
+ if rv < 0:
+ raise OperationError(space.w_IOError,
+ space.wrap(_get_error_msg()))
+ return space.wrap(arg)
+ else:
+ raise OperationError(space.w_TypeError,
+ space.wrap("int or string required"))
+fcntl.unwrap_spec = [ObjSpace, W_Root, int, W_Root]
Modified: pypy/dist/pypy/module/fcntl/test/test_fcntl.py
==============================================================================
--- pypy/dist/pypy/module/fcntl/test/test_fcntl.py (original)
+++ pypy/dist/pypy/module/fcntl/test/test_fcntl.py Tue Jul 11 19:55:23 2006
@@ -20,77 +20,87 @@
assert res == 10
assert res_1 == f.fileno()
-# def test_fcntl():
-# assert cfcntl.fcntl(f, 1, 0) == 0
-# assert cfcntl.fcntl(f, 2, "foo") == "foo"
-# py.test.raises(TypeError, cfcntl.fcntl, "foo")
-# py.test.raises(IOError, cfcntl.fcntl, -1, 1, 0)
-#
-# try:
-# os.O_LARGEFILE
-# except AttributeError:
-# start_len = "ll"
-# else:
-# start_len = "qq"
-#
-# if sys.platform in ('netbsd1', 'netbsd2', 'netbsd3',
-# 'Darwin1.2', 'darwin',
-# 'freebsd2', 'freebsd3', 'freebsd4', 'freebsd5',
-# 'freebsd6', 'freebsd7',
-# 'bsdos2', 'bsdos3', 'bsdos4',
-# 'openbsd', 'openbsd2', 'openbsd3'):
-# if struct.calcsize('l') == 8:
-# off_t = 'l'
-# pid_t = 'i'
-# else:
-# off_t = 'lxxxx'
-# pid_t = 'l'
-#
-# format = "%s%s%shh" % (off_t, off_t, pid_t)
-# lockdata = struct.pack(format, 0, 0, 0, cfcntl.F_WRLCK, 0)
-# else:
-# format = "hh%shh" % start_len
-# lockdata = struct.pack(format, cfcntl.F_WRLCK, 0, 0, 0, 0, 0)
-#
-# rv = cfcntl.fcntl(f.fileno(), cfcntl.F_SETLKW, lockdata)
-# assert rv == lockdata
-# assert cfcntl.fcntl(f, cfcntl.F_SETLKW, lockdata) == lockdata
-#
-# # test duplication of file descriptor
-# rv = cfcntl.fcntl(f, cfcntl.F_DUPFD)
-# assert rv > 2 # > (stdin, stdout, stderr) at least
-# assert cfcntl.fcntl(f, cfcntl.F_DUPFD) > rv
-# assert cfcntl.fcntl(f, cfcntl.F_DUPFD, 99) == 99
-#
-# # test descriptor flags
-# assert cfcntl.fcntl(f, cfcntl.F_GETFD) == 0
-# cfcntl.fcntl(f, cfcntl.F_SETFD, 1)
-# assert cfcntl.fcntl(f, cfcntl.F_GETFD, cfcntl.FD_CLOEXEC) == 1
-#
-# # test status flags
-# assert cfcntl.fcntl(f.fileno(), cfcntl.F_SETFL, os.O_NONBLOCK) == 0
-# assert cfcntl.fcntl(f.fileno(), cfcntl.F_SETFL, os.O_NDELAY) == 0
-# assert cfcntl.fcntl(f, cfcntl.F_SETFL, os.O_NONBLOCK) == 0
-# assert cfcntl.fcntl(f, cfcntl.F_SETFL, os.O_NDELAY) == 0
+ def test_fcntl(self):
+ import fcntl
+ import os
+ import sys
+ import struct
+
+ f = open("/tmp/conv_descr", "w")
+
+ fcntl.fcntl(f, 1, 0)
+ fcntl.fcntl(f, 1)
+ raises(TypeError, fcntl.fcntl, "foo")
+ raises(TypeError, fcntl.fcntl, f, "foo")
+ raises(IOError, fcntl.fcntl, -1, 1, 0)
+ assert fcntl.fcntl(f, 1, 0) == 0
+ assert fcntl.fcntl(f, 2, "foo") == "foo"
+
+ try:
+ os.O_LARGEFILE
+ except AttributeError:
+ start_len = "ll"
+ else:
+ start_len = "qq"
+
+ if sys.platform in ('netbsd1', 'netbsd2', 'netbsd3',
+ 'Darwin1.2', 'darwin',
+ 'freebsd2', 'freebsd3', 'freebsd4', 'freebsd5',
+ 'freebsd6', 'freebsd7',
+ 'bsdos2', 'bsdos3', 'bsdos4',
+ 'openbsd', 'openbsd2', 'openbsd3'):
+ if struct.calcsize('l') == 8:
+ off_t = 'l'
+ pid_t = 'i'
+ else:
+ off_t = 'lxxxx'
+ pid_t = 'l'
+
+ format = "%s%s%shh" % (off_t, off_t, pid_t)
+ lockdata = struct.pack(format, 0, 0, 0, fcntl.F_WRLCK, 0)
+ else:
+ format = "hh%shh" % start_len
+ lockdata = struct.pack(format, fcntl.F_WRLCK, 0, 0, 0, 0, 0)
+
+ rv = fcntl.fcntl(f.fileno(), fcntl.F_SETLKW, lockdata)
+ assert rv == lockdata
+ assert fcntl.fcntl(f, fcntl.F_SETLKW, lockdata) == lockdata
+
+ # test duplication of file descriptor
+ rv = fcntl.fcntl(f, fcntl.F_DUPFD)
+ assert rv > 2 # > (stdin, stdout, stderr) at least
+ assert fcntl.fcntl(f, fcntl.F_DUPFD) > rv
+ assert fcntl.fcntl(f, fcntl.F_DUPFD, 99) == 99
+
+ # test descriptor flags
+ assert fcntl.fcntl(f, fcntl.F_GETFD) == 0
+ fcntl.fcntl(f, fcntl.F_SETFD, 1)
+ assert fcntl.fcntl(f, fcntl.F_GETFD, fcntl.FD_CLOEXEC) == 1
+
+ # test status flags
+ assert fcntl.fcntl(f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) == 0
+ assert fcntl.fcntl(f.fileno(), fcntl.F_SETFL, os.O_NDELAY) == 0
+ assert fcntl.fcntl(f, fcntl.F_SETFL, os.O_NONBLOCK) == 0
+ assert fcntl.fcntl(f, fcntl.F_SETFL, os.O_NDELAY) == 0
#
# if "linux" in sys.platform:
# # test managing signals
-# assert cfcntl.fcntl(f, cfcntl.F_GETOWN) == 0
-# cfcntl.fcntl(f, cfcntl.F_SETOWN, 20)
-# assert cfcntl.fcntl(f, cfcntl.F_GETOWN) == 20
-# assert cfcntl.fcntl(f, cfcntl.F_GETSIG) == 0
-# cfcntl.fcntl(f, cfcntl.F_SETSIG, 20)
-# assert cfcntl.fcntl(f, cfcntl.F_GETSIG) == 20
+# assert fcntl.fcntl(f, fcntl.F_GETOWN) == 0
+# fcntl.fcntl(f, fcntl.F_SETOWN, 20)
+# assert fcntl.fcntl(f, fcntl.F_GETOWN) == 20
+# assert fcntl.fcntl(f, fcntl.F_GETSIG) == 0
+# fcntl.fcntl(f, fcntl.F_SETSIG, 20)
+# assert fcntl.fcntl(f, fcntl.F_GETSIG) == 20
#
# # test leases
-# assert cfcntl.fcntl(f, cfcntl.F_GETLEASE) == cfcntl.F_UNLCK
-# cfcntl.fcntl(f, cfcntl.F_SETLEASE, cfcntl.F_WRLCK)
-# assert cfcntl.fcntl(f, cfcntl.F_GETLEASE) == cfcntl.F_WRLCK
+# assert fcntl.fcntl(f, fcntl.F_GETLEASE) == fcntl.F_UNLCK
+# fcntl.fcntl(f, fcntl.F_SETLEASE, fcntl.F_WRLCK)
+# assert fcntl.fcntl(f, fcntl.F_GETLEASE) == fcntl.F_WRLCK
# else:
# # this tests should fail under BSD
# # with "Inappropriate ioctl for device"
-# py.test.raises(IOError, cfcntl.fcntl, f, cfcntl.F_GETOWN)
-# py.test.raises(IOError, cfcntl.fcntl, f, cfcntl.F_SETOWN, 20)
+# py.test.raises(IOError, fcntl.fcntl, f, fcntl.F_GETOWN)
+# py.test.raises(IOError, fcntl.fcntl, f, fcntl.F_SETOWN, 20)
#
#
# def test_flock():
More information about the Pypy-commit
mailing list