[pypy-svn] r29957 - in pypy/dist/pypy/module/fcntl: . test

rhymes at codespeak.net rhymes at codespeak.net
Tue Jul 11 19:55:28 CEST 2006


Author: rhymes
Date: Tue Jul 11 19:55:23 2006
New Revision: 29957

Modified:
   pypy/dist/pypy/module/fcntl/__init__.py
   pypy/dist/pypy/module/fcntl/interp_fcntl.py
   pypy/dist/pypy/module/fcntl/test/test_fcntl.py
Log:
implemented fcntl.fcntl() and exposed all the constants.


Modified: pypy/dist/pypy/module/fcntl/__init__.py
==============================================================================
--- pypy/dist/pypy/module/fcntl/__init__.py	(original)
+++ pypy/dist/pypy/module/fcntl/__init__.py	Tue Jul 11 19:55:23 2006
@@ -2,8 +2,16 @@
 
 class Module(MixedModule):
     interpleveldefs = {
+        'fcntl': 'interp_fcntl.fcntl'
     }
 
     appleveldefs = {
         '_conv_descriptor': 'app_fcntl._conv_descriptor',
     }
+    
+    def buildloaders(cls):
+        from pypy.module.fcntl import interp_fcntl
+        for constant, value in interp_fcntl.constants.iteritems():
+            Module.interpleveldefs[constant] = "space.wrap(%r)" % value
+        super(Module, cls).buildloaders()
+    buildloaders = classmethod(buildloaders)

Modified: pypy/dist/pypy/module/fcntl/interp_fcntl.py
==============================================================================
--- pypy/dist/pypy/module/fcntl/interp_fcntl.py	(original)
+++ pypy/dist/pypy/module/fcntl/interp_fcntl.py	Tue Jul 11 19:55:23 2006
@@ -18,6 +18,7 @@
 # constants, look in fcntl.h and platform docs for the meaning
 # some constants are linux only so they will be correctly exposed outside 
 # depending on the OS
