[issue12981] rewrite multiprocessing (senfd|recvfd) in Python

Charles-François Natali report at bugs.python.org
Sat Sep 17 13:28:29 CEST 2011


Charles-François Natali <neologix at free.fr> added the comment:

Here's a patch taking into account the fact that
multiprocessing.reduction might not be available and importing it can
raise an ImportError (which is already the case with the C
implementation, but multiprocessing.reduction tests have been added
recently to test_multiprocessing), e.g. if the OS doesn't support FD
passing.
With this patch, the pure Python version can be applied, and passes on
Linux, FreeBSD, OS X, Windows and OpenSolaris (except that it's not
available on OpenSolaris until issue #12999 gets fixed).
I also slightly modified the struct format used in the pure Python
version to make sure the length is sent as a a native int ("@i")
instead of a standardized int ("=i"), which might break if sizeof(int)
!= 4 (not sure there are many ILP64 architectures out there, but you
never know...).

----------
Added file: http://bugs.python.org/file23179/skip_reduction.diff
Added file: http://bugs.python.org/file23180/multiprocessing_fd-2.diff

_______________________________________
Python tracker <report at bugs.python.org>
<http://bugs.python.org/issue12981>
_______________________________________
-------------- next part --------------
diff -r c6d52971dd2a Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py	Thu Sep 15 18:18:51 2011 +0200
+++ b/Lib/test/test_multiprocessing.py	Sat Sep 17 10:54:10 2011 +0200
@@ -35,7 +35,13 @@
 import multiprocessing.heap
 import multiprocessing.pool
 
-from multiprocessing import util, reduction
+from multiprocessing import util
+
+try:
+    from multiprocessing import reduction
+    HAS_REDUCTION = True
+except ImportError:
+    HAS_REDUCTION = False
 
 try:
     from multiprocessing.sharedctypes import Value, copy
@@ -1631,6 +1637,7 @@
         os.write(fd, data)
         os.close(fd)
 
+    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
     def test_fd_transfer(self):
         if self.TYPE != 'processes':
             self.skipTest("only makes sense with processes")
@@ -1648,6 +1655,7 @@
         with open(test.support.TESTFN, "rb") as f:
             self.assertEqual(f.read(), b"foo")
 
+    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
     @unittest.skipIf(sys.platform == "win32",
                      "test semantics don't make sense on Windows")
     @unittest.skipIf(MAXFD <= 256,
@@ -1987,10 +1995,12 @@
             'multiprocessing', 'multiprocessing.connection',
             'multiprocessing.heap', 'multiprocessing.managers',
             'multiprocessing.pool', 'multiprocessing.process',
-            'multiprocessing.reduction',
             'multiprocessing.synchronize', 'multiprocessing.util'
             ]
 
+        if HAS_REDUCTION:
+            modules.append('multiprocessing.reduction')
+
         if c_int is not None:
             # This module requires _ctypes
             modules.append('multiprocessing.sharedctypes')
-------------- next part --------------
diff -r c6d52971dd2a Lib/multiprocessing/reduction.py
--- a/Lib/multiprocessing/reduction.py	Thu Sep 15 18:18:51 2011 +0200
+++ b/Lib/multiprocessing/reduction.py	Fri Sep 16 19:44:51 2011 +0200
@@ -39,6 +39,7 @@
 import sys
 import socket
 import threading
+import struct
 
 import _multiprocessing
 from multiprocessing import current_process
@@ -51,7 +52,8 @@
 #
 #
 
-if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
+if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
+                                   hasattr(socket, 'SCM_RIGHTS'))):
     raise ImportError('pickling of connections not supported')
 
 #
@@ -77,10 +79,23 @@
 
 else:
     def send_handle(conn, handle, destination_pid):
-        _multiprocessing.sendfd(conn.fileno(), handle)
+        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+            s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
+                                struct.pack("@i", handle))])
 
     def recv_handle(conn):
-        return _multiprocessing.recvfd(conn.fileno())
+        size = struct.calcsize("@i")
+        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+            msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_SPACE(size))
+            try:
+                cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+                if (cmsg_level == socket.SOL_SOCKET and
+                    cmsg_type == socket.SCM_RIGHTS):
+                    return struct.unpack("@i", cmsg_data[:size])[0]
+            except (ValueError, IndexError, struct.error):
+                pass
+            raise RuntimeError('Invalid data received')
+
 
 #
 # Support for a per-process server thread which caches pickled handles
diff -r c6d52971dd2a Modules/_multiprocessing/multiprocessing.c
--- a/Modules/_multiprocessing/multiprocessing.c	Thu Sep 15 18:18:51 2011 +0200
+++ b/Modules/_multiprocessing/multiprocessing.c	Fri Sep 16 19:44:51 2011 +0200
@@ -8,11 +8,6 @@
 
 #include "multiprocessing.h"
 
-#ifdef SCM_RIGHTS
-    #define HAVE_FD_TRANSFER 1
-#else
-    #define HAVE_FD_TRANSFER 0
-#endif
 
 PyObject *create_win32_namespace(void);
 
@@ -75,104 +70,7 @@
     return FALSE;
 }
 
