[Python-checkins] bpo-38880: List interpreters associated with a channel end (GH-17323)

Lewis Gaul webhook-mailer at python.org
Tue Apr 28 20:18:50 EDT 2020


https://github.com/python/cpython/commit/f7bbf58aa9299e9dd00b7a1bdd1113b4dcb6dfdf
commit: f7bbf58aa9299e9dd00b7a1bdd1113b4dcb6dfdf
branch: master
author: Lewis Gaul <Lewis at scphillips.com>
committer: GitHub <noreply at github.com>
date: 2020-04-28T17:18:42-07:00
summary:

bpo-38880: List interpreters associated with a channel end (GH-17323)



This PR adds the functionality requested by https://github.com/ericsnowcurrently/multi-core-python/issues/52.

Automerge-Triggered-By: @ericsnowcurrently

files:
A Misc/NEWS.d/next/Core and Builtins/2019-11-22-14-34-47.bpo-38880.evcCPa.rst
M Lib/test/test__xxsubinterpreters.py
M Misc/ACKS
M Modules/_xxsubinterpretersmodule.c

diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py
index 8a368dc113972..44f4d3fa0f4c9 100644
--- a/Lib/test/test__xxsubinterpreters.py
+++ b/Lib/test/test__xxsubinterpreters.py
@@ -1207,6 +1207,185 @@ def test_ids_global(self):
 
         self.assertEqual(cid2, int(cid1) + 1)
 