+constants = {}
 constant_names = ['LOCK_SH', 'LOCK_EX', 'LOCK_NB', 'LOCK_UN', 'F_DUPFD',
     'F_GETFD', 'F_SETFD', 'F_GETFL', 'F_SETFL', 'F_UNLCK', 'FD_CLOEXEC',
     'LOCK_MAND', 'LOCK_READ', 'LOCK_WRITE', 'LOCK_RW', 'F_GETSIG', 'F_SETSIG', 
@@ -39,16 +40,70 @@
 cConfig.__dict__.update(ctypes_platform.configure(CConfig))
 cConfig.flock.__name__ = "_flock"
 
+# needed to export the constants outside. see __init__.py
+for name in constant_names:
+    value = getattr(cConfig, name)
+    if value is not None:
+        constants[name] = value
+
 _flock = cConfig.flock
+libc.strerror.restype = c_char_p
+libc.strerror.argtypes = [c_int]
+
+fcntl_int = libc['fcntl']
+fcntl_int.argtypes = [c_int, c_int, c_int]
+fcntl_int.restype = c_int
+
+fcntl_str = libc['fcntl']
+fcntl_str.argtypes = [c_int, c_int, c_char_p]
+fcntl_str.restype = c_int
 
 def _get_error_msg():
     errno = geterrno()
     return libc.strerror(errno)
 
-def _conv_descriptor(space, f):
-    w_conv_descriptor = _get_module_object(space, "_check_float")
-    space.call_function(w_conv_descriptor, space.wrap(f))
-
-
-
-
+def _get_module_object(space, obj_name):
+    w_module = space.getbuiltinmodule('fcntl')
+    w_obj = space.getattr(w_module, space.wrap(obj_name))
+    return w_obj
+
+def _conv_descriptor(space, w_f):
+    w_conv_descriptor = _get_module_object(space, "_conv_descriptor")
+    w_fd = space.call_function(w_conv_descriptor, w_f)
+    return space.int_w(w_fd)
+
+def fcntl(space, w_fd, op, w_arg=0):
+    """fcntl(fd, op, [arg])
+
+    Perform the requested operation on file descriptor fd.  The operation
+    is defined by op and is operating system dependent.  These constants are
+    available from the fcntl module.  The argument arg is optional, and
+    defaults to 0; it may be an int or a string. If arg is given as a string,
+    the return value of fcntl is a string of that length, containing the
+    resulting value put in the arg buffer by the operating system. The length
+    of the arg string is not allowed to exceed 1024 bytes. If the arg given
+    is an integer or if none is specified, the result value is an integer
+    corresponding to the return value of the fcntl call in the C code."""
+
+    fd = _conv_descriptor(space, w_fd)
+    
+    if space.is_w(space.type(w_arg), space.w_int):
+        rv = fcntl_int(fd, op, space.int_w(w_arg))
+        if rv < 0:
+            raise OperationError(space.w_IOError,
+                space.wrap(_get_error_msg()))
+        return space.wrap(rv)
+    elif space.is_w(space.type(w_arg), space.w_str):
+        arg = space.str_w(w_arg)
+        if len(arg) > 1024:
+            raise OperationError(space.w_ValueError,
+                space.wrap("fcntl string arg too long"))
+        rv = fcntl_str(fd, op, arg)
+        if rv < 0:
+            raise OperationError(space.w_IOError,
+                space.wrap(_get_error_msg()))
+        return space.wrap(arg)
+    else:
+        raise OperationError(space.w_TypeError,
+            space.wrap("int or string required"))
+fcntl.unwrap_spec = [ObjSpace, W_Root, int, W_Root]

Modified: pypy/dist/pypy/module/fcntl/test/test_fcntl.py
==============================================================================
--- pypy/dist/pypy/module/fcntl/test/test_fcntl.py	(original)
+++ pypy/dist/pypy/module/fcntl/test/test_fcntl.py	Tue Jul 11 19:55:23 2006
@@ -20,77 +20,87 @@
         assert res == 10
         assert res_1 == f.fileno()
 
-# def test_fcntl():
-#     assert cfcntl.fcntl(f, 1, 0) == 0
-#     assert cfcntl.fcntl(f, 2, "foo") == "foo"
-#     py.test.raises(TypeError, cfcntl.fcntl, "foo")
-#     py.test.raises(IOError, cfcntl.fcntl, -1, 1, 0)
-# 
-#     try:
-#         os.O_LARGEFILE
-#     except AttributeError:
-#         start_len = "ll"
-#     else:
-#         start_len = "qq"
-# 
-#     if sys.platform in ('netbsd1', 'netbsd2', 'netbsd3', 
-#                         'Darwin1.2', 'darwin',
-#                         'freebsd2', 'freebsd3', 'freebsd4', 'freebsd5',
-#                         'freebsd6', 'freebsd7', 
-#                         'bsdos2', 'bsdos3', 'bsdos4',
-#                         'openbsd', 'openbsd2', 'openbsd3'):
-#         if struct.calcsize('l') == 8:
-#             off_t = 'l'
-#             pid_t = 'i'
-#         else:
-#             off_t = 'lxxxx'
-#             pid_t = 'l'
-# 
-#         format = "%s%s%shh" % (off_t, off_t, pid_t)
-#         lockdata = struct.pack(format, 0, 0, 0, cfcntl.F_WRLCK, 0)
-#     else:
-#         format = "hh%shh" % start_len
-#         lockdata = struct.pack(format, cfcntl.F_WRLCK, 0, 0, 0, 0, 0)
-# 
-#     rv = cfcntl.fcntl(f.fileno(), cfcntl.F_SETLKW, lockdata)
-#     assert rv == lockdata
-#     assert cfcntl.fcntl(f, cfcntl.F_SETLKW, lockdata) == lockdata
-# 
-#     # test duplication of file descriptor
-#     rv = cfcntl.fcntl(f, cfcntl.F_DUPFD)
-#     assert rv > 2 # > (stdin, stdout, stderr) at least
-#     assert cfcntl.fcntl(f, cfcntl.F_DUPFD) > rv
-#     assert cfcntl.fcntl(f, cfcntl.F_DUPFD, 99) == 99
-# 
-#     # test descriptor flags
-#     assert cfcntl.fcntl(f, cfcntl.F_GETFD) == 0
-#     cfcntl.fcntl(f, cfcntl.F_SETFD, 1)
-#     assert cfcntl.fcntl(f, cfcntl.F_GETFD, cfcntl.FD_CLOEXEC) == 1
-# 
-#     # test status flags
-#     assert cfcntl.fcntl(f.fileno(), cfcntl.F_SETFL, os.O_NONBLOCK) == 0
-#     assert cfcntl.fcntl(f.fileno(), cfcntl.F_SETFL, os.O_NDELAY) == 0
-#     assert cfcntl.fcntl(f, cfcntl.F_SETFL, os.O_NONBLOCK) == 0
-#     assert cfcntl.fcntl(f, cfcntl.F_SETFL, os.O_NDELAY) == 0
+    def test_fcntl(self):
+        import fcntl
+        import os
+        import sys
+        import struct
+        
+        f = open("/tmp/conv_descr", "w")
+        
+        fcntl.fcntl(f, 1, 0)
+        fcntl.fcntl(f, 1)
+        raises(TypeError, fcntl.fcntl, "foo")
+        raises(TypeError, fcntl.fcntl, f, "foo")
+        raises(IOError, fcntl.fcntl, -1, 1, 0)
+        assert fcntl.fcntl(f, 1, 0) == 0
+        assert fcntl.fcntl(f, 2, "foo") == "foo"
+        
+        try:
+            os.O_LARGEFILE
+        except AttributeError:
+            start_len = "ll"
+        else:
+            start_len = "qq"
+
+        if sys.platform in ('netbsd1', 'netbsd2', 'netbsd3', 
+                            'Darwin1.2', 'darwin',
+                            'freebsd2', 'freebsd3', 'freebsd4', 'freebsd5',
+                            'freebsd6', 'freebsd7', 
+                            'bsdos2', 'bsdos3', 'bsdos4',
+                            'openbsd', 'openbsd2', 'openbsd3'):
+            if struct.calcsize('l') == 8:
+                off_t = 'l'
+                pid_t = 'i'
+            else:
+                off_t = 'lxxxx'
+                pid_t = 'l'
+
+            format = "%s%s%shh" % (off_t, off_t, pid_t)
+            lockdata = struct.pack(format, 0, 0, 0, fcntl.F_WRLCK, 0)
+        else:
+            format = "hh%shh" % start_len
+            lockdata = struct.pack(format, fcntl.F_WRLCK, 0, 0, 0, 0, 0)
+
+        rv = fcntl.fcntl(f.fileno(), fcntl.F_SETLKW, lockdata)
+        assert rv == lockdata
+        assert fcntl.fcntl(f, fcntl.F_SETLKW, lockdata) == lockdata
+
+        # test duplication of file descriptor
+        rv = fcntl.fcntl(f, fcntl.F_DUPFD)
+        assert rv > 2 # > (stdin, stdout, stderr) at least
+        assert fcntl.fcntl(f, fcntl.F_DUPFD) > rv
+        assert fcntl.fcntl(f, fcntl.F_DUPFD, 99) == 99
+
+        # test descriptor flags
+        assert fcntl.fcntl(f, fcntl.F_GETFD) == 0
+        fcntl.fcntl(f, fcntl.F_SETFD, 1)
+        assert fcntl.fcntl(f, fcntl.F_GETFD, fcntl.FD_CLOEXEC) == 1
+
+        # test status flags
+        assert fcntl.fcntl(f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) == 0
+        assert fcntl.fcntl(f.fileno(), fcntl.F_SETFL, os.O_NDELAY) == 0
+        assert fcntl.fcntl(f, fcntl.F_SETFL, os.O_NONBLOCK) == 0
+        assert fcntl.fcntl(f, fcntl.F_SETFL, os.O_NDELAY) == 0
 # 
 #     if "linux" in sys.platform:
 #         # test managing signals
-#         assert cfcntl.fcntl(f, cfcntl.F_GETOWN) == 0
-#         cfcntl.fcntl(f, cfcntl.F_SETOWN, 20)
-#         assert cfcntl.fcntl(f, cfcntl.F_GETOWN) == 20
-#         assert cfcntl.fcntl(f, cfcntl.F_GETSIG) == 0
-#         cfcntl.fcntl(f, cfcntl.F_SETSIG, 20)
-#         assert cfcntl.fcntl(f, cfcntl.F_GETSIG) == 20
+#         assert fcntl.fcntl(f, fcntl.F_GETOWN) == 0
+#         fcntl.fcntl(f, fcntl.F_SETOWN, 20)
+#         assert fcntl.fcntl(f, fcntl.F_GETOWN) == 20
+#         assert fcntl.fcntl(f, fcntl.F_GETSIG) == 0
+#         fcntl.fcntl(f, fcntl.F_SETSIG, 20)
+#         assert fcntl.fcntl(f, fcntl.F_GETSIG) == 20
 # 
 #         # test leases
-#         assert cfcntl.fcntl(f, cfcntl.F_GETLEASE) == cfcntl.F_UNLCK
-#         cfcntl.fcntl(f, cfcntl.F_SETLEASE, cfcntl.F_WRLCK)
-#         assert cfcntl.fcntl(f, cfcntl.F_GETLEASE) == cfcntl.F_WRLCK
+#         assert fcntl.fcntl(f, fcntl.F_GETLEASE) == fcntl.F_UNLCK
+#         fcntl.fcntl(f, fcntl.F_SETLEASE, fcntl.F_WRLCK)
+#         assert fcntl.fcntl(f, fcntl.F_GETLEASE) == fcntl.F_WRLCK
 #     else:
 #         # this tests should fail under BSD
 #         # with "Inappropriate ioctl for device"
-#         py.test.raises(IOError, cfcntl.fcntl, f, cfcntl.F_GETOWN)
-#         py.test.raises(IOError, cfcntl.fcntl, f, cfcntl.F_SETOWN, 20)
+#         py.test.raises(IOError, fcntl.fcntl, f, fcntl.F_GETOWN)
+#         py.test.raises(IOError, fcntl.fcntl, f, fcntl.F_SETOWN, 20)
 # 
 # 
 # def test_flock():



More information about the Pypy-commit mailing list