-/*
- * Unix only
- */
-
-#else /* !MS_WINDOWS */
-
-#if HAVE_FD_TRANSFER
-
-/* Functions for transferring file descriptors between processes.
-   Reimplements some of the functionality of the fdcred
-   module at http://www.mca-ltd.com/resources/fdcred_1.tgz. */
-/* Based in http://resin.csoft.net/cgi-bin/man.cgi?section=3&topic=CMSG_DATA */
-
-static PyObject *
-multiprocessing_sendfd(PyObject *self, PyObject *args)
-{
-    int conn, fd, res;
-    struct iovec dummy_iov;
-    char dummy_char;
-    struct msghdr msg;
-    struct cmsghdr *cmsg;
-    union {
-        struct cmsghdr hdr;
-        unsigned char buf[CMSG_SPACE(sizeof(int))];
-    } cmsgbuf;
-
-    if (!PyArg_ParseTuple(args, "ii", &conn, &fd))
-        return NULL;
-
-    dummy_iov.iov_base = &dummy_char;
-    dummy_iov.iov_len = 1;
-
-    memset(&msg, 0, sizeof(msg));
-    msg.msg_control = &cmsgbuf.buf;
-    msg.msg_controllen = sizeof(cmsgbuf.buf);
-    msg.msg_iov = &dummy_iov;
-    msg.msg_iovlen = 1;
-
-    cmsg = CMSG_FIRSTHDR(&msg);
-    cmsg->cmsg_len = CMSG_LEN(sizeof(int));
-    cmsg->cmsg_level = SOL_SOCKET;
-    cmsg->cmsg_type = SCM_RIGHTS;
-    * (int *) CMSG_DATA(cmsg) = fd;
-
-    Py_BEGIN_ALLOW_THREADS
-    res = sendmsg(conn, &msg, 0);
-    Py_END_ALLOW_THREADS
-
-    if (res < 0)
-        return PyErr_SetFromErrno(PyExc_OSError);
-    Py_RETURN_NONE;
-}
-
-static PyObject *
-multiprocessing_recvfd(PyObject *self, PyObject *args)
-{
-    int conn, fd, res;
-    char dummy_char;
-    struct iovec dummy_iov;
-    struct msghdr msg = {0};
-    struct cmsghdr *cmsg;
-    union {
-        struct cmsghdr hdr;
-        unsigned char buf[CMSG_SPACE(sizeof(int))];
-    } cmsgbuf;
-
-    if (!PyArg_ParseTuple(args, "i", &conn))
-        return NULL;
-
-    dummy_iov.iov_base = &dummy_char;
-    dummy_iov.iov_len = 1;
-
-    memset(&msg, 0, sizeof(msg));
-    msg.msg_control = &cmsgbuf.buf;
-    msg.msg_controllen = sizeof(cmsgbuf.buf);
-    msg.msg_iov = &dummy_iov;
-    msg.msg_iovlen = 1;
-
-    cmsg = CMSG_FIRSTHDR(&msg);
-    cmsg->cmsg_level = SOL_SOCKET;
-    cmsg->cmsg_type = SCM_RIGHTS;
-    cmsg->cmsg_len = CMSG_LEN(sizeof(int));
-    msg.msg_controllen = cmsg->cmsg_len;
-
-    Py_BEGIN_ALLOW_THREADS
-    res = recvmsg(conn, &msg, 0);
-    Py_END_ALLOW_THREADS
-
-    if (res < 0)
-        return PyErr_SetFromErrno(PyExc_OSError);
-
-    fd = * (int *) CMSG_DATA(cmsg);
-    return Py_BuildValue("i", fd);
-}
-
-#endif /* HAVE_FD_TRANSFER */
-
-#endif /* !MS_WINDOWS */
+#endif /* MS_WINDOWS */
 
 
 /*
@@ -201,16 +99,6 @@
     {"address_of_buffer", multiprocessing_address_of_buffer, METH_O,
      "address_of_buffer(obj) -> int\n"
      "Return address of obj assuming obj supports buffer inteface"},
-#if HAVE_FD_TRANSFER
-    {"sendfd", multiprocessing_sendfd, METH_VARARGS,
-     "sendfd(sockfd, fd) -> None\n"
-     "Send file descriptor given by fd over the unix domain socket\n"
-     "whose file decriptor is sockfd"},
-    {"recvfd", multiprocessing_recvfd, METH_VARARGS,
-     "recvfd(sockfd) -> fd\n"
-     "Receive a file descriptor over a unix domain socket\n"
-     "whose file decriptor is sockfd"},
-#endif
     {NULL}
 };
 
@@ -308,9 +196,6 @@
 #ifdef HAVE_SEM_TIMEDWAIT
     ADD_FLAG(HAVE_SEM_TIMEDWAIT);
 #endif
-#ifdef HAVE_FD_TRANSFER
-    ADD_FLAG(HAVE_FD_TRANSFER);
-#endif
 #ifdef HAVE_BROKEN_SEM_GETVALUE
     ADD_FLAG(HAVE_BROKEN_SEM_GETVALUE);
 #endif


More information about the Python-bugs-list mailing list