+    def test_channel_list_interpreters_none(self):
+        """Test listing interpreters for a channel with no associations."""
+        # Test for channel with no associated interpreters.
+        cid = interpreters.channel_create()
+        send_interps = interpreters.channel_list_interpreters(cid, send=True)
+        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+        self.assertEqual(send_interps, [])
+        self.assertEqual(recv_interps, [])
+
+    def test_channel_list_interpreters_basic(self):
+        """Test basic listing channel interpreters."""
+        interp0 = interpreters.get_main()
+        cid = interpreters.channel_create()
+        interpreters.channel_send(cid, "send")
+        # Test for a channel that has one end associated to an interpreter.
+        send_interps = interpreters.channel_list_interpreters(cid, send=True)
+        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+        self.assertEqual(send_interps, [interp0])
+        self.assertEqual(recv_interps, [])
+
+        interp1 = interpreters.create()
+        _run_output(interp1, dedent(f"""
+            import _xxsubinterpreters as _interpreters
+            obj = _interpreters.channel_recv({cid})
+            """))
+        # Test for channel that has both ends associated to an interpreter.
+        send_interps = interpreters.channel_list_interpreters(cid, send=True)
+        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+        self.assertEqual(send_interps, [interp0])
+        self.assertEqual(recv_interps, [interp1])
+
+    def test_channel_list_interpreters_multiple(self):
+        """Test listing interpreters for a channel with many associations."""
+        interp0 = interpreters.get_main()
+        interp1 = interpreters.create()
+        interp2 = interpreters.create()
+        interp3 = interpreters.create()
+        cid = interpreters.channel_create()
+
+        interpreters.channel_send(cid, "send")
+        _run_output(interp1, dedent(f"""
+            import _xxsubinterpreters as _interpreters
+            _interpreters.channel_send({cid}, "send")
+            """))
+        _run_output(interp2, dedent(f"""
+            import _xxsubinterpreters as _interpreters
+            obj = _interpreters.channel_recv({cid})
+            """))
+        _run_output(interp3, dedent(f"""
+            import _xxsubinterpreters as _interpreters
+            obj = _interpreters.channel_recv({cid})
+            """))
+        send_interps = interpreters.channel_list_interpreters(cid, send=True)
+        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+        self.assertEqual(set(send_interps), {interp0, interp1})
+        self.assertEqual(set(recv_interps), {interp2, interp3})
+
+    def test_channel_list_interpreters_destroyed(self):
+        """Test listing channel interpreters with a destroyed interpreter."""
+        interp0 = interpreters.get_main()
+        interp1 = interpreters.create()
+        cid = interpreters.channel_create()
+        interpreters.channel_send(cid, "send")
+        _run_output(interp1, dedent(f"""
+            import _xxsubinterpreters as _interpreters
+            obj = _interpreters.channel_recv({cid})
+            """))
+        # Should be one interpreter associated with each end.
+        send_interps = interpreters.channel_list_interpreters(cid, send=True)
+        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+        self.assertEqual(send_interps, [interp0])
+        self.assertEqual(recv_interps, [interp1])
+
+        interpreters.destroy(interp1)
+        # Destroyed interpreter should not be listed.
+        send_interps = interpreters.channel_list_interpreters(cid, send=True)
+        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+        self.assertEqual(send_interps, [interp0])
+        self.assertEqual(recv_interps, [])
+
+    def test_channel_list_interpreters_released(self):
+        """Test listing channel interpreters with a released channel."""
+        # Set up one channel with main interpreter on the send end and two
+        # subinterpreters on the receive end.
+        interp0 = interpreters.get_main()
+        interp1 = interpreters.create()
+        interp2 = interpreters.create()
+        cid = interpreters.channel_create()
+        interpreters.channel_send(cid, "data")
+        _run_output(interp1, dedent(f"""
+            import _xxsubinterpreters as _interpreters
+            obj = _interpreters.channel_recv({cid})
+            """))
+        interpreters.channel_send(cid, "data")
+        _run_output(interp2, dedent(f"""
+            import _xxsubinterpreters as _interpreters
+            obj = _interpreters.channel_recv({cid})
+            """))
+        # Check the setup.
+        send_interps = interpreters.channel_list_interpreters(cid, send=True)
+        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+        self.assertEqual(len(send_interps), 1)
+        self.assertEqual(len(recv_interps), 2)
+
+        # Release the main interpreter from the send end.
+        interpreters.channel_release(cid, send=True)
+        # Send end should have no associated interpreters.
+        send_interps = interpreters.channel_list_interpreters(cid, send=True)
+        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+        self.assertEqual(len(send_interps), 0)
+        self.assertEqual(len(recv_interps), 2)
+
+        # Release one of the subinterpreters from the receive end.
+        _run_output(interp2, dedent(f"""
+            import _xxsubinterpreters as _interpreters
+            _interpreters.channel_release({cid})
+            """))
+        # Receive end should have the released interpreter removed.
+        send_interps = interpreters.channel_list_interpreters(cid, send=True)
+        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+        self.assertEqual(len(send_interps), 0)
+        self.assertEqual(recv_interps, [interp1])
+
+    def test_channel_list_interpreters_closed(self):
+        """Test listing channel interpreters with a closed channel."""
+        interp0 = interpreters.get_main()
+        interp1 = interpreters.create()
+        cid = interpreters.channel_create()
+        # Put something in the channel so that it's not empty.
+        interpreters.channel_send(cid, "send")
+
+        # Check initial state.
+        send_interps = interpreters.channel_list_interpreters(cid, send=True)
+        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+        self.assertEqual(len(send_interps), 1)
+        self.assertEqual(len(recv_interps), 0)
+
+        # Force close the channel.
+        interpreters.channel_close(cid, force=True)
+        # Both ends should raise an error.
+        with self.assertRaises(interpreters.ChannelClosedError):
+            interpreters.channel_list_interpreters(cid, send=True)
+        with self.assertRaises(interpreters.ChannelClosedError):
+            interpreters.channel_list_interpreters(cid, send=False)
+
+    def test_channel_list_interpreters_closed_send_end(self):
+        """Test listing channel interpreters with a channel's send end closed."""
+        interp0 = interpreters.get_main()
+        interp1 = interpreters.create()
+        cid = interpreters.channel_create()
+        # Put something in the channel so that it's not empty.
+        interpreters.channel_send(cid, "send")
+
+        # Check initial state.
+        send_interps = interpreters.channel_list_interpreters(cid, send=True)
+        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+        self.assertEqual(len(send_interps), 1)
+        self.assertEqual(len(recv_interps), 0)
+
+        # Close the send end of the channel.
+        interpreters.channel_close(cid, send=True)
+        # Send end should raise an error.
+        with self.assertRaises(interpreters.ChannelClosedError):
+            interpreters.channel_list_interpreters(cid, send=True)
+        # Receive end should not be closed (since channel is not empty).
+        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+        self.assertEqual(len(recv_interps), 0)
+
+        # Close the receive end of the channel from a subinterpreter.
+        _run_output(interp1, dedent(f"""
+            import _xxsubinterpreters as _interpreters
+            _interpreters.channel_close({cid}, force=True)
+            """))
+        # Both ends should raise an error.
+        with self.assertRaises(interpreters.ChannelClosedError):
+            interpreters.channel_list_interpreters(cid, send=True)
+        with self.assertRaises(interpreters.ChannelClosedError):
+            interpreters.channel_list_interpreters(cid, send=False)
+
     ####################
 
     def test_send_recv_main(self):
@@ -1540,6 +1719,23 @@ def test_close_used_multiple_times_by_single_user(self):
         with self.assertRaises(interpreters.ChannelClosedError):
             interpreters.channel_recv(cid)
 
+    def test_channel_list_interpreters_invalid_channel(self):
+        cid = interpreters.channel_create()
+        # Test for invalid channel ID.
+        with self.assertRaises(interpreters.ChannelNotFoundError):
+            interpreters.channel_list_interpreters(1000, send=True)
+
+        interpreters.channel_close(cid)
+        # Test for a channel that has been closed.
+        with self.assertRaises(interpreters.ChannelClosedError):
+            interpreters.channel_list_interpreters(cid, send=True)
+
+    def test_channel_list_interpreters_invalid_args(self):
+        # Tests for invalid arguments passed to the API.
+        cid = interpreters.channel_create()
+        with self.assertRaises(TypeError):
+            interpreters.channel_list_interpreters(cid)
+
 
 class ChannelReleaseTests(TestBase):
 
diff --git a/Misc/ACKS b/Misc/ACKS
index d4ffc366769ac..89f37e584ef8b 100644
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -456,6 +456,7 @@ Rodolpho Eckhardt
 Ulrich Eckhardt
 David Edelsohn
 John Edmonds
+Benjamin Edwards
 Grant Edwards
 Zvi Effron
 John Ehresman
@@ -570,6 +571,7 @@ Jake Garver
 Dan Gass
 Tim Gates
 Andrew Gaul
+Lewis Gaul
 Matthieu Gautier
 Stephen M. Gava
 Xavier de Gaye
diff --git a/Misc/NEWS.d/next/Core and Builtins/2019-11-22-14-34-47.bpo-38880.evcCPa.rst b/Misc/NEWS.d/next/Core and Builtins/2019-11-22-14-34-47.bpo-38880.evcCPa.rst
new file mode 100644
index 0000000000000..07a7f5ec22aa1
--- /dev/null
+++ b/Misc/NEWS.d/next/Core and Builtins/2019-11-22-14-34-47.bpo-38880.evcCPa.rst	
@@ -0,0 +1 @@
+Added the ability to list interpreters associated with channel ends in the internal subinterpreters module.
diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c
index 2ee8d07d0671f..e618930e09d12 100644
--- a/Modules/_xxsubinterpretersmodule.c
+++ b/Modules/_xxsubinterpretersmodule.c
@@ -538,7 +538,7 @@ _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
 
 typedef struct _channelassociations {
     // Note that the list entries are never removed for interpreter
-    // for which the channel is closed.  This should be a problem in
+    // for which the channel is closed.  This should not be a problem in
     // practice.  Also, a channel isn't automatically closed when an
     // interpreter is destroyed.
     int64_t numsendopen;
@@ -1179,11 +1179,6 @@ _channels_list_all(_channels *channels, int64_t *count)
 {
     int64_t *cids = NULL;
     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
-    int64_t numopen = channels->numopen;
-    if (numopen >= PY_SSIZE_T_MAX) {
-        PyErr_SetString(PyExc_RuntimeError, "too many channels open");
-        goto done;
-    }
     int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
     if (ids == NULL) {
         goto done;
@@ -1392,6 +1387,24 @@ _channel_close(_channels *channels, int64_t id, int end, int force)
     return _channels_close(channels, id, NULL, end, force);
 }
 
+static int
+_channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
+                       int send)
+{
+    _PyChannelState *chan = _channels_lookup(channels, cid, NULL);
+    if (chan == NULL) {
+        return -1;
+    } else if (send && chan->closing != NULL) {
+        PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
+        return -1;
+    }
+
+    _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
+                                        interp, NULL);
+
+    return (end != NULL && end->open);
+}
+
 /* ChannelID class */
 
 static PyTypeObject ChannelIDtype;
@@ -2323,6 +2336,68 @@ PyDoc_STRVAR(channel_list_all_doc,
 \n\
 Return the list of all IDs for active channels.");
 
+static PyObject *
+channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"cid", "send", NULL};
+    int64_t cid;            /* Channel ID */
+    int send = 0;           /* Send or receive end? */
+    int64_t id;
+    PyObject *ids, *id_obj;
+    PyInterpreterState *interp;
+
+    if (!PyArg_ParseTupleAndKeywords(
+            args, kwds, "O&$p:channel_list_interpreters",
+            kwlist, channel_id_converter, &cid, &send)) {
+        return NULL;
+    }
+
+    ids = PyList_New(0);
+    if (ids == NULL) {
+        goto except;
+    }
+
+    interp = PyInterpreterState_Head();
+    while (interp != NULL) {
+        id = PyInterpreterState_GetID(interp);
+        assert(id >= 0);
+        int res = _channel_is_associated(&_globals.channels, cid, id, send);
+        if (res < 0) {
+            goto except;
+        }
+        if (res) {
+            id_obj = _PyInterpreterState_GetIDObject(interp);
+            if (id_obj == NULL) {
+                goto except;
+            }
+            res = PyList_Insert(ids, 0, id_obj);
+            Py_DECREF(id_obj);
+            if (res < 0) {
+                goto except;
+            }
+        }
+        interp = PyInterpreterState_Next(interp);
+    }
+
+    goto finally;
+
+except:
+    Py_XDECREF(ids);
+    ids = NULL;
+
+finally:
+    return ids;
+}
+
+PyDoc_STRVAR(channel_list_interpreters_doc,
+"channel_list_interpreters(cid, *, send) -> [id]\n\
+\n\
+Return the list of all interpreter IDs associated with an end of the channel.\n\
+\n\
+The 'send' argument should be a boolean indicating whether to use the send or\n\
+receive end.");
+
+
 static PyObject *
 channel_send(PyObject *self, PyObject *args, PyObject *kwds)
 {
@@ -2493,6 +2568,8 @@ static PyMethodDef module_functions[] = {
      METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
     {"channel_list_all",          channel_list_all,
      METH_NOARGS, channel_list_all_doc},
+    {"channel_list_interpreters", (PyCFunction)(void(*)(void))channel_list_interpreters,
+     METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
     {"channel_send",              (PyCFunction)(void(*)(void))channel_send,
      METH_VARARGS | METH_KEYWORDS, channel_send_doc},
     {"channel_recv",              (PyCFunction)(void(*)(void))channel_recv,



More information about the Python-checkins mailing list