[pypy-svn] r29959 - in pypy/dist/pypy/module/fcntl: . test

rhymes at codespeak.net rhymes at codespeak.net
Tue Jul 11 20:40:23 CEST 2006


Author: rhymes
Date: Tue Jul 11 20:40:19 2006
New Revision: 29959

Modified:
   pypy/dist/pypy/module/fcntl/__init__.py
   pypy/dist/pypy/module/fcntl/interp_fcntl.py
   pypy/dist/pypy/module/fcntl/test/test_fcntl.py
Log:
implemented fcntl.flock(). still compiles...


Modified: pypy/dist/pypy/module/fcntl/__init__.py
==============================================================================
--- pypy/dist/pypy/module/fcntl/__init__.py	(original)
+++ pypy/dist/pypy/module/fcntl/__init__.py	Tue Jul 11 20:40:19 2006
@@ -2,7 +2,8 @@
 
 class Module(MixedModule):
     interpleveldefs = {
-        'fcntl': 'interp_fcntl.fcntl'
+        'fcntl': 'interp_fcntl.fcntl',
+        'flock': 'interp_fcntl.flock'
     }
 
     appleveldefs = {

Modified: pypy/dist/pypy/module/fcntl/interp_fcntl.py
==============================================================================
--- pypy/dist/pypy/module/fcntl/interp_fcntl.py	(original)
+++ pypy/dist/pypy/module/fcntl/interp_fcntl.py	Tue Jul 11 20:40:19 2006
@@ -65,6 +65,16 @@
 fcntl_str.argtypes = [c_int, c_int, c_char_p]
 fcntl_str.restype = c_int
 
+fcntl_flock = libc['fcntl']
+fcntl_flock.argtypes = [c_int, c_int, POINTER(_flock)]
+fcntl_flock.restype = c_int
+
+has_flock = False
+if hasattr(libc, "flock"):
+    libc.flock.argtypes = [c_int, c_int]
+    libc.flock.restype = c_int
+    has_flock = True
+
 def _get_error_msg():
     errno = geterrno()
     return libc.strerror(errno)
@@ -79,6 +89,20 @@
     w_fd = space.call_function(w_conv_descriptor, w_f)
     return space.int_w(w_fd)
 
+def _check_flock_op(space, op):
+    l = _flock()
+
+    if op == LOCK_UN:
+        l.l_type = F_UNLCK
+    elif op & LOCK_SH:
+        l.l_type = F_RDLCK
+    elif op & LOCK_EX:
+        l.l_type = F_WRLCK
+    else:
+        raise OperationError(space.w_ValueError,
+            space.wrap("unrecognized flock argument"))
+    return l
+
 def fcntl(space, w_fd, op, w_arg=0):
     """fcntl(fd, op, [arg])
 
@@ -114,3 +138,26 @@
         raise OperationError(space.w_TypeError,
             space.wrap("int or string required"))
 fcntl.unwrap_spec = [ObjSpace, W_Root, int, W_Root]
+
+
+def flock(space, w_fd, op):
+    """flock(fd, operation)
+
+    Perform the lock operation op on file descriptor fd.  See the Unix
+    manual flock(3) for details.  (On some systems, this function is
+    emulated using fcntl().)"""
+
+    fd = _conv_descriptor(space, w_fd)
+
+    if has_flock:
+        rv = libc.flock(fd, op)
+        if rv < 0:
+            raise OperationError(space.w_IOError,
+                space.wrap(_get_error_msg()))
+    else:
+        l = _check_flock_op(space, op)
+        l.l_whence = l.l_start = l.l_len = 0
+        op = (F_SETLKW, F_SETLK)[op & LOCK_NB]
+        
+        rv = _call_func("fcntl", fd, op, byref(l))
+flock.unwrap_spec = [ObjSpace, W_Root, int]

Modified: pypy/dist/pypy/module/fcntl/test/test_fcntl.py
==============================================================================
--- pypy/dist/pypy/module/fcntl/test/test_fcntl.py	(original)
+++ pypy/dist/pypy/module/fcntl/test/test_fcntl.py	Tue Jul 11 20:40:19 2006
@@ -26,7 +26,7 @@
         import sys
         import struct
         
-        f = open("/tmp/conv_descr", "w")
+        f = open("/tmp/fcntl", "w")
         
         fcntl.fcntl(f, 1, 0)
         fcntl.fcntl(f, 1)
@@ -101,17 +101,24 @@
             # with "Inappropriate ioctl for device"
             raises(IOError, fcntl.fcntl, f, fcntl.F_GETOWN)
             raises(IOError, fcntl.fcntl, f, fcntl.F_SETOWN, 20)
+        
+        f.close()
 
-
-# def test_flock():
-#     if "linux" in sys.platform:
-#         cfcntl.flock(f, cfcntl.LOCK_SH)
-#         # this is an error EWOULDBLOCK, man: The file is locked and the
-#         # LOCK_NB flag was selected.
-#         py.test.raises(IOError, cfcntl.flock, f, cfcntl.LOCK_NB)
-#         py.test.raises(IOError, cfcntl.flock, f, 3)
-#     py.test.raises(TypeError, cfcntl.flock, f, "foo")
-#     cfcntl.flock(f, cfcntl.LOCK_UN)
+    def test_flock(self):
+        import fcntl
+        import sys
+        
+        f = open("/tmp/flock", "w")
+        
+        raises(TypeError, fcntl.flock, "foo")
+        raises(TypeError, fcntl.flock, f, "foo")
+        fcntl.flock(f, fcntl.LOCK_SH)
+        # this is an error EWOULDBLOCK, man: The file is locked and the
+        # LOCK_NB flag was selected.
+        raises(IOError, fcntl.flock, f, fcntl.LOCK_NB)
+        fcntl.flock(f, fcntl.LOCK_UN)
+        
+        f.close()
 # 
 # def test_lockf():
 #     py.test.raises(TypeError, cfcntl.lockf, f, "foo")



More information about the Pypy-commit mailing list