[Python-checkins] cpython: Issue #12981: rewrite multiprocessing_{sendfd,recvfd} in Python.

charles-francois.natali python-checkins at python.org
Sat Sep 24 20:04:01 CEST 2011


http://hg.python.org/cpython/rev/95ee0df1e746
changeset:   72465:95ee0df1e746
user:        Charles-François Natali <neologix at free.fr>
date:        Sat Sep 24 20:04:29 2011 +0200
summary:
  Issue #12981: rewrite multiprocessing_{sendfd,recvfd} in Python.

files:
  Lib/multiprocessing/reduction.py           |   21 +-
  Modules/_multiprocessing/multiprocessing.c |  128 +---------
  Modules/_multiprocessing/multiprocessing.h |   10 -
  3 files changed, 19 insertions(+), 140 deletions(-)


diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -39,6 +39,7 @@
 import sys
 import socket
 import threading
+import struct
 
 import _multiprocessing
 from multiprocessing import current_process
@@ -51,7 +52,8 @@
 #
 #
 
-if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
+if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
+                                   hasattr(socket, 'SCM_RIGHTS'))):
     raise ImportError('pickling of connections not supported')
 
 #
@@ -77,10 +79,23 @@
 
 else:
     def send_handle(conn, handle, destination_pid):
-        _multiprocessing.sendfd(conn.fileno(), handle)
+        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+            s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
+                                struct.pack("@i", handle))])
 
     def recv_handle(conn):
-        return _multiprocessing.recvfd(conn.fileno())
+        size = struct.calcsize("@i")
+        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+            msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
+            try:
+                cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+                if (cmsg_level == socket.SOL_SOCKET and
+                    cmsg_type == socket.SCM_RIGHTS):
+                    return struct.unpack("@i", cmsg_data[:size])[0]
+            except (ValueError, IndexError, struct.error):
+                pass
+            raise RuntimeError('Invalid data received')
+
 
 #
 # Support for a per-process server thread which caches pickled handles
diff --git a/Modules/_multiprocessing/multiprocessing.c b/Modules/_multiprocessing/multiprocessing.c
--- a/Modules/_multiprocessing/multiprocessing.c
+++ b/Modules/_multiprocessing/multiprocessing.c
@@ -8,11 +8,6 @@
 
 #include "multiprocessing.h"
 
-#ifdef SCM_RIGHTS
-    #define HAVE_FD_TRANSFER 1
-#else
-    #define HAVE_FD_TRANSFER 0
-#endif
 
 PyObject *create_win32_namespace(void);
 
@@ -75,115 +70,7 @@
     return FALSE;
 }
 
-/*
- * Unix only
- */
-
-#else /* !MS_WINDOWS */
-
-#if HAVE_FD_TRANSFER
-
-/* Functions for transferring file descriptors between processes.
-   Reimplements some of the functionality of the fdcred
-   module at http://www.mca-ltd.com/resources/fdcred_1.tgz. */
-/* Based in http://resin.csoft.net/cgi-bin/man.cgi?section=3&topic=CMSG_DATA */
-
-static PyObject *
-multiprocessing_sendfd(PyObject *self, PyObject *args)
-{
-    int conn, fd, res;
-    struct iovec dummy_iov;
-    char dummy_char;
-    struct msghdr msg;
-    struct cmsghdr *cmsg;
-    union {
-        struct cmsghdr hdr;
-        unsigned char buf[CMSG_SPACE(sizeof(int))];
-    } cmsgbuf;
-
-    if (!PyArg_ParseTuple(args, "ii", &conn, &fd))
-        return NULL;
-
-    dummy_iov.iov_base = &dummy_char;
-    dummy_iov.iov_len = 1;
-
-    memset(&msg, 0, sizeof(msg));
-    msg.msg_control = &cmsgbuf.buf;
-    msg.msg_controllen = sizeof(cmsgbuf.buf);
-    msg.msg_iov = &dummy_iov;
-    msg.msg_iovlen = 1;
-
-    cmsg = CMSG_FIRSTHDR(&msg);
-    cmsg->cmsg_len = CMSG_LEN(sizeof(int));
-    cmsg->cmsg_level = SOL_SOCKET;
-    cmsg->cmsg_type = SCM_RIGHTS;
-    * (int *) CMSG_DATA(cmsg) = fd;
-
-    Py_BEGIN_ALLOW_THREADS
-    res = sendmsg(conn, &msg, 0);
-    Py_END_ALLOW_THREADS
-
-    if (res < 0)
-        return PyErr_SetFromErrno(PyExc_OSError);
-    Py_RETURN_NONE;
-}
-
-static PyObject *
-multiprocessing_recvfd(PyObject *self, PyObject *args)
-{
-    int conn, fd, res;
-    char dummy_char;
-    struct iovec dummy_iov;
-    struct msghdr msg = {0};
-    struct cmsghdr *cmsg;
-    union {
-        struct cmsghdr hdr;
-        unsigned char buf[CMSG_SPACE(sizeof(int))];
-    } cmsgbuf;
-
-    if (!PyArg_ParseTuple(args, "i", &conn))
-        return NULL;
-
-    dummy_iov.iov_base = &dummy_char;
-    dummy_iov.iov_len = 1;
-
-    memset(&msg, 0, sizeof(msg));
-    msg.msg_control = &cmsgbuf.buf;
-    msg.msg_controllen = sizeof(cmsgbuf.buf);
-    msg.msg_iov = &dummy_iov;
-    msg.msg_iovlen = 1;
-
-    cmsg = CMSG_FIRSTHDR(&msg);
-    cmsg->cmsg_level = SOL_SOCKET;
-    cmsg->cmsg_type = SCM_RIGHTS;
-    cmsg->cmsg_len = CMSG_LEN(sizeof(int));
-    msg.msg_controllen = cmsg->cmsg_len;
-
-    Py_BEGIN_ALLOW_THREADS
-    res = recvmsg(conn, &msg, 0);
-    Py_END_ALLOW_THREADS
-
-    if (res < 0)
-        return PyErr_SetFromErrno(PyExc_OSError);
-
-    if (msg.msg_controllen < CMSG_LEN(sizeof(int)) ||
-        (cmsg = CMSG_FIRSTHDR(&msg)) == NULL ||
-        cmsg->cmsg_level != SOL_SOCKET ||
-        cmsg->cmsg_type != SCM_RIGHTS ||
-        cmsg->cmsg_len < CMSG_LEN(sizeof(int))) {
-        /* If at least one control message is present, there should be
-           no room for any further data in the buffer. */
-        PyErr_SetString(PyExc_RuntimeError, "No file descriptor received");
-        return NULL;
-    }
-
-    fd = * (int *) CMSG_DATA(cmsg);
-    return Py_BuildValue("i", fd);
-}
-
-#endif /* HAVE_FD_TRANSFER */
-
-#endif /* !MS_WINDOWS */
+#endif /* MS_WINDOWS */
 
 
 /*
@@ -212,16 +99,6 @@
     {"address_of_buffer", multiprocessing_address_of_buffer, METH_O,
      "address_of_buffer(obj) -> int\n"
      "Return address of obj assuming obj supports buffer inteface"},
-#if HAVE_FD_TRANSFER
-    {"sendfd", multiprocessing_sendfd, METH_VARARGS,
-     "sendfd(sockfd, fd) -> None\n"
-     "Send file descriptor given by fd over the unix domain socket\n"
-     "whose file decriptor is sockfd"},
-    {"recvfd", multiprocessing_recvfd, METH_VARARGS,
-     "recvfd(sockfd) -> fd\n"
-     "Receive a file descriptor over a unix domain socket\n"
-     "whose file decriptor is sockfd"},
-#endif
     {NULL}
 };
 
@@ -319,9 +196,6 @@
 #ifdef HAVE_SEM_TIMEDWAIT
     ADD_FLAG(HAVE_SEM_TIMEDWAIT);
 #endif
-#ifdef HAVE_FD_TRANSFER
-    ADD_FLAG(HAVE_FD_TRANSFER);
-#endif
 #ifdef HAVE_BROKEN_SEM_GETVALUE
     ADD_FLAG(HAVE_BROKEN_SEM_GETVALUE);
 #endif
diff --git a/Modules/_multiprocessing/multiprocessing.h b/Modules/_multiprocessing/multiprocessing.h
--- a/Modules/_multiprocessing/multiprocessing.h
+++ b/Modules/_multiprocessing/multiprocessing.h
@@ -3,12 +3,6 @@
 
 #define PY_SSIZE_T_CLEAN
 
-#ifdef __sun
-/* The control message API is only available on Solaris
-   if XPG 4.2 or later is requested. */
-#define _XOPEN_SOURCE 500
-#endif
-
 #include "Python.h"
 #include "structmember.h"
 #include "pythread.h"
@@ -29,10 +23,6 @@
 #  define SEM_VALUE_MAX LONG_MAX
 #else
 #  include <fcntl.h>                 /* O_CREAT and O_EXCL */
-#  include <netinet/in.h>
-#  include <sys/socket.h>
-#  include <sys/uio.h>
-#  include <arpa/inet.h>             /* htonl() and ntohl() */
 #  if defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED)
 #    include <semaphore.h>
      typedef sem_t *SEM_HANDLE;

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list