[Python-checkins] gh-101524: Split Up the _xxsubinterpreters Module (gh-101526)

ericsnowcurrently webhook-mailer at python.org
Fri Feb 3 20:14:53 EST 2023


https://github.com/python/cpython/commit/c67b00534abfeca83016a00818cf1fd949613d6b
commit: c67b00534abfeca83016a00818cf1fd949613d6b
branch: main
author: Eric Snow <ericsnowcurrently at gmail.com>
committer: ericsnowcurrently <ericsnowcurrently at gmail.com>
date: 2023-02-03T18:14:43-07:00
summary:

gh-101524: Split Up the _xxsubinterpreters Module (gh-101526)

This is step 1 in potentially dropping all the "channel"-related code. Channels have already been removed from PEP 554.

https://github.com/python/cpython/issues/101524

files:
A Lib/test/test__xxinterpchannels.py
A Modules/_xxinterpchannelsmodule.c
M Lib/test/support/interpreters.py
M Lib/test/test__xxsubinterpreters.py
M Lib/test/test_interpreters.py
M Modules/Setup
M Modules/Setup.stdlib.in
M Modules/_testcapimodule.c
M Modules/_xxsubinterpretersmodule.c
M PC/config.c
M PCbuild/pythoncore.vcxproj
M PCbuild/pythoncore.vcxproj.filters
M Tools/build/generate_stdlib_module_names.py
M configure
M configure.ac

diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py
index 2935708f9df1..eeff3abe0324 100644
--- a/Lib/test/support/interpreters.py
+++ b/Lib/test/support/interpreters.py
@@ -2,11 +2,12 @@
 
 import time
 import _xxsubinterpreters as _interpreters
+import _xxinterpchannels as _channels
 
 # aliases:
-from _xxsubinterpreters import (
+from _xxsubinterpreters import is_shareable
+from _xxinterpchannels import (
     ChannelError, ChannelNotFoundError, ChannelEmptyError,
-    is_shareable,
 )
 
 
@@ -102,7 +103,7 @@ def create_channel():
 
     The channel may be used to pass data safely between interpreters.
     """
-    cid = _interpreters.channel_create()
+    cid = _channels.create()
     recv, send = RecvChannel(cid), SendChannel(cid)
     return recv, send
 
@@ -110,14 +111,14 @@ def create_channel():
 def list_all_channels():
     """Return a list of (recv, send) for all open channels."""
     return [(RecvChannel(cid), SendChannel(cid))
-            for cid in _interpreters.channel_list_all()]
+            for cid in _channels.list_all()]
 
 
 class _ChannelEnd:
     """The base class for RecvChannel and SendChannel."""
 
     def __init__(self, id):
-        if not isinstance(id, (int, _interpreters.ChannelID)):
+        if not isinstance(id, (int, _channels.ChannelID)):
             raise TypeError(f'id must be an int, got {id!r}')
         self._id = id
 
@@ -152,10 +153,10 @@ def recv(self, *, _sentinel=object(), _delay=10 / 1000):  # 10 milliseconds
         This blocks until an object has been sent, if none have been
         sent already.
         """
-        obj = _interpreters.channel_recv(self._id, _sentinel)
+        obj = _channels.recv(self._id, _sentinel)
         while obj is _sentinel:
             time.sleep(_delay)
-            obj = _interpreters.channel_recv(self._id, _sentinel)
+            obj = _channels.recv(self._id, _sentinel)
         return obj
 
     def recv_nowait(self, default=_NOT_SET):
@@ -166,9 +167,9 @@ def recv_nowait(self, default=_NOT_SET):
         is the same as recv().
         """
         if default is _NOT_SET:
-            return _interpreters.channel_recv(self._id)
+            return _channels.recv(self._id)
         else:
-            return _interpreters.channel_recv(self._id, default)
+            return _channels.recv(self._id, default)
 
 
 class SendChannel(_ChannelEnd):
@@ -179,7 +180,7 @@ def send(self, obj):
 
         This blocks until the object is received.
         """
-        _interpreters.channel_send(self._id, obj)
+        _channels.send(self._id, obj)
         # XXX We are missing a low-level channel_send_wait().
         # See bpo-32604 and gh-19829.
         # Until that shows up we fake it:
@@ -194,4 +195,4 @@ def send_nowait(self, obj):
         # XXX Note that at the moment channel_send() only ever returns
         # None.  This should be fixed when channel_send_wait() is added.
         # See bpo-32604 and gh-19829.
-        return _interpreters.channel_send(self._id, obj)
+        return _channels.send(self._id, obj)
diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py
new file mode 100644
index 000000000000..03bb5c80b8da
--- /dev/null
+++ b/Lib/test/test__xxinterpchannels.py
@@ -0,0 +1,1541 @@
+from collections import namedtuple
+import contextlib
+import os
+import sys
+from textwrap import dedent
+import threading
+import time
+import unittest
+
+from test.support import import_helper
+
+from test.test__xxsubinterpreters import (
+    interpreters,
+    _run_output,
+    clean_up_interpreters,
+)
+
+
+channels = import_helper.import_module('_xxinterpchannels')
+
+
+##################################
+# helpers
+
+#@contextmanager
+#def run_threaded(id, source, **shared):
+#    def run():
+#        run_interp(id, source, **shared)
+#    t = threading.Thread(target=run)
+#    t.start()
+#    yield
+#    t.join()
+
+
+def run_interp(id, source, **shared):
+    _run_interp(id, source, shared)
+
+
+def _run_interp(id, source, shared, _mainns={}):
+    source = dedent(source)
+    main = interpreters.get_main()
+    if main == id:
+        if interpreters.get_current() != main:
+            raise RuntimeError
+        # XXX Run a func?
+        exec(source, _mainns)
+    else:
+        interpreters.run_string(id, source, shared)
+
+
+class Interpreter(namedtuple('Interpreter', 'name id')):
+
+    @classmethod
+    def from_raw(cls, raw):
+        if isinstance(raw, cls):
+            return raw
+        elif isinstance(raw, str):
+            return cls(raw)
+        else:
+            raise NotImplementedError
+
+    def __new__(cls, name=None, id=None):
+        main = interpreters.get_main()
+        if id == main:
+            if not name:
+                name = 'main'
+            elif name != 'main':
+                raise ValueError(
+                    'name mismatch (expected "main", got "{}")'.format(name))
+            id = main
+        elif id is not None:
+            if not name:
+                name = 'interp'
+            elif name == 'main':
+                raise ValueError('name mismatch (unexpected "main")')
+            if not isinstance(id, interpreters.InterpreterID):
+                id = interpreters.InterpreterID(id)
+        elif not name or name == 'main':
+            name = 'main'
+            id = main
+        else:
+            id = interpreters.create()
+        self = super().__new__(cls, name, id)
+        return self
+
+
+# XXX expect_channel_closed() is unnecessary once we improve exc propagation.
+
+ at contextlib.contextmanager
+def expect_channel_closed():
+    try:
+        yield
+    except channels.ChannelClosedError:
+        pass
+    else:
+        assert False, 'channel not closed'
+
+
+class ChannelAction(namedtuple('ChannelAction', 'action end interp')):
+
+    def __new__(cls, action, end=None, interp=None):
+        if not end:
+            end = 'both'
+        if not interp:
+            interp = 'main'
+        self = super().__new__(cls, action, end, interp)
+        return self
+
+    def __init__(self, *args, **kwargs):
+        if self.action == 'use':
+            if self.end not in ('same', 'opposite', 'send', 'recv'):
+                raise ValueError(self.end)
+        elif self.action in ('close', 'force-close'):
+            if self.end not in ('both', 'same', 'opposite', 'send', 'recv'):
+                raise ValueError(self.end)
+        else:
+            raise ValueError(self.action)
+        if self.interp not in ('main', 'same', 'other', 'extra'):
+            raise ValueError(self.interp)
+
+    def resolve_end(self, end):
+        if self.end == 'same':
+            return end
+        elif self.end == 'opposite':
+            return 'recv' if end == 'send' else 'send'
+        else:
+            return self.end
+
+    def resolve_interp(self, interp, other, extra):
+        if self.interp == 'same':
+            return interp
+        elif self.interp == 'other':
+            if other is None:
+                raise RuntimeError
+            return other
+        elif self.interp == 'extra':
+            if extra is None:
+                raise RuntimeError
+            return extra
+        elif self.interp == 'main':
+            if interp.name == 'main':
+                return interp
+            elif other and other.name == 'main':
+                return other
+            else:
+                raise RuntimeError
+        # Per __init__(), there aren't any others.
+
+
+class ChannelState(namedtuple('ChannelState', 'pending closed')):
+
+    def __new__(cls, pending=0, *, closed=False):
+        self = super().__new__(cls, pending, closed)
+        return self
+
+    def incr(self):
+        return type(self)(self.pending + 1, closed=self.closed)
+
+    def decr(self):
+        return type(self)(self.pending - 1, closed=self.closed)
+
+    def close(self, *, force=True):
+        if self.closed:
+            if not force or self.pending == 0:
+                return self
+        return type(self)(0 if force else self.pending, closed=True)
+
+
+def run_action(cid, action, end, state, *, hideclosed=True):
+    if state.closed:
+        if action == 'use' and end == 'recv' and state.pending:
+            expectfail = False
+        else:
+            expectfail = True
+    else:
+        expectfail = False
+
+    try:
+        result = _run_action(cid, action, end, state)
+    except channels.ChannelClosedError:
+        if not hideclosed and not expectfail:
+            raise
+        result = state.close()
+    else:
+        if expectfail:
+            raise ...  # XXX
+    return result
+
+
+def _run_action(cid, action, end, state):
+    if action == 'use':
+        if end == 'send':
+            channels.send(cid, b'spam')
+            return state.incr()
+        elif end == 'recv':
+            if not state.pending:
+                try:
+                    channels.recv(cid)
+                except channels.ChannelEmptyError:
+                    return state
+                else:
+                    raise Exception('expected ChannelEmptyError')
+            else:
+                channels.recv(cid)
+                return state.decr()
+        else:
+            raise ValueError(end)
+    elif action == 'close':
+        kwargs = {}
+        if end in ('recv', 'send'):
+            kwargs[end] = True
+        channels.close(cid, **kwargs)
+        return state.close()
+    elif action == 'force-close':
+        kwargs = {
+            'force': True,
+            }
+        if end in ('recv', 'send'):
+            kwargs[end] = True
+        channels.close(cid, **kwargs)
+        return state.close(force=True)
+    else:
+        raise ValueError(action)
+
+
+def clean_up_channels():
+    for cid in channels.list_all():
+        try:
+            channels.destroy(cid)
+        except channels.ChannelNotFoundError:
+            pass  # already destroyed
+
+
+class TestBase(unittest.TestCase):
+
+    def tearDown(self):
+        clean_up_channels()
+        clean_up_interpreters()
+
+
+##################################
+# channel tests
+
+class ChannelIDTests(TestBase):
+
+    def test_default_kwargs(self):
+        cid = channels._channel_id(10, force=True)
+
+        self.assertEqual(int(cid), 10)
+        self.assertEqual(cid.end, 'both')
+
+    def test_with_kwargs(self):
+        cid = channels._channel_id(10, send=True, force=True)
+        self.assertEqual(cid.end, 'send')
+
+        cid = channels._channel_id(10, send=True, recv=False, force=True)
+        self.assertEqual(cid.end, 'send')
+
+        cid = channels._channel_id(10, recv=True, force=True)
+        self.assertEqual(cid.end, 'recv')
+
+        cid = channels._channel_id(10, recv=True, send=False, force=True)
+        self.assertEqual(cid.end, 'recv')
+
+        cid = channels._channel_id(10, send=True, recv=True, force=True)
+        self.assertEqual(cid.end, 'both')
+
+    def test_coerce_id(self):
+        class Int(str):
+            def __index__(self):
+                return 10
+
+        cid = channels._channel_id(Int(), force=True)
+        self.assertEqual(int(cid), 10)
+
+    def test_bad_id(self):
+        self.assertRaises(TypeError, channels._channel_id, object())
+        self.assertRaises(TypeError, channels._channel_id, 10.0)
+        self.assertRaises(TypeError, channels._channel_id, '10')
+        self.assertRaises(TypeError, channels._channel_id, b'10')
+        self.assertRaises(ValueError, channels._channel_id, -1)
+        self.assertRaises(OverflowError, channels._channel_id, 2**64)
+
+    def test_bad_kwargs(self):
+        with self.assertRaises(ValueError):
+            channels._channel_id(10, send=False, recv=False)
+
+    def test_does_not_exist(self):
+        cid = channels.create()
+        with self.assertRaises(channels.ChannelNotFoundError):
+            channels._channel_id(int(cid) + 1)  # unforced
+
+    def test_str(self):
+        cid = channels._channel_id(10, force=True)
+        self.assertEqual(str(cid), '10')
+
+    def test_repr(self):
+        cid = channels._channel_id(10, force=True)
+        self.assertEqual(repr(cid), 'ChannelID(10)')
+
+        cid = channels._channel_id(10, send=True, force=True)
+        self.assertEqual(repr(cid), 'ChannelID(10, send=True)')
+
+        cid = channels._channel_id(10, recv=True, force=True)
+        self.assertEqual(repr(cid), 'ChannelID(10, recv=True)')
+
+        cid = channels._channel_id(10, send=True, recv=True, force=True)
+        self.assertEqual(repr(cid), 'ChannelID(10)')
+
+    def test_equality(self):
+        cid1 = channels.create()
+        cid2 = channels._channel_id(int(cid1))
+        cid3 = channels.create()
+
+        self.assertTrue(cid1 == cid1)
+        self.assertTrue(cid1 == cid2)
+        self.assertTrue(cid1 == int(cid1))
+        self.assertTrue(int(cid1) == cid1)
+        self.assertTrue(cid1 == float(int(cid1)))
+        self.assertTrue(float(int(cid1)) == cid1)
+        self.assertFalse(cid1 == float(int(cid1)) + 0.1)
+        self.assertFalse(cid1 == str(int(cid1)))
+        self.assertFalse(cid1 == 2**1000)
+        self.assertFalse(cid1 == float('inf'))
+        self.assertFalse(cid1 == 'spam')
+        self.assertFalse(cid1 == cid3)
+
+        self.assertFalse(cid1 != cid1)
+        self.assertFalse(cid1 != cid2)
+        self.assertTrue(cid1 != cid3)
+
+    def test_shareable(self):
+        chan = channels.create()
+
+        obj = channels.create()
+        channels.send(chan, obj)
+        got = channels.recv(chan)
+
+        self.assertEqual(got, obj)
+        self.assertIs(type(got), type(obj))
+        # XXX Check the following in the channel tests?
+        #self.assertIsNot(got, obj)
+
+
+class ChannelTests(TestBase):
+
+    def test_create_cid(self):
+        cid = channels.create()
+        self.assertIsInstance(cid, channels.ChannelID)
+
+    def test_sequential_ids(self):
+        before = channels.list_all()
+        id1 = channels.create()
+        id2 = channels.create()
+        id3 = channels.create()
+        after = channels.list_all()
+
+        self.assertEqual(id2, int(id1) + 1)
+        self.assertEqual(id3, int(id2) + 1)
+        self.assertEqual(set(after) - set(before), {id1, id2, id3})
+
+    def test_ids_global(self):
+        id1 = interpreters.create()
+        out = _run_output(id1, dedent("""
+            import _xxinterpchannels as _channels
+            cid = _channels.create()
+            print(cid)
+            """))
+        cid1 = int(out.strip())
+
+        id2 = interpreters.create()
+        out = _run_output(id2, dedent("""
+            import _xxinterpchannels as _channels
+            cid = _channels.create()
+            print(cid)
+            """))
+        cid2 = int(out.strip())
+
+        self.assertEqual(cid2, int(cid1) + 1)
+
+    def test_channel_list_interpreters_none(self):
+        """Test listing interpreters for a channel with no associations."""
+        # Test for channel with no associated interpreters.
+        cid = channels.create()
+        send_interps = channels.list_interpreters(cid, send=True)
+        recv_interps = channels.list_interpreters(cid, send=False)
+        self.assertEqual(send_interps, [])
+        self.assertEqual(recv_interps, [])
+
+    def test_channel_list_interpreters_basic(self):
+        """Test basic listing channel interpreters."""
+        interp0 = interpreters.get_main()
+        cid = channels.create()
+        channels.send(cid, "send")
+        # Test for a channel that has one end associated to an interpreter.
+        send_interps = channels.list_interpreters(cid, send=True)
+        recv_interps = channels.list_interpreters(cid, send=False)
+        self.assertEqual(send_interps, [interp0])
+        self.assertEqual(recv_interps, [])
+
+        interp1 = interpreters.create()
+        _run_output(interp1, dedent(f"""
+            import _xxinterpchannels as _channels
+            obj = _channels.recv({cid})
+            """))
+        # Test for channel that has both ends associated to an interpreter.
+        send_interps = channels.list_interpreters(cid, send=True)
+        recv_interps = channels.list_interpreters(cid, send=False)
+        self.assertEqual(send_interps, [interp0])
+        self.assertEqual(recv_interps, [interp1])
+
+    def test_channel_list_interpreters_multiple(self):
+        """Test listing interpreters for a channel with many associations."""
+        interp0 = interpreters.get_main()
+        interp1 = interpreters.create()
+        interp2 = interpreters.create()
+        interp3 = interpreters.create()
+        cid = channels.create()
+
+        channels.send(cid, "send")
+        _run_output(interp1, dedent(f"""
+            import _xxinterpchannels as _channels
+            _channels.send({cid}, "send")
+            """))
+        _run_output(interp2, dedent(f"""
+            import _xxinterpchannels as _channels
+            obj = _channels.recv({cid})
+            """))
+        _run_output(interp3, dedent(f"""
+            import _xxinterpchannels as _channels
+            obj = _channels.recv({cid})
+            """))
+        send_interps = channels.list_interpreters(cid, send=True)
+        recv_interps = channels.list_interpreters(cid, send=False)
+        self.assertEqual(set(send_interps), {interp0, interp1})
+        self.assertEqual(set(recv_interps), {interp2, interp3})
+
+    def test_channel_list_interpreters_destroyed(self):
+        """Test listing channel interpreters with a destroyed interpreter."""
+        interp0 = interpreters.get_main()
+        interp1 = interpreters.create()
+        cid = channels.create()
+        channels.send(cid, "send")
+        _run_output(interp1, dedent(f"""
+            import _xxinterpchannels as _channels
+            obj = _channels.recv({cid})
+            """))
+        # Should be one interpreter associated with each end.
+        send_interps = channels.list_interpreters(cid, send=True)
+        recv_interps = channels.list_interpreters(cid, send=False)
+        self.assertEqual(send_interps, [interp0])
+        self.assertEqual(recv_interps, [interp1])
+
+        interpreters.destroy(interp1)
+        # Destroyed interpreter should not be listed.
+        send_interps = channels.list_interpreters(cid, send=True)
+        recv_interps = channels.list_interpreters(cid, send=False)
+        self.assertEqual(send_interps, [interp0])
+        self.assertEqual(recv_interps, [])
+
+    def test_channel_list_interpreters_released(self):
+        """Test listing channel interpreters with a released channel."""
+        # Set up one channel with main interpreter on the send end and two
+        # subinterpreters on the receive end.
+        interp0 = interpreters.get_main()
+        interp1 = interpreters.create()
+        interp2 = interpreters.create()
+        cid = channels.create()
+        channels.send(cid, "data")
+        _run_output(interp1, dedent(f"""
+            import _xxinterpchannels as _channels
+            obj = _channels.recv({cid})
+            """))
+        channels.send(cid, "data")
+        _run_output(interp2, dedent(f"""
+            import _xxinterpchannels as _channels
+            obj = _channels.recv({cid})
+            """))
+        # Check the setup.
+        send_interps = channels.list_interpreters(cid, send=True)
+        recv_interps = channels.list_interpreters(cid, send=False)
+        self.assertEqual(len(send_interps), 1)
+        self.assertEqual(len(recv_interps), 2)
+
+        # Release the main interpreter from the send end.
+        channels.release(cid, send=True)
+        # Send end should have no associated interpreters.
+        send_interps = channels.list_interpreters(cid, send=True)
+        recv_interps = channels.list_interpreters(cid, send=False)
+        self.assertEqual(len(send_interps), 0)
+        self.assertEqual(len(recv_interps), 2)
+
+        # Release one of the subinterpreters from the receive end.
+        _run_output(interp2, dedent(f"""
+            import _xxinterpchannels as _channels
+            _channels.release({cid})
+            """))
+        # Receive end should have the released interpreter removed.
+        send_interps = channels.list_interpreters(cid, send=True)
+        recv_interps = channels.list_interpreters(cid, send=False)
+        self.assertEqual(len(send_interps), 0)
+        self.assertEqual(recv_interps, [interp1])
+
+    def test_channel_list_interpreters_closed(self):
+        """Test listing channel interpreters with a closed channel."""
+        interp0 = interpreters.get_main()
+        interp1 = interpreters.create()
+        cid = channels.create()
+        # Put something in the channel so that it's not empty.
+        channels.send(cid, "send")
+
+        # Check initial state.
+        send_interps = channels.list_interpreters(cid, send=True)
+        recv_interps = channels.list_interpreters(cid, send=False)
+        self.assertEqual(len(send_interps), 1)
+        self.assertEqual(len(recv_interps), 0)
+
+        # Force close the channel.
+        channels.close(cid, force=True)
+        # Both ends should raise an error.
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.list_interpreters(cid, send=True)
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.list_interpreters(cid, send=False)
+
+    def test_channel_list_interpreters_closed_send_end(self):
+        """Test listing channel interpreters with a channel's send end closed."""
+        interp0 = interpreters.get_main()
+        interp1 = interpreters.create()
+        cid = channels.create()
+        # Put something in the channel so that it's not empty.
+        channels.send(cid, "send")
+
+        # Check initial state.
+        send_interps = channels.list_interpreters(cid, send=True)
+        recv_interps = channels.list_interpreters(cid, send=False)
+        self.assertEqual(len(send_interps), 1)
+        self.assertEqual(len(recv_interps), 0)
+
+        # Close the send end of the channel.
+        channels.close(cid, send=True)
+        # Send end should raise an error.
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.list_interpreters(cid, send=True)
+        # Receive end should not be closed (since channel is not empty).
+        recv_interps = channels.list_interpreters(cid, send=False)
+        self.assertEqual(len(recv_interps), 0)
+
+        # Close the receive end of the channel from a subinterpreter.
+        _run_output(interp1, dedent(f"""
+            import _xxinterpchannels as _channels
+            _channels.close({cid}, force=True)
+            """))
+        # Both ends should raise an error.
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.list_interpreters(cid, send=True)
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.list_interpreters(cid, send=False)
+
+    ####################
+
+    def test_send_recv_main(self):
+        cid = channels.create()
+        orig = b'spam'
+        channels.send(cid, orig)
+        obj = channels.recv(cid)
+
+        self.assertEqual(obj, orig)
+        self.assertIsNot(obj, orig)
+
+    def test_send_recv_same_interpreter(self):
+        id1 = interpreters.create()
+        out = _run_output(id1, dedent("""
+            import _xxinterpchannels as _channels
+            cid = _channels.create()
+            orig = b'spam'
+            _channels.send(cid, orig)
+            obj = _channels.recv(cid)
+            assert obj is not orig
+            assert obj == orig
+            """))
+
+    def test_send_recv_different_interpreters(self):
+        cid = channels.create()
+        id1 = interpreters.create()
+        out = _run_output(id1, dedent(f"""
+            import _xxinterpchannels as _channels
+            _channels.send({cid}, b'spam')
+            """))
+        obj = channels.recv(cid)
+
+        self.assertEqual(obj, b'spam')
+
+    def test_send_recv_different_threads(self):
+        cid = channels.create()
+
+        def f():
+            while True:
+                try:
+                    obj = channels.recv(cid)
+                    break
+                except channels.ChannelEmptyError:
+                    time.sleep(0.1)
+            channels.send(cid, obj)
+        t = threading.Thread(target=f)
+        t.start()
+
+        channels.send(cid, b'spam')
+        t.join()
+        obj = channels.recv(cid)
+
+        self.assertEqual(obj, b'spam')
+
+    def test_send_recv_different_interpreters_and_threads(self):
+        cid = channels.create()
+        id1 = interpreters.create()
+        out = None
+
+        def f():
+            nonlocal out
+            out = _run_output(id1, dedent(f"""
+                import time
+                import _xxinterpchannels as _channels
+                while True:
+                    try:
+                        obj = _channels.recv({cid})
+                        break
+                    except _channels.ChannelEmptyError:
+                        time.sleep(0.1)
+                assert(obj == b'spam')
+                _channels.send({cid}, b'eggs')
+                """))
+        t = threading.Thread(target=f)
+        t.start()
+
+        channels.send(cid, b'spam')
+        t.join()
+        obj = channels.recv(cid)
+
+        self.assertEqual(obj, b'eggs')
+
+    def test_send_not_found(self):
+        with self.assertRaises(channels.ChannelNotFoundError):
+            channels.send(10, b'spam')
+
+    def test_recv_not_found(self):
+        with self.assertRaises(channels.ChannelNotFoundError):
+            channels.recv(10)
+
+    def test_recv_empty(self):
+        cid = channels.create()
+        with self.assertRaises(channels.ChannelEmptyError):
+            channels.recv(cid)
+
+    def test_recv_default(self):
+        default = object()
+        cid = channels.create()
+        obj1 = channels.recv(cid, default)
+        channels.send(cid, None)
+        channels.send(cid, 1)
+        channels.send(cid, b'spam')
+        channels.send(cid, b'eggs')
+        obj2 = channels.recv(cid, default)
+        obj3 = channels.recv(cid, default)
+        obj4 = channels.recv(cid)
+        obj5 = channels.recv(cid, default)
+        obj6 = channels.recv(cid, default)
+
+        self.assertIs(obj1, default)
+        self.assertIs(obj2, None)
+        self.assertEqual(obj3, 1)
+        self.assertEqual(obj4, b'spam')
+        self.assertEqual(obj5, b'eggs')
+        self.assertIs(obj6, default)
+
+    def test_recv_sending_interp_destroyed(self):
+        cid = channels.create()
+        interp = interpreters.create()
+        interpreters.run_string(interp, dedent(f"""
+            import _xxinterpchannels as _channels
+            _channels.send({cid}, b'spam')
+            """))
+        interpreters.destroy(interp)
+
+        with self.assertRaisesRegex(RuntimeError,
+                                    'unrecognized interpreter ID'):
+            channels.recv(cid)
+
+    def test_allowed_types(self):
+        cid = channels.create()
+        objects = [
+            None,
+            'spam',
+            b'spam',
+            42,
+        ]
+        for obj in objects:
+            with self.subTest(obj):
+                channels.send(cid, obj)
+                got = channels.recv(cid)
+
+                self.assertEqual(got, obj)
+                self.assertIs(type(got), type(obj))
+                # XXX Check the following?
+                #self.assertIsNot(got, obj)
+                # XXX What about between interpreters?
+
+    def test_run_string_arg_unresolved(self):
+        cid = channels.create()
+        interp = interpreters.create()
+
+        out = _run_output(interp, dedent("""
+            import _xxinterpchannels as _channels
+            print(cid.end)
+            _channels.send(cid, b'spam')
+            """),
+            dict(cid=cid.send))
+        obj = channels.recv(cid)
+
+        self.assertEqual(obj, b'spam')
+        self.assertEqual(out.strip(), 'send')
+
+    # XXX For now there is no high-level channel into which the
+    # sent channel ID can be converted...
+    # Note: this test caused crashes on some buildbots (bpo-33615).
+    @unittest.skip('disabled until high-level channels exist')
+    def test_run_string_arg_resolved(self):
+        cid = channels.create()
+        cid = channels._channel_id(cid, _resolve=True)
+        interp = interpreters.create()
+
+        out = _run_output(interp, dedent("""
+            import _xxinterpchannels as _channels
+            print(chan.id.end)
+            _channels.send(chan.id, b'spam')
+            """),
+            dict(chan=cid.send))
+        obj = channels.recv(cid)
+
+        self.assertEqual(obj, b'spam')
+        self.assertEqual(out.strip(), 'send')
+
+    # close
+
+    def test_close_single_user(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.recv(cid)
+        channels.close(cid)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.send(cid, b'eggs')
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+    def test_close_multiple_users(self):
+        cid = channels.create()
+        id1 = interpreters.create()
+        id2 = interpreters.create()
+        interpreters.run_string(id1, dedent(f"""
+            import _xxinterpchannels as _channels
+            _channels.send({cid}, b'spam')
+            """))
+        interpreters.run_string(id2, dedent(f"""
+            import _xxinterpchannels as _channels
+            _channels.recv({cid})
+            """))
+        channels.close(cid)
+        with self.assertRaises(interpreters.RunFailedError) as cm:
+            interpreters.run_string(id1, dedent(f"""
+                _channels.send({cid}, b'spam')
+                """))
+        self.assertIn('ChannelClosedError', str(cm.exception))
+        with self.assertRaises(interpreters.RunFailedError) as cm:
+            interpreters.run_string(id2, dedent(f"""
+                _channels.send({cid}, b'spam')
+                """))
+        self.assertIn('ChannelClosedError', str(cm.exception))
+
+    def test_close_multiple_times(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.recv(cid)
+        channels.close(cid)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.close(cid)
+
+    def test_close_empty(self):
+        tests = [
+            (False, False),
+            (True, False),
+            (False, True),
+            (True, True),
+            ]
+        for send, recv in tests:
+            with self.subTest((send, recv)):
+                cid = channels.create()
+                channels.send(cid, b'spam')
+                channels.recv(cid)
+                channels.close(cid, send=send, recv=recv)
+
+                with self.assertRaises(channels.ChannelClosedError):
+                    channels.send(cid, b'eggs')
+                with self.assertRaises(channels.ChannelClosedError):
+                    channels.recv(cid)
+
+    def test_close_defaults_with_unused_items(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.send(cid, b'ham')
+
+        with self.assertRaises(channels.ChannelNotEmptyError):
+            channels.close(cid)
+        channels.recv(cid)
+        channels.send(cid, b'eggs')
+
+    def test_close_recv_with_unused_items_unforced(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.send(cid, b'ham')
+
+        with self.assertRaises(channels.ChannelNotEmptyError):
+            channels.close(cid, recv=True)
+        channels.recv(cid)
+        channels.send(cid, b'eggs')
+        channels.recv(cid)
+        channels.recv(cid)
+        channels.close(cid, recv=True)
+
+    def test_close_send_with_unused_items_unforced(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.send(cid, b'ham')
+        channels.close(cid, send=True)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.send(cid, b'eggs')
+        channels.recv(cid)
+        channels.recv(cid)
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+    def test_close_both_with_unused_items_unforced(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.send(cid, b'ham')
+
+        with self.assertRaises(channels.ChannelNotEmptyError):
+            channels.close(cid, recv=True, send=True)
+        channels.recv(cid)
+        channels.send(cid, b'eggs')
+        channels.recv(cid)
+        channels.recv(cid)
+        channels.close(cid, recv=True)
+
+    def test_close_recv_with_unused_items_forced(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.send(cid, b'ham')
+        channels.close(cid, recv=True, force=True)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.send(cid, b'eggs')
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+    def test_close_send_with_unused_items_forced(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.send(cid, b'ham')
+        channels.close(cid, send=True, force=True)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.send(cid, b'eggs')
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+    def test_close_both_with_unused_items_forced(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.send(cid, b'ham')
+        channels.close(cid, send=True, recv=True, force=True)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.send(cid, b'eggs')
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+    def test_close_never_used(self):
+        cid = channels.create()
+        channels.close(cid)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.send(cid, b'spam')
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+    def test_close_by_unassociated_interp(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        interp = interpreters.create()
+        interpreters.run_string(interp, dedent(f"""
+            import _xxinterpchannels as _channels
+            _channels.close({cid}, force=True)
+            """))
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.close(cid)
+
+    def test_close_used_multiple_times_by_single_user(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.send(cid, b'spam')
+        channels.send(cid, b'spam')
+        channels.recv(cid)
+        channels.close(cid, force=True)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.send(cid, b'eggs')
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+    def test_channel_list_interpreters_invalid_channel(self):
+        cid = channels.create()
+        # Test for invalid channel ID.
+        with self.assertRaises(channels.ChannelNotFoundError):
+            channels.list_interpreters(1000, send=True)
+
+        channels.close(cid)
+        # Test for a channel that has been closed.
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.list_interpreters(cid, send=True)
+
+    def test_channel_list_interpreters_invalid_args(self):
+        # Tests for invalid arguments passed to the API.
+        cid = channels.create()
+        with self.assertRaises(TypeError):
+            channels.list_interpreters(cid)
+
+
+class ChannelReleaseTests(TestBase):
+
+    # XXX Add more test coverage a la the tests for close().
+
+    """
+    - main / interp / other
+    - run in: current thread / new thread / other thread / different threads
+    - end / opposite
+    - force / no force
+    - used / not used  (associated / not associated)
+    - empty / emptied / never emptied / partly emptied
+    - closed / not closed
+    - released / not released
+    - creator (interp) / other
+    - associated interpreter not running
+    - associated interpreter destroyed
+    """
+
+    """
+    use
+    pre-release
+    release
+    after
+    check
+    """
+
+    """
+    release in:         main, interp1
+    creator:            same, other (incl. interp2)
+
+    use:                None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
+    pre-release:        None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
+    pre-release forced: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
+
+    release:            same
+    release forced:     same
+
+    use after:          None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
+    release after:      None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
+    check released:     send/recv for same/other(incl. interp2)
+    check closed:       send/recv for same/other(incl. interp2)
+    """
+
+    def test_single_user(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.recv(cid)
+        channels.release(cid, send=True, recv=True)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.send(cid, b'eggs')
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+    def test_multiple_users(self):
+        cid = channels.create()
+        id1 = interpreters.create()
+        id2 = interpreters.create()
+        interpreters.run_string(id1, dedent(f"""
+            import _xxinterpchannels as _channels
+            _channels.send({cid}, b'spam')
+            """))
+        out = _run_output(id2, dedent(f"""
+            import _xxinterpchannels as _channels
+            obj = _channels.recv({cid})
+            _channels.release({cid})
+            print(repr(obj))
+            """))
+        interpreters.run_string(id1, dedent(f"""
+            _channels.release({cid})
+            """))
+
+        self.assertEqual(out.strip(), "b'spam'")
+
+    def test_no_kwargs(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.recv(cid)
+        channels.release(cid)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.send(cid, b'eggs')
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+    def test_multiple_times(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.recv(cid)
+        channels.release(cid, send=True, recv=True)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.release(cid, send=True, recv=True)
+
+    def test_with_unused_items(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.send(cid, b'ham')
+        channels.release(cid, send=True, recv=True)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+    def test_never_used(self):
+        cid = channels.create()
+        channels.release(cid)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.send(cid, b'spam')
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+    def test_by_unassociated_interp(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        interp = interpreters.create()
+        interpreters.run_string(interp, dedent(f"""
+            import _xxinterpchannels as _channels
+            _channels.release({cid})
+            """))
+        obj = channels.recv(cid)
+        channels.release(cid)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.send(cid, b'eggs')
+        self.assertEqual(obj, b'spam')
+
+    def test_close_if_unassociated(self):
+        # XXX Something's not right with this test...
+        cid = channels.create()
+        interp = interpreters.create()
+        interpreters.run_string(interp, dedent(f"""
+            import _xxinterpchannels as _channels
+            obj = _channels.send({cid}, b'spam')
+            _channels.release({cid})
+            """))
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+    def test_partially(self):
+        # XXX Is partial close too weird/confusing?
+        cid = channels.create()
+        channels.send(cid, None)
+        channels.recv(cid)
+        channels.send(cid, b'spam')
+        channels.release(cid, send=True)
+        obj = channels.recv(cid)
+
+        self.assertEqual(obj, b'spam')
+
+    def test_used_multiple_times_by_single_user(self):
+        cid = channels.create()
+        channels.send(cid, b'spam')
+        channels.send(cid, b'spam')
+        channels.send(cid, b'spam')
+        channels.recv(cid)
+        channels.release(cid, send=True, recv=True)
+
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.send(cid, b'eggs')
+        with self.assertRaises(channels.ChannelClosedError):
+            channels.recv(cid)
+
+
+class ChannelCloseFixture(namedtuple('ChannelCloseFixture',
+                                     'end interp other extra creator')):
+
+    # Set this to True to avoid creating interpreters, e.g. when
+    # scanning through test permutations without running them.
+    QUICK = False
+
+    def __new__(cls, end, interp, other, extra, creator):
+        assert end in ('send', 'recv')
+        if cls.QUICK:
+            known = {}
+        else:
+            interp = Interpreter.from_raw(interp)
+            other = Interpreter.from_raw(other)
+            extra = Interpreter.from_raw(extra)
+            known = {
+                interp.name: interp,
+                other.name: other,
+                extra.name: extra,
+                }
+        if not creator:
+            creator = 'same'
+        self = super().__new__(cls, end, interp, other, extra, creator)
+        self._prepped = set()
+        self._state = ChannelState()
+        self._known = known
+        return self
+
+    @property
+    def state(self):
+        return self._state
+
+    @property
+    def cid(self):
+        try:
+            return self._cid
+        except AttributeError:
+            creator = self._get_interpreter(self.creator)
+            self._cid = self._new_channel(creator)
+            return self._cid
+
+    def get_interpreter(self, interp):
+        interp = self._get_interpreter(interp)
+        self._prep_interpreter(interp)
+        return interp
+
+    def expect_closed_error(self, end=None):
+        if end is None:
+            end = self.end
+        if end == 'recv' and self.state.closed == 'send':
+            return False
+        return bool(self.state.closed)
+
+    def prep_interpreter(self, interp):
+        self._prep_interpreter(interp)
+
+    def record_action(self, action, result):
+        self._state = result
+
+    def clean_up(self):
+        clean_up_interpreters()
+        clean_up_channels()
+
+    # internal methods
+
+    def _new_channel(self, creator):
+        if creator.name == 'main':
+            return channels.create()
+        else:
+            ch = channels.create()
+            run_interp(creator.id, f"""
+                import _xxsubinterpreters
+                cid = _xxsubchannels.create()
+                # We purposefully send back an int to avoid tying the
+                # channel to the other interpreter.
+                _xxsubchannels.send({ch}, int(cid))
+                del _xxsubinterpreters
+                """)
+            self._cid = channels.recv(ch)
+        return self._cid
+
+    def _get_interpreter(self, interp):
+        if interp in ('same', 'interp'):
+            return self.interp
+        elif interp == 'other':
+            return self.other
+        elif interp == 'extra':
+            return self.extra
+        else:
+            name = interp
+            try:
+                interp = self._known[name]
+            except KeyError:
+                interp = self._known[name] = Interpreter(name)
+            return interp
+
+    def _prep_interpreter(self, interp):
+        if interp.id in self._prepped:
+            return
+        self._prepped.add(interp.id)
+        if interp.name == 'main':
+            return
+        run_interp(interp.id, f"""
+            import _xxinterpchannels as channels
+            import test.test__xxinterpchannels as helpers
+            ChannelState = helpers.ChannelState
+            try:
+                cid
+            except NameError:
+                cid = channels._channel_id({self.cid})
+            """)
+
+
+ at unittest.skip('these tests take several hours to run')
+class ExhaustiveChannelTests(TestBase):
+
+    """
+    - main / interp / other
+    - run in: current thread / new thread / other thread / different threads
+    - end / opposite
+    - force / no force
+    - used / not used  (associated / not associated)
+    - empty / emptied / never emptied / partly emptied
+    - closed / not closed
+    - released / not released
+    - creator (interp) / other
+    - associated interpreter not running
+    - associated interpreter destroyed
+
+    - close after unbound
+    """
+
+    """
+    use
+    pre-close
+    close
+    after
+    check
+    """
+
+    """
+    close in:         main, interp1
+    creator:          same, other, extra
+
+    use:              None,send,recv,send/recv in None,same,other,same+other,all
+    pre-close:        None,send,recv in None,same,other,same+other,all
+    pre-close forced: None,send,recv in None,same,other,same+other,all
+
+    close:            same
+    close forced:     same
+
+    use after:        None,send,recv,send/recv in None,same,other,extra,same+other,all
+    close after:      None,send,recv,send/recv in None,same,other,extra,same+other,all
+    check closed:     send/recv for same/other(incl. interp2)
+    """
+
+    def iter_action_sets(self):
+        # - used / not used  (associated / not associated)
+        # - empty / emptied / never emptied / partly emptied
+        # - closed / not closed
+        # - released / not released
+
+        # never used
+        yield []
+
+        # only pre-closed (and possible used after)
+        for closeactions in self._iter_close_action_sets('same', 'other'):
+            yield closeactions
+            for postactions in self._iter_post_close_action_sets():
+                yield closeactions + postactions
+        for closeactions in self._iter_close_action_sets('other', 'extra'):
+            yield closeactions
+            for postactions in self._iter_post_close_action_sets():
+                yield closeactions + postactions
+
+        # used
+        for useactions in self._iter_use_action_sets('same', 'other'):
+            yield useactions
+            for closeactions in self._iter_close_action_sets('same', 'other'):
+                actions = useactions + closeactions
+                yield actions
+                for postactions in self._iter_post_close_action_sets():
+                    yield actions + postactions
+            for closeactions in self._iter_close_action_sets('other', 'extra'):
+                actions = useactions + closeactions
+                yield actions
+                for postactions in self._iter_post_close_action_sets():
+                    yield actions + postactions
+        for useactions in self._iter_use_action_sets('other', 'extra'):
+            yield useactions
+            for closeactions in self._iter_close_action_sets('same', 'other'):
+                actions = useactions + closeactions
+                yield actions
+                for postactions in self._iter_post_close_action_sets():
+                    yield actions + postactions
+            for closeactions in self._iter_close_action_sets('other', 'extra'):
+                actions = useactions + closeactions
+                yield actions
+                for postactions in self._iter_post_close_action_sets():
+                    yield actions + postactions
+
+    def _iter_use_action_sets(self, interp1, interp2):
+        interps = (interp1, interp2)
+
+        # only recv end used
+        yield [
+            ChannelAction('use', 'recv', interp1),
+            ]
+        yield [
+            ChannelAction('use', 'recv', interp2),
+            ]
+        yield [
+            ChannelAction('use', 'recv', interp1),
+            ChannelAction('use', 'recv', interp2),
+            ]
+
+        # never emptied
+        yield [
+            ChannelAction('use', 'send', interp1),
+            ]
+        yield [
+            ChannelAction('use', 'send', interp2),
+            ]
+        yield [
+            ChannelAction('use', 'send', interp1),
+            ChannelAction('use', 'send', interp2),
+            ]
+
+        # partially emptied
+        for interp1 in interps:
+            for interp2 in interps:
+                for interp3 in interps:
+                    yield [
+                        ChannelAction('use', 'send', interp1),
+                        ChannelAction('use', 'send', interp2),
+                        ChannelAction('use', 'recv', interp3),
+                        ]
+
+        # fully emptied
+        for interp1 in interps:
+            for interp2 in interps:
+                for interp3 in interps:
+                    for interp4 in interps:
+                        yield [
+                            ChannelAction('use', 'send', interp1),
+                            ChannelAction('use', 'send', interp2),
+                            ChannelAction('use', 'recv', interp3),
+                            ChannelAction('use', 'recv', interp4),
+                            ]
+
+    def _iter_close_action_sets(self, interp1, interp2):
+        ends = ('recv', 'send')
+        interps = (interp1, interp2)
+        for force in (True, False):
+            op = 'force-close' if force else 'close'
+            for interp in interps:
+                for end in ends:
+                    yield [
+                        ChannelAction(op, end, interp),
+                        ]
+        for recvop in ('close', 'force-close'):
+            for sendop in ('close', 'force-close'):
+                for recv in interps:
+                    for send in interps:
+                        yield [
+                            ChannelAction(recvop, 'recv', recv),
+                            ChannelAction(sendop, 'send', send),
+                            ]
+
+    def _iter_post_close_action_sets(self):
+        for interp in ('same', 'extra', 'other'):
+            yield [
+                ChannelAction('use', 'recv', interp),
+                ]
+            yield [
+                ChannelAction('use', 'send', interp),
+                ]
+
+    def run_actions(self, fix, actions):
+        for action in actions:
+            self.run_action(fix, action)
+
+    def run_action(self, fix, action, *, hideclosed=True):
+        end = action.resolve_end(fix.end)
+        interp = action.resolve_interp(fix.interp, fix.other, fix.extra)
+        fix.prep_interpreter(interp)
+        if interp.name == 'main':
+            result = run_action(
+                fix.cid,
+                action.action,
+                end,
+                fix.state,
+                hideclosed=hideclosed,
+                )
+            fix.record_action(action, result)
+        else:
+            _cid = channels.create()
+            run_interp(interp.id, f"""
+                result = helpers.run_action(
+                    {fix.cid},
+                    {repr(action.action)},
+                    {repr(end)},
+                    {repr(fix.state)},
+                    hideclosed={hideclosed},
+                    )
+                channels.send({_cid}, result.pending.to_bytes(1, 'little'))
+                channels.send({_cid}, b'X' if result.closed else b'')
+                """)
+            result = ChannelState(
+                pending=int.from_bytes(channels.recv(_cid), 'little'),
+                closed=bool(channels.recv(_cid)),
+                )
+            fix.record_action(action, result)
+
+    def iter_fixtures(self):
+        # XXX threads?
+        interpreters = [
+            ('main', 'interp', 'extra'),
+            ('interp', 'main', 'extra'),
+            ('interp1', 'interp2', 'extra'),
+            ('interp1', 'interp2', 'main'),
+        ]
+        for interp, other, extra in interpreters:
+            for creator in ('same', 'other', 'creator'):
+                for end in ('send', 'recv'):
+                    yield ChannelCloseFixture(end, interp, other, extra, creator)
+
+    def _close(self, fix, *, force):
+        op = 'force-close' if force else 'close'
+        close = ChannelAction(op, fix.end, 'same')
+        if not fix.expect_closed_error():
+            self.run_action(fix, close, hideclosed=False)
+        else:
+            with self.assertRaises(channels.ChannelClosedError):
+                self.run_action(fix, close, hideclosed=False)
+
+    def _assert_closed_in_interp(self, fix, interp=None):
+        if interp is None or interp.name == 'main':
+            with self.assertRaises(channels.ChannelClosedError):
+                channels.recv(fix.cid)
+            with self.assertRaises(channels.ChannelClosedError):
+                channels.send(fix.cid, b'spam')
+            with self.assertRaises(channels.ChannelClosedError):
+                channels.close(fix.cid)
+            with self.assertRaises(channels.ChannelClosedError):
+                channels.close(fix.cid, force=True)
+        else:
+            run_interp(interp.id, f"""
+                with helpers.expect_channel_closed():
+                    channels.recv(cid)
+                """)
+            run_interp(interp.id, f"""
+                with helpers.expect_channel_closed():
+                    channels.send(cid, b'spam')
+                """)
+            run_interp(interp.id, f"""
+                with helpers.expect_channel_closed():
+                    channels.close(cid)
+                """)
+            run_interp(interp.id, f"""
+                with helpers.expect_channel_closed():
+                    channels.close(cid, force=True)
+                """)
+
+    def _assert_closed(self, fix):
+        self.assertTrue(fix.state.closed)
+
+        for _ in range(fix.state.pending):
+            channels.recv(fix.cid)
+        self._assert_closed_in_interp(fix)
+
+        for interp in ('same', 'other'):
+            interp = fix.get_interpreter(interp)
+            if interp.name == 'main':
+                continue
+            self._assert_closed_in_interp(fix, interp)
+
+        interp = fix.get_interpreter('fresh')
+        self._assert_closed_in_interp(fix, interp)
+
+    def _iter_close_tests(self, verbose=False):
+        i = 0
+        for actions in self.iter_action_sets():
+            print()
+            for fix in self.iter_fixtures():
+                i += 1
+                if i > 1000:
+                    return
+                if verbose:
+                    if (i - 1) % 6 == 0:
+                        print()
+                    print(i, fix, '({} actions)'.format(len(actions)))
+                else:
+                    if (i - 1) % 6 == 0:
+                        print(' ', end='')
+                    print('.', end=''); sys.stdout.flush()
+                yield i, fix, actions
+            if verbose:
+                print('---')
+        print()
+
+    # This is useful for scanning through the possible tests.
+    def _skim_close_tests(self):
+        ChannelCloseFixture.QUICK = True
+        for i, fix, actions in self._iter_close_tests():
+            pass
+
+    def test_close(self):
+        for i, fix, actions in self._iter_close_tests():
+            with self.subTest('{} {}  {}'.format(i, fix, actions)):
+                fix.prep_interpreter(fix.interp)
+                self.run_actions(fix, actions)
+
+                self._close(fix, force=False)
+
+                self._assert_closed(fix)
+            # XXX Things slow down if we have too many interpreters.
+            fix.clean_up()
+
+    def test_force_close(self):
+        for i, fix, actions in self._iter_close_tests():
+            with self.subTest('{} {}  {}'.format(i, fix, actions)):
+                fix.prep_interpreter(fix.interp)
+                self.run_actions(fix, actions)
+
+                self._close(fix, force=True)
+
+                self._assert_closed(fix)
+            # XXX Things slow down if we have too many interpreters.
+            fix.clean_up()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py
index 18900bb9f716..687fcf3b7705 100644
--- a/Lib/test/test__xxsubinterpreters.py
+++ b/Lib/test/test__xxsubinterpreters.py
@@ -9,6 +9,7 @@
 import time
 import unittest
 
+import _testcapi
 from test import support
 from test.support import import_helper
 from test.support import script_helper
@@ -73,207 +74,6 @@ def run():
     t.join()
 
 
-#@contextmanager
-#def run_threaded(id, source, **shared):
-#    def run():
-#        run_interp(id, source, **shared)
-#    t = threading.Thread(target=run)
-#    t.start()
-#    yield
-#    t.join()
-
-
-def run_interp(id, source, **shared):
-    _run_interp(id, source, shared)
-
-
-def _run_interp(id, source, shared, _mainns={}):
-    source = dedent(source)
-    main = interpreters.get_main()
-    if main == id:
-        if interpreters.get_current() != main:
-            raise RuntimeError
-        # XXX Run a func?
-        exec(source, _mainns)
-    else:
-        interpreters.run_string(id, source, shared)
-
-
-class Interpreter(namedtuple('Interpreter', 'name id')):
-
-    @classmethod
-    def from_raw(cls, raw):
-        if isinstance(raw, cls):
-            return raw
-        elif isinstance(raw, str):
-            return cls(raw)
-        else:
-            raise NotImplementedError
-
-    def __new__(cls, name=None, id=None):
-        main = interpreters.get_main()
-        if id == main:
-            if not name:
-                name = 'main'
-            elif name != 'main':
-                raise ValueError(
-                    'name mismatch (expected "main", got "{}")'.format(name))
-            id = main
-        elif id is not None:
-            if not name:
-                name = 'interp'
-            elif name == 'main':
-                raise ValueError('name mismatch (unexpected "main")')
-            if not isinstance(id, interpreters.InterpreterID):
-                id = interpreters.InterpreterID(id)
-        elif not name or name == 'main':
-            name = 'main'
-            id = main
-        else:
-            id = interpreters.create()
-        self = super().__new__(cls, name, id)
-        return self
-
-
-# XXX expect_channel_closed() is unnecessary once we improve exc propagation.
-
- at contextlib.contextmanager
-def expect_channel_closed():
-    try:
-        yield
-    except interpreters.ChannelClosedError:
-        pass
-    else:
-        assert False, 'channel not closed'
-
-
-class ChannelAction(namedtuple('ChannelAction', 'action end interp')):
-
-    def __new__(cls, action, end=None, interp=None):
-        if not end:
-            end = 'both'
-        if not interp:
-            interp = 'main'
-        self = super().__new__(cls, action, end, interp)
-        return self
-
-    def __init__(self, *args, **kwargs):
-        if self.action == 'use':
-            if self.end not in ('same', 'opposite', 'send', 'recv'):
-                raise ValueError(self.end)
-        elif self.action in ('close', 'force-close'):
-            if self.end not in ('both', 'same', 'opposite', 'send', 'recv'):
-                raise ValueError(self.end)
-        else:
-            raise ValueError(self.action)
-        if self.interp not in ('main', 'same', 'other', 'extra'):
-            raise ValueError(self.interp)
-
-    def resolve_end(self, end):
-        if self.end == 'same':
-            return end
-        elif self.end == 'opposite':
-            return 'recv' if end == 'send' else 'send'
-        else:
-            return self.end
-
-    def resolve_interp(self, interp, other, extra):
-        if self.interp == 'same':
-            return interp
-        elif self.interp == 'other':
-            if other is None:
-                raise RuntimeError
-            return other
-        elif self.interp == 'extra':
-            if extra is None:
-                raise RuntimeError
-            return extra
-        elif self.interp == 'main':
-            if interp.name == 'main':
-                return interp
-            elif other and other.name == 'main':
-                return other
-            else:
-                raise RuntimeError
-        # Per __init__(), there aren't any others.
-
-
-class ChannelState(namedtuple('ChannelState', 'pending closed')):
-
-    def __new__(cls, pending=0, *, closed=False):
-        self = super().__new__(cls, pending, closed)
-        return self
-
-    def incr(self):
-        return type(self)(self.pending + 1, closed=self.closed)
-
-    def decr(self):
-        return type(self)(self.pending - 1, closed=self.closed)
-
-    def close(self, *, force=True):
-        if self.closed:
-            if not force or self.pending == 0:
-                return self
-        return type(self)(0 if force else self.pending, closed=True)
-
-
-def run_action(cid, action, end, state, *, hideclosed=True):
-    if state.closed:
-        if action == 'use' and end == 'recv' and state.pending:
-            expectfail = False
-        else:
-            expectfail = True
-    else:
-        expectfail = False
-
-    try:
-        result = _run_action(cid, action, end, state)
-    except interpreters.ChannelClosedError:
-        if not hideclosed and not expectfail:
-            raise
-        result = state.close()
-    else:
-        if expectfail:
-            raise ...  # XXX
-    return result
-
-
-def _run_action(cid, action, end, state):
-    if action == 'use':
-        if end == 'send':
-            interpreters.channel_send(cid, b'spam')
-            return state.incr()
-        elif end == 'recv':
-            if not state.pending:
-                try:
-                    interpreters.channel_recv(cid)
-                except interpreters.ChannelEmptyError:
-                    return state
-                else:
-                    raise Exception('expected ChannelEmptyError')
-            else:
-                interpreters.channel_recv(cid)
-                return state.decr()
-        else:
-            raise ValueError(end)
-    elif action == 'close':
-        kwargs = {}
-        if end in ('recv', 'send'):
-            kwargs[end] = True
-        interpreters.channel_close(cid, **kwargs)
-        return state.close()
-    elif action == 'force-close':
-        kwargs = {
-            'force': True,
-            }
-        if end in ('recv', 'send'):
-            kwargs[end] = True
-        interpreters.channel_close(cid, **kwargs)
-        return state.close(force=True)
-    else:
-        raise ValueError(action)
-
-
 def clean_up_interpreters():
     for id in interpreters.list_all():
         if id == 0:  # main
@@ -284,18 +84,9 @@ def clean_up_interpreters():
             pass  # already destroyed
 
 
-def clean_up_channels():
-    for cid in interpreters.channel_list_all():
-        try:
-            interpreters.channel_destroy(cid)
-        except interpreters.ChannelNotFoundError:
-            pass  # already destroyed
-
-
 class TestBase(unittest.TestCase):
 
     def tearDown(self):
-        clean_up_channels()
         clean_up_interpreters()
 
 
@@ -354,30 +145,20 @@ class SubBytes(bytes):
 
 class ShareableTypeTests(unittest.TestCase):
 
-    def setUp(self):
-        super().setUp()
-        self.cid = interpreters.channel_create()
-
-    def tearDown(self):
-        interpreters.channel_destroy(self.cid)
-        super().tearDown()
-
     def _assert_values(self, values):
         for obj in values:
             with self.subTest(obj):
-                interpreters.channel_send(self.cid, obj)
-                got = interpreters.channel_recv(self.cid)
+                xid = _testcapi.get_crossinterp_data(obj)
+                got = _testcapi.restore_crossinterp_data(xid)
 
                 self.assertEqual(got, obj)
                 self.assertIs(type(got), type(obj))
-                # XXX Check the following in the channel tests?
-                #self.assertIsNot(got, obj)
 
     def test_singletons(self):
         for obj in [None]:
             with self.subTest(obj):
-                interpreters.channel_send(self.cid, obj)
-                got = interpreters.channel_recv(self.cid)
+                xid = _testcapi.get_crossinterp_data(obj)
+                got = _testcapi.restore_crossinterp_data(xid)
 
                 # XXX What about between interpreters?
                 self.assertIs(got, obj)
@@ -408,7 +189,7 @@ def test_non_shareable_int(self):
         for i in ints:
             with self.subTest(i):
                 with self.assertRaises(OverflowError):
-                    interpreters.channel_send(self.cid, i)
+                    _testcapi.get_crossinterp_data(i)
 
 
 class ModuleTests(TestBase):
@@ -555,7 +336,7 @@ def test_bad_id(self):
         self.assertRaises(OverflowError, interpreters.InterpreterID, 2**64)
 
     def test_does_not_exist(self):
-        id = interpreters.channel_create()
+        id = interpreters.create()
         with self.assertRaises(RuntimeError):
             interpreters.InterpreterID(int(id) + 1)  # unforced
 
@@ -864,6 +645,22 @@ def f():
 
             self.assertEqual(out, 'it worked!')
 
+    def test_shareable_types(self):
+        interp = interpreters.create()
+        objects = [
+            None,
+            'spam',
+            b'spam',
+            42,
+        ]
+        for obj in objects:
+            with self.subTest(obj):
+                interpreters.run_string(
+                    interp,
+                    f'assert(obj == {obj!r})',
+                    shared=dict(obj=obj),
+                )
+
     def test_os_exec(self):
         expected = 'spam spam spam spam spam'
         subinterp = interpreters.create()
@@ -1130,1285 +927,5 @@ def f():
         self.assertEqual(retcode, 0)
 
 
-##################################
-# channel tests
-
-class ChannelIDTests(TestBase):
-
-    def test_default_kwargs(self):
-        cid = interpreters._channel_id(10, force=True)
-
-        self.assertEqual(int(cid), 10)
-        self.assertEqual(cid.end, 'both')
-
-    def test_with_kwargs(self):
-        cid = interpreters._channel_id(10, send=True, force=True)
-        self.assertEqual(cid.end, 'send')
-
-        cid = interpreters._channel_id(10, send=True, recv=False, force=True)
-        self.assertEqual(cid.end, 'send')
-
-        cid = interpreters._channel_id(10, recv=True, force=True)
-        self.assertEqual(cid.end, 'recv')
-
-        cid = interpreters._channel_id(10, recv=True, send=False, force=True)
-        self.assertEqual(cid.end, 'recv')
-
-        cid = interpreters._channel_id(10, send=True, recv=True, force=True)
-        self.assertEqual(cid.end, 'both')
-
-    def test_coerce_id(self):
-        class Int(str):
-            def __index__(self):
-                return 10
-
-        cid = interpreters._channel_id(Int(), force=True)
-        self.assertEqual(int(cid), 10)
-
-    def test_bad_id(self):
-        self.assertRaises(TypeError, interpreters._channel_id, object())
-        self.assertRaises(TypeError, interpreters._channel_id, 10.0)
-        self.assertRaises(TypeError, interpreters._channel_id, '10')
-        self.assertRaises(TypeError, interpreters._channel_id, b'10')
-        self.assertRaises(ValueError, interpreters._channel_id, -1)
-        self.assertRaises(OverflowError, interpreters._channel_id, 2**64)
-
-    def test_bad_kwargs(self):
-        with self.assertRaises(ValueError):
-            interpreters._channel_id(10, send=False, recv=False)
-
-    def test_does_not_exist(self):
-        cid = interpreters.channel_create()
-        with self.assertRaises(interpreters.ChannelNotFoundError):
-            interpreters._channel_id(int(cid) + 1)  # unforced
-
-    def test_str(self):
-        cid = interpreters._channel_id(10, force=True)
-        self.assertEqual(str(cid), '10')
-
-    def test_repr(self):
-        cid = interpreters._channel_id(10, force=True)
-        self.assertEqual(repr(cid), 'ChannelID(10)')
-
-        cid = interpreters._channel_id(10, send=True, force=True)
-        self.assertEqual(repr(cid), 'ChannelID(10, send=True)')
-
-        cid = interpreters._channel_id(10, recv=True, force=True)
-        self.assertEqual(repr(cid), 'ChannelID(10, recv=True)')
-
-        cid = interpreters._channel_id(10, send=True, recv=True, force=True)
-        self.assertEqual(repr(cid), 'ChannelID(10)')
-
-    def test_equality(self):
-        cid1 = interpreters.channel_create()
-        cid2 = interpreters._channel_id(int(cid1))
-        cid3 = interpreters.channel_create()
-
-        self.assertTrue(cid1 == cid1)
-        self.assertTrue(cid1 == cid2)
-        self.assertTrue(cid1 == int(cid1))
-        self.assertTrue(int(cid1) == cid1)
-        self.assertTrue(cid1 == float(int(cid1)))
-        self.assertTrue(float(int(cid1)) == cid1)
-        self.assertFalse(cid1 == float(int(cid1)) + 0.1)
-        self.assertFalse(cid1 == str(int(cid1)))
-        self.assertFalse(cid1 == 2**1000)
-        self.assertFalse(cid1 == float('inf'))
-        self.assertFalse(cid1 == 'spam')
-        self.assertFalse(cid1 == cid3)
-
-        self.assertFalse(cid1 != cid1)
-        self.assertFalse(cid1 != cid2)
-        self.assertTrue(cid1 != cid3)
-
-    def test_shareable(self):
-        chan = interpreters.channel_create()
-
-        obj = interpreters.channel_create()
-        interpreters.channel_send(chan, obj)
-        got = interpreters.channel_recv(chan)
-
-        self.assertEqual(got, obj)
-        self.assertIs(type(got), type(obj))
-        # XXX Check the following in the channel tests?
-        #self.assertIsNot(got, obj)
-
-
-class ChannelTests(TestBase):
-
-    def test_create_cid(self):
-        cid = interpreters.channel_create()
-        self.assertIsInstance(cid, interpreters.ChannelID)
-
-    def test_sequential_ids(self):
-        before = interpreters.channel_list_all()
-        id1 = interpreters.channel_create()
-        id2 = interpreters.channel_create()
-        id3 = interpreters.channel_create()
-        after = interpreters.channel_list_all()
-
-        self.assertEqual(id2, int(id1) + 1)
-        self.assertEqual(id3, int(id2) + 1)
-        self.assertEqual(set(after) - set(before), {id1, id2, id3})
-
-    def test_ids_global(self):
-        id1 = interpreters.create()
-        out = _run_output(id1, dedent("""
-            import _xxsubinterpreters as _interpreters
-            cid = _interpreters.channel_create()
-            print(cid)
-            """))
-        cid1 = int(out.strip())
-
-        id2 = interpreters.create()
-        out = _run_output(id2, dedent("""
-            import _xxsubinterpreters as _interpreters
-            cid = _interpreters.channel_create()
-            print(cid)
-            """))
-        cid2 = int(out.strip())
-
-        self.assertEqual(cid2, int(cid1) + 1)
-
-    def test_channel_list_interpreters_none(self):
-        """Test listing interpreters for a channel with no associations."""
-        # Test for channel with no associated interpreters.
-        cid = interpreters.channel_create()
-        send_interps = interpreters.channel_list_interpreters(cid, send=True)
-        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
-        self.assertEqual(send_interps, [])
-        self.assertEqual(recv_interps, [])
-
-    def test_channel_list_interpreters_basic(self):
-        """Test basic listing channel interpreters."""
-        interp0 = interpreters.get_main()
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, "send")
-        # Test for a channel that has one end associated to an interpreter.
-        send_interps = interpreters.channel_list_interpreters(cid, send=True)
-        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
-        self.assertEqual(send_interps, [interp0])
-        self.assertEqual(recv_interps, [])
-
-        interp1 = interpreters.create()
-        _run_output(interp1, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            obj = _interpreters.channel_recv({cid})
-            """))
-        # Test for channel that has both ends associated to an interpreter.
-        send_interps = interpreters.channel_list_interpreters(cid, send=True)
-        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
-        self.assertEqual(send_interps, [interp0])
-        self.assertEqual(recv_interps, [interp1])
-
-    def test_channel_list_interpreters_multiple(self):
-        """Test listing interpreters for a channel with many associations."""
-        interp0 = interpreters.get_main()
-        interp1 = interpreters.create()
-        interp2 = interpreters.create()
-        interp3 = interpreters.create()
-        cid = interpreters.channel_create()
-
-        interpreters.channel_send(cid, "send")
-        _run_output(interp1, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            _interpreters.channel_send({cid}, "send")
-            """))
-        _run_output(interp2, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            obj = _interpreters.channel_recv({cid})
-            """))
-        _run_output(interp3, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            obj = _interpreters.channel_recv({cid})
-            """))
-        send_interps = interpreters.channel_list_interpreters(cid, send=True)
-        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
-        self.assertEqual(set(send_interps), {interp0, interp1})
-        self.assertEqual(set(recv_interps), {interp2, interp3})
-
-    def test_channel_list_interpreters_destroyed(self):
-        """Test listing channel interpreters with a destroyed interpreter."""
-        interp0 = interpreters.get_main()
-        interp1 = interpreters.create()
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, "send")
-        _run_output(interp1, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            obj = _interpreters.channel_recv({cid})
-            """))
-        # Should be one interpreter associated with each end.
-        send_interps = interpreters.channel_list_interpreters(cid, send=True)
-        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
-        self.assertEqual(send_interps, [interp0])
-        self.assertEqual(recv_interps, [interp1])
-
-        interpreters.destroy(interp1)
-        # Destroyed interpreter should not be listed.
-        send_interps = interpreters.channel_list_interpreters(cid, send=True)
-        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
-        self.assertEqual(send_interps, [interp0])
-        self.assertEqual(recv_interps, [])
-
-    def test_channel_list_interpreters_released(self):
-        """Test listing channel interpreters with a released channel."""
-        # Set up one channel with main interpreter on the send end and two
-        # subinterpreters on the receive end.
-        interp0 = interpreters.get_main()
-        interp1 = interpreters.create()
-        interp2 = interpreters.create()
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, "data")
-        _run_output(interp1, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            obj = _interpreters.channel_recv({cid})
-            """))
-        interpreters.channel_send(cid, "data")
-        _run_output(interp2, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            obj = _interpreters.channel_recv({cid})
-            """))
-        # Check the setup.
-        send_interps = interpreters.channel_list_interpreters(cid, send=True)
-        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
-        self.assertEqual(len(send_interps), 1)
-        self.assertEqual(len(recv_interps), 2)
-
-        # Release the main interpreter from the send end.
-        interpreters.channel_release(cid, send=True)
-        # Send end should have no associated interpreters.
-        send_interps = interpreters.channel_list_interpreters(cid, send=True)
-        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
-        self.assertEqual(len(send_interps), 0)
-        self.assertEqual(len(recv_interps), 2)
-
-        # Release one of the subinterpreters from the receive end.
-        _run_output(interp2, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            _interpreters.channel_release({cid})
-            """))
-        # Receive end should have the released interpreter removed.
-        send_interps = interpreters.channel_list_interpreters(cid, send=True)
-        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
-        self.assertEqual(len(send_interps), 0)
-        self.assertEqual(recv_interps, [interp1])
-
-    def test_channel_list_interpreters_closed(self):
-        """Test listing channel interpreters with a closed channel."""
-        interp0 = interpreters.get_main()
-        interp1 = interpreters.create()
-        cid = interpreters.channel_create()
-        # Put something in the channel so that it's not empty.
-        interpreters.channel_send(cid, "send")
-
-        # Check initial state.
-        send_interps = interpreters.channel_list_interpreters(cid, send=True)
-        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
-        self.assertEqual(len(send_interps), 1)
-        self.assertEqual(len(recv_interps), 0)
-
-        # Force close the channel.
-        interpreters.channel_close(cid, force=True)
-        # Both ends should raise an error.
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_list_interpreters(cid, send=True)
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_list_interpreters(cid, send=False)
-
-    def test_channel_list_interpreters_closed_send_end(self):
-        """Test listing channel interpreters with a channel's send end closed."""
-        interp0 = interpreters.get_main()
-        interp1 = interpreters.create()
-        cid = interpreters.channel_create()
-        # Put something in the channel so that it's not empty.
-        interpreters.channel_send(cid, "send")
-
-        # Check initial state.
-        send_interps = interpreters.channel_list_interpreters(cid, send=True)
-        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
-        self.assertEqual(len(send_interps), 1)
-        self.assertEqual(len(recv_interps), 0)
-
-        # Close the send end of the channel.
-        interpreters.channel_close(cid, send=True)
-        # Send end should raise an error.
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_list_interpreters(cid, send=True)
-        # Receive end should not be closed (since channel is not empty).
-        recv_interps = interpreters.channel_list_interpreters(cid, send=False)
-        self.assertEqual(len(recv_interps), 0)
-
-        # Close the receive end of the channel from a subinterpreter.
-        _run_output(interp1, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            _interpreters.channel_close({cid}, force=True)
-            """))
-        # Both ends should raise an error.
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_list_interpreters(cid, send=True)
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_list_interpreters(cid, send=False)
-
-    ####################
-
-    def test_send_recv_main(self):
-        cid = interpreters.channel_create()
-        orig = b'spam'
-        interpreters.channel_send(cid, orig)
-        obj = interpreters.channel_recv(cid)
-
-        self.assertEqual(obj, orig)
-        self.assertIsNot(obj, orig)
-
-    def test_send_recv_same_interpreter(self):
-        id1 = interpreters.create()
-        out = _run_output(id1, dedent("""
-            import _xxsubinterpreters as _interpreters
-            cid = _interpreters.channel_create()
-            orig = b'spam'
-            _interpreters.channel_send(cid, orig)
-            obj = _interpreters.channel_recv(cid)
-            assert obj is not orig
-            assert obj == orig
-            """))
-
-    def test_send_recv_different_interpreters(self):
-        cid = interpreters.channel_create()
-        id1 = interpreters.create()
-        out = _run_output(id1, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            _interpreters.channel_send({cid}, b'spam')
-            """))
-        obj = interpreters.channel_recv(cid)
-
-        self.assertEqual(obj, b'spam')
-
-    def test_send_recv_different_threads(self):
-        cid = interpreters.channel_create()
-
-        def f():
-            while True:
-                try:
-                    obj = interpreters.channel_recv(cid)
-                    break
-                except interpreters.ChannelEmptyError:
-                    time.sleep(0.1)
-            interpreters.channel_send(cid, obj)
-        t = threading.Thread(target=f)
-        t.start()
-
-        interpreters.channel_send(cid, b'spam')
-        t.join()
-        obj = interpreters.channel_recv(cid)
-
-        self.assertEqual(obj, b'spam')
-
-    def test_send_recv_different_interpreters_and_threads(self):
-        cid = interpreters.channel_create()
-        id1 = interpreters.create()
-        out = None
-
-        def f():
-            nonlocal out
-            out = _run_output(id1, dedent(f"""
-                import time
-                import _xxsubinterpreters as _interpreters
-                while True:
-                    try:
-                        obj = _interpreters.channel_recv({cid})
-                        break
-                    except _interpreters.ChannelEmptyError:
-                        time.sleep(0.1)
-                assert(obj == b'spam')
-                _interpreters.channel_send({cid}, b'eggs')
-                """))
-        t = threading.Thread(target=f)
-        t.start()
-
-        interpreters.channel_send(cid, b'spam')
-        t.join()
-        obj = interpreters.channel_recv(cid)
-
-        self.assertEqual(obj, b'eggs')
-
-    def test_send_not_found(self):
-        with self.assertRaises(interpreters.ChannelNotFoundError):
-            interpreters.channel_send(10, b'spam')
-
-    def test_recv_not_found(self):
-        with self.assertRaises(interpreters.ChannelNotFoundError):
-            interpreters.channel_recv(10)
-
-    def test_recv_empty(self):
-        cid = interpreters.channel_create()
-        with self.assertRaises(interpreters.ChannelEmptyError):
-            interpreters.channel_recv(cid)
-
-    def test_recv_default(self):
-        default = object()
-        cid = interpreters.channel_create()
-        obj1 = interpreters.channel_recv(cid, default)
-        interpreters.channel_send(cid, None)
-        interpreters.channel_send(cid, 1)
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'eggs')
-        obj2 = interpreters.channel_recv(cid, default)
-        obj3 = interpreters.channel_recv(cid, default)
-        obj4 = interpreters.channel_recv(cid)
-        obj5 = interpreters.channel_recv(cid, default)
-        obj6 = interpreters.channel_recv(cid, default)
-
-        self.assertIs(obj1, default)
-        self.assertIs(obj2, None)
-        self.assertEqual(obj3, 1)
-        self.assertEqual(obj4, b'spam')
-        self.assertEqual(obj5, b'eggs')
-        self.assertIs(obj6, default)
-
-    def test_recv_sending_interp_destroyed(self):
-        cid = interpreters.channel_create()
-        interp = interpreters.create()
-        interpreters.run_string(interp, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            _interpreters.channel_send({cid}, b'spam')
-            """))
-        interpreters.destroy(interp)
-
-        with self.assertRaisesRegex(RuntimeError,
-                                    'unrecognized interpreter ID'):
-            interpreters.channel_recv(cid)
-
-    def test_run_string_arg_unresolved(self):
-        cid = interpreters.channel_create()
-        interp = interpreters.create()
-
-        out = _run_output(interp, dedent("""
-            import _xxsubinterpreters as _interpreters
-            print(cid.end)
-            _interpreters.channel_send(cid, b'spam')
-            """),
-            dict(cid=cid.send))
-        obj = interpreters.channel_recv(cid)
-
-        self.assertEqual(obj, b'spam')
-        self.assertEqual(out.strip(), 'send')
-
-    # XXX For now there is no high-level channel into which the
-    # sent channel ID can be converted...
-    # Note: this test caused crashes on some buildbots (bpo-33615).
-    @unittest.skip('disabled until high-level channels exist')
-    def test_run_string_arg_resolved(self):
-        cid = interpreters.channel_create()
-        cid = interpreters._channel_id(cid, _resolve=True)
-        interp = interpreters.create()
-
-        out = _run_output(interp, dedent("""
-            import _xxsubinterpreters as _interpreters
-            print(chan.id.end)
-            _interpreters.channel_send(chan.id, b'spam')
-            """),
-            dict(chan=cid.send))
-        obj = interpreters.channel_recv(cid)
-
-        self.assertEqual(obj, b'spam')
-        self.assertEqual(out.strip(), 'send')
-
-    # close
-
-    def test_close_single_user(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_recv(cid)
-        interpreters.channel_close(cid)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_send(cid, b'eggs')
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-    def test_close_multiple_users(self):
-        cid = interpreters.channel_create()
-        id1 = interpreters.create()
-        id2 = interpreters.create()
-        interpreters.run_string(id1, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            _interpreters.channel_send({cid}, b'spam')
-            """))
-        interpreters.run_string(id2, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            _interpreters.channel_recv({cid})
-            """))
-        interpreters.channel_close(cid)
-        with self.assertRaises(interpreters.RunFailedError) as cm:
-            interpreters.run_string(id1, dedent(f"""
-                _interpreters.channel_send({cid}, b'spam')
-                """))
-        self.assertIn('ChannelClosedError', str(cm.exception))
-        with self.assertRaises(interpreters.RunFailedError) as cm:
-            interpreters.run_string(id2, dedent(f"""
-                _interpreters.channel_send({cid}, b'spam')
-                """))
-        self.assertIn('ChannelClosedError', str(cm.exception))
-
-    def test_close_multiple_times(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_recv(cid)
-        interpreters.channel_close(cid)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_close(cid)
-
-    def test_close_empty(self):
-        tests = [
-            (False, False),
-            (True, False),
-            (False, True),
-            (True, True),
-            ]
-        for send, recv in tests:
-            with self.subTest((send, recv)):
-                cid = interpreters.channel_create()
-                interpreters.channel_send(cid, b'spam')
-                interpreters.channel_recv(cid)
-                interpreters.channel_close(cid, send=send, recv=recv)
-
-                with self.assertRaises(interpreters.ChannelClosedError):
-                    interpreters.channel_send(cid, b'eggs')
-                with self.assertRaises(interpreters.ChannelClosedError):
-                    interpreters.channel_recv(cid)
-
-    def test_close_defaults_with_unused_items(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'ham')
-
-        with self.assertRaises(interpreters.ChannelNotEmptyError):
-            interpreters.channel_close(cid)
-        interpreters.channel_recv(cid)
-        interpreters.channel_send(cid, b'eggs')
-
-    def test_close_recv_with_unused_items_unforced(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'ham')
-
-        with self.assertRaises(interpreters.ChannelNotEmptyError):
-            interpreters.channel_close(cid, recv=True)
-        interpreters.channel_recv(cid)
-        interpreters.channel_send(cid, b'eggs')
-        interpreters.channel_recv(cid)
-        interpreters.channel_recv(cid)
-        interpreters.channel_close(cid, recv=True)
-
-    def test_close_send_with_unused_items_unforced(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'ham')
-        interpreters.channel_close(cid, send=True)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_send(cid, b'eggs')
-        interpreters.channel_recv(cid)
-        interpreters.channel_recv(cid)
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-    def test_close_both_with_unused_items_unforced(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'ham')
-
-        with self.assertRaises(interpreters.ChannelNotEmptyError):
-            interpreters.channel_close(cid, recv=True, send=True)
-        interpreters.channel_recv(cid)
-        interpreters.channel_send(cid, b'eggs')
-        interpreters.channel_recv(cid)
-        interpreters.channel_recv(cid)
-        interpreters.channel_close(cid, recv=True)
-
-    def test_close_recv_with_unused_items_forced(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'ham')
-        interpreters.channel_close(cid, recv=True, force=True)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_send(cid, b'eggs')
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-    def test_close_send_with_unused_items_forced(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'ham')
-        interpreters.channel_close(cid, send=True, force=True)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_send(cid, b'eggs')
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-    def test_close_both_with_unused_items_forced(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'ham')
-        interpreters.channel_close(cid, send=True, recv=True, force=True)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_send(cid, b'eggs')
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-    def test_close_never_used(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_close(cid)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_send(cid, b'spam')
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-    def test_close_by_unassociated_interp(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interp = interpreters.create()
-        interpreters.run_string(interp, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            _interpreters.channel_close({cid}, force=True)
-            """))
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_close(cid)
-
-    def test_close_used_multiple_times_by_single_user(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_recv(cid)
-        interpreters.channel_close(cid, force=True)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_send(cid, b'eggs')
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-    def test_channel_list_interpreters_invalid_channel(self):
-        cid = interpreters.channel_create()
-        # Test for invalid channel ID.
-        with self.assertRaises(interpreters.ChannelNotFoundError):
-            interpreters.channel_list_interpreters(1000, send=True)
-
-        interpreters.channel_close(cid)
-        # Test for a channel that has been closed.
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_list_interpreters(cid, send=True)
-
-    def test_channel_list_interpreters_invalid_args(self):
-        # Tests for invalid arguments passed to the API.
-        cid = interpreters.channel_create()
-        with self.assertRaises(TypeError):
-            interpreters.channel_list_interpreters(cid)
-
-
-class ChannelReleaseTests(TestBase):
-
-    # XXX Add more test coverage a la the tests for close().
-
-    """
-    - main / interp / other
-    - run in: current thread / new thread / other thread / different threads
-    - end / opposite
-    - force / no force
-    - used / not used  (associated / not associated)
-    - empty / emptied / never emptied / partly emptied
-    - closed / not closed
-    - released / not released
-    - creator (interp) / other
-    - associated interpreter not running
-    - associated interpreter destroyed
-    """
-
-    """
-    use
-    pre-release
-    release
-    after
-    check
-    """
-
-    """
-    release in:         main, interp1
-    creator:            same, other (incl. interp2)
-
-    use:                None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
-    pre-release:        None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
-    pre-release forced: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
-
-    release:            same
-    release forced:     same
-
-    use after:          None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
-    release after:      None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
-    check released:     send/recv for same/other(incl. interp2)
-    check closed:       send/recv for same/other(incl. interp2)
-    """
-
-    def test_single_user(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_recv(cid)
-        interpreters.channel_release(cid, send=True, recv=True)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_send(cid, b'eggs')
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-    def test_multiple_users(self):
-        cid = interpreters.channel_create()
-        id1 = interpreters.create()
-        id2 = interpreters.create()
-        interpreters.run_string(id1, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            _interpreters.channel_send({cid}, b'spam')
-            """))
-        out = _run_output(id2, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            obj = _interpreters.channel_recv({cid})
-            _interpreters.channel_release({cid})
-            print(repr(obj))
-            """))
-        interpreters.run_string(id1, dedent(f"""
-            _interpreters.channel_release({cid})
-            """))
-
-        self.assertEqual(out.strip(), "b'spam'")
-
-    def test_no_kwargs(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_recv(cid)
-        interpreters.channel_release(cid)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_send(cid, b'eggs')
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-    def test_multiple_times(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_recv(cid)
-        interpreters.channel_release(cid, send=True, recv=True)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_release(cid, send=True, recv=True)
-
-    def test_with_unused_items(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'ham')
-        interpreters.channel_release(cid, send=True, recv=True)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-    def test_never_used(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_release(cid)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_send(cid, b'spam')
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-    def test_by_unassociated_interp(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interp = interpreters.create()
-        interpreters.run_string(interp, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            _interpreters.channel_release({cid})
-            """))
-        obj = interpreters.channel_recv(cid)
-        interpreters.channel_release(cid)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_send(cid, b'eggs')
-        self.assertEqual(obj, b'spam')
-
-    def test_close_if_unassociated(self):
-        # XXX Something's not right with this test...
-        cid = interpreters.channel_create()
-        interp = interpreters.create()
-        interpreters.run_string(interp, dedent(f"""
-            import _xxsubinterpreters as _interpreters
-            obj = _interpreters.channel_send({cid}, b'spam')
-            _interpreters.channel_release({cid})
-            """))
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-    def test_partially(self):
-        # XXX Is partial close too weird/confusing?
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, None)
-        interpreters.channel_recv(cid)
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_release(cid, send=True)
-        obj = interpreters.channel_recv(cid)
-
-        self.assertEqual(obj, b'spam')
-
-    def test_used_multiple_times_by_single_user(self):
-        cid = interpreters.channel_create()
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_send(cid, b'spam')
-        interpreters.channel_recv(cid)
-        interpreters.channel_release(cid, send=True, recv=True)
-
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_send(cid, b'eggs')
-        with self.assertRaises(interpreters.ChannelClosedError):
-            interpreters.channel_recv(cid)
-
-
-class ChannelCloseFixture(namedtuple('ChannelCloseFixture',
-                                     'end interp other extra creator')):
-
-    # Set this to True to avoid creating interpreters, e.g. when
-    # scanning through test permutations without running them.
-    QUICK = False
-
-    def __new__(cls, end, interp, other, extra, creator):
-        assert end in ('send', 'recv')
-        if cls.QUICK:
-            known = {}
-        else:
-            interp = Interpreter.from_raw(interp)
-            other = Interpreter.from_raw(other)
-            extra = Interpreter.from_raw(extra)
-            known = {
-                interp.name: interp,
-                other.name: other,
-                extra.name: extra,
-                }
-        if not creator:
-            creator = 'same'
-        self = super().__new__(cls, end, interp, other, extra, creator)
-        self._prepped = set()
-        self._state = ChannelState()
-        self._known = known
-        return self
-
-    @property
-    def state(self):
-        return self._state
-
-    @property
-    def cid(self):
-        try:
-            return self._cid
-        except AttributeError:
-            creator = self._get_interpreter(self.creator)
-            self._cid = self._new_channel(creator)
-            return self._cid
-
-    def get_interpreter(self, interp):
-        interp = self._get_interpreter(interp)
-        self._prep_interpreter(interp)
-        return interp
-
-    def expect_closed_error(self, end=None):
-        if end is None:
-            end = self.end
-        if end == 'recv' and self.state.closed == 'send':
-            return False
-        return bool(self.state.closed)
-
-    def prep_interpreter(self, interp):
-        self._prep_interpreter(interp)
-
-    def record_action(self, action, result):
-        self._state = result
-
-    def clean_up(self):
-        clean_up_interpreters()
-        clean_up_channels()
-
-    # internal methods
-
-    def _new_channel(self, creator):
-        if creator.name == 'main':
-            return interpreters.channel_create()
-        else:
-            ch = interpreters.channel_create()
-            run_interp(creator.id, f"""
-                import _xxsubinterpreters
-                cid = _xxsubinterpreters.channel_create()
-                # We purposefully send back an int to avoid tying the
-                # channel to the other interpreter.
-                _xxsubinterpreters.channel_send({ch}, int(cid))
-                del _xxsubinterpreters
-                """)
-            self._cid = interpreters.channel_recv(ch)
-        return self._cid
-
-    def _get_interpreter(self, interp):
-        if interp in ('same', 'interp'):
-            return self.interp
-        elif interp == 'other':
-            return self.other
-        elif interp == 'extra':
-            return self.extra
-        else:
-            name = interp
-            try:
-                interp = self._known[name]
-            except KeyError:
-                interp = self._known[name] = Interpreter(name)
-            return interp
-
-    def _prep_interpreter(self, interp):
-        if interp.id in self._prepped:
-            return
-        self._prepped.add(interp.id)
-        if interp.name == 'main':
-            return
-        run_interp(interp.id, f"""
-            import _xxsubinterpreters as interpreters
-            import test.test__xxsubinterpreters as helpers
-            ChannelState = helpers.ChannelState
-            try:
-                cid
-            except NameError:
-                cid = interpreters._channel_id({self.cid})
-            """)
-
-
- at unittest.skip('these tests take several hours to run')
-class ExhaustiveChannelTests(TestBase):
-
-    """
-    - main / interp / other
-    - run in: current thread / new thread / other thread / different threads
-    - end / opposite
-    - force / no force
-    - used / not used  (associated / not associated)
-    - empty / emptied / never emptied / partly emptied
-    - closed / not closed
-    - released / not released
-    - creator (interp) / other
-    - associated interpreter not running
-    - associated interpreter destroyed
-
-    - close after unbound
-    """
-
-    """
-    use
-    pre-close
-    close
-    after
-    check
-    """
-
-    """
-    close in:         main, interp1
-    creator:          same, other, extra
-
-    use:              None,send,recv,send/recv in None,same,other,same+other,all
-    pre-close:        None,send,recv in None,same,other,same+other,all
-    pre-close forced: None,send,recv in None,same,other,same+other,all
-
-    close:            same
-    close forced:     same
-
-    use after:        None,send,recv,send/recv in None,same,other,extra,same+other,all
-    close after:      None,send,recv,send/recv in None,same,other,extra,same+other,all
-    check closed:     send/recv for same/other(incl. interp2)
-    """
-
-    def iter_action_sets(self):
-        # - used / not used  (associated / not associated)
-        # - empty / emptied / never emptied / partly emptied
-        # - closed / not closed
-        # - released / not released
-
-        # never used
-        yield []
-
-        # only pre-closed (and possible used after)
-        for closeactions in self._iter_close_action_sets('same', 'other'):
-            yield closeactions
-            for postactions in self._iter_post_close_action_sets():
-                yield closeactions + postactions
-        for closeactions in self._iter_close_action_sets('other', 'extra'):
-            yield closeactions
-            for postactions in self._iter_post_close_action_sets():
-                yield closeactions + postactions
-
-        # used
-        for useactions in self._iter_use_action_sets('same', 'other'):
-            yield useactions
-            for closeactions in self._iter_close_action_sets('same', 'other'):
-                actions = useactions + closeactions
-                yield actions
-                for postactions in self._iter_post_close_action_sets():
-                    yield actions + postactions
-            for closeactions in self._iter_close_action_sets('other', 'extra'):
-                actions = useactions + closeactions
-                yield actions
-                for postactions in self._iter_post_close_action_sets():
-                    yield actions + postactions
-        for useactions in self._iter_use_action_sets('other', 'extra'):
-            yield useactions
-            for closeactions in self._iter_close_action_sets('same', 'other'):
-                actions = useactions + closeactions
-                yield actions
-                for postactions in self._iter_post_close_action_sets():
-                    yield actions + postactions
-            for closeactions in self._iter_close_action_sets('other', 'extra'):
-                actions = useactions + closeactions
-                yield actions
-                for postactions in self._iter_post_close_action_sets():
-                    yield actions + postactions
-
-    def _iter_use_action_sets(self, interp1, interp2):
-        interps = (interp1, interp2)
-
-        # only recv end used
-        yield [
-            ChannelAction('use', 'recv', interp1),
-            ]
-        yield [
-            ChannelAction('use', 'recv', interp2),
-            ]
-        yield [
-            ChannelAction('use', 'recv', interp1),
-            ChannelAction('use', 'recv', interp2),
-            ]
-
-        # never emptied
-        yield [
-            ChannelAction('use', 'send', interp1),
-            ]
-        yield [
-            ChannelAction('use', 'send', interp2),
-            ]
-        yield [
-            ChannelAction('use', 'send', interp1),
-            ChannelAction('use', 'send', interp2),
-            ]
-
-        # partially emptied
-        for interp1 in interps:
-            for interp2 in interps:
-                for interp3 in interps:
-                    yield [
-                        ChannelAction('use', 'send', interp1),
-                        ChannelAction('use', 'send', interp2),
-                        ChannelAction('use', 'recv', interp3),
-                        ]
-
-        # fully emptied
-        for interp1 in interps:
-            for interp2 in interps:
-                for interp3 in interps:
-                    for interp4 in interps:
-                        yield [
-                            ChannelAction('use', 'send', interp1),
-                            ChannelAction('use', 'send', interp2),
-                            ChannelAction('use', 'recv', interp3),
-                            ChannelAction('use', 'recv', interp4),
-                            ]
-
-    def _iter_close_action_sets(self, interp1, interp2):
-        ends = ('recv', 'send')
-        interps = (interp1, interp2)
-        for force in (True, False):
-            op = 'force-close' if force else 'close'
-            for interp in interps:
-                for end in ends:
-                    yield [
-                        ChannelAction(op, end, interp),
-                        ]
-        for recvop in ('close', 'force-close'):
-            for sendop in ('close', 'force-close'):
-                for recv in interps:
-                    for send in interps:
-                        yield [
-                            ChannelAction(recvop, 'recv', recv),
-                            ChannelAction(sendop, 'send', send),
-                            ]
-
-    def _iter_post_close_action_sets(self):
-        for interp in ('same', 'extra', 'other'):
-            yield [
-                ChannelAction('use', 'recv', interp),
-                ]
-            yield [
-                ChannelAction('use', 'send', interp),
-                ]
-
-    def run_actions(self, fix, actions):
-        for action in actions:
-            self.run_action(fix, action)
-
-    def run_action(self, fix, action, *, hideclosed=True):
-        end = action.resolve_end(fix.end)
-        interp = action.resolve_interp(fix.interp, fix.other, fix.extra)
-        fix.prep_interpreter(interp)
-        if interp.name == 'main':
-            result = run_action(
-                fix.cid,
-                action.action,
-                end,
-                fix.state,
-                hideclosed=hideclosed,
-                )
-            fix.record_action(action, result)
-        else:
-            _cid = interpreters.channel_create()
-            run_interp(interp.id, f"""
-                result = helpers.run_action(
-                    {fix.cid},
-                    {repr(action.action)},
-                    {repr(end)},
-                    {repr(fix.state)},
-                    hideclosed={hideclosed},
-                    )
-                interpreters.channel_send({_cid}, result.pending.to_bytes(1, 'little'))
-                interpreters.channel_send({_cid}, b'X' if result.closed else b'')
-                """)
-            result = ChannelState(
-                pending=int.from_bytes(interpreters.channel_recv(_cid), 'little'),
-                closed=bool(interpreters.channel_recv(_cid)),
-                )
-            fix.record_action(action, result)
-
-    def iter_fixtures(self):
-        # XXX threads?
-        interpreters = [
-            ('main', 'interp', 'extra'),
-            ('interp', 'main', 'extra'),
-            ('interp1', 'interp2', 'extra'),
-            ('interp1', 'interp2', 'main'),
-        ]
-        for interp, other, extra in interpreters:
-            for creator in ('same', 'other', 'creator'):
-                for end in ('send', 'recv'):
-                    yield ChannelCloseFixture(end, interp, other, extra, creator)
-
-    def _close(self, fix, *, force):
-        op = 'force-close' if force else 'close'
-        close = ChannelAction(op, fix.end, 'same')
-        if not fix.expect_closed_error():
-            self.run_action(fix, close, hideclosed=False)
-        else:
-            with self.assertRaises(interpreters.ChannelClosedError):
-                self.run_action(fix, close, hideclosed=False)
-
-    def _assert_closed_in_interp(self, fix, interp=None):
-        if interp is None or interp.name == 'main':
-            with self.assertRaises(interpreters.ChannelClosedError):
-                interpreters.channel_recv(fix.cid)
-            with self.assertRaises(interpreters.ChannelClosedError):
-                interpreters.channel_send(fix.cid, b'spam')
-            with self.assertRaises(interpreters.ChannelClosedError):
-                interpreters.channel_close(fix.cid)
-            with self.assertRaises(interpreters.ChannelClosedError):
-                interpreters.channel_close(fix.cid, force=True)
-        else:
-            run_interp(interp.id, f"""
-                with helpers.expect_channel_closed():
-                    interpreters.channel_recv(cid)
-                """)
-            run_interp(interp.id, f"""
-                with helpers.expect_channel_closed():
-                    interpreters.channel_send(cid, b'spam')
-                """)
-            run_interp(interp.id, f"""
-                with helpers.expect_channel_closed():
-                    interpreters.channel_close(cid)
-                """)
-            run_interp(interp.id, f"""
-                with helpers.expect_channel_closed():
-                    interpreters.channel_close(cid, force=True)
-                """)
-
-    def _assert_closed(self, fix):
-        self.assertTrue(fix.state.closed)
-
-        for _ in range(fix.state.pending):
-            interpreters.channel_recv(fix.cid)
-        self._assert_closed_in_interp(fix)
-
-        for interp in ('same', 'other'):
-            interp = fix.get_interpreter(interp)
-            if interp.name == 'main':
-                continue
-            self._assert_closed_in_interp(fix, interp)
-
-        interp = fix.get_interpreter('fresh')
-        self._assert_closed_in_interp(fix, interp)
-
-    def _iter_close_tests(self, verbose=False):
-        i = 0
-        for actions in self.iter_action_sets():
-            print()
-            for fix in self.iter_fixtures():
-                i += 1
-                if i > 1000:
-                    return
-                if verbose:
-                    if (i - 1) % 6 == 0:
-                        print()
-                    print(i, fix, '({} actions)'.format(len(actions)))
-                else:
-                    if (i - 1) % 6 == 0:
-                        print(' ', end='')
-                    print('.', end=''); sys.stdout.flush()
-                yield i, fix, actions
-            if verbose:
-                print('---')
-        print()
-
-    # This is useful for scanning through the possible tests.
-    def _skim_close_tests(self):
-        ChannelCloseFixture.QUICK = True
-        for i, fix, actions in self._iter_close_tests():
-            pass
-
-    def test_close(self):
-        for i, fix, actions in self._iter_close_tests():
-            with self.subTest('{} {}  {}'.format(i, fix, actions)):
-                fix.prep_interpreter(fix.interp)
-                self.run_actions(fix, actions)
-
-                self._close(fix, force=False)
-
-                self._assert_closed(fix)
-            # XXX Things slow down if we have too many interpreters.
-            fix.clean_up()
-
-    def test_force_close(self):
-        for i, fix, actions in self._iter_close_tests():
-            with self.subTest('{} {}  {}'.format(i, fix, actions)):
-                fix.prep_interpreter(fix.interp)
-                self.run_actions(fix, actions)
-
-                self._close(fix, force=True)
-
-                self._assert_closed(fix)
-            # XXX Things slow down if we have too many interpreters.
-            fix.clean_up()
-
-
 if __name__ == '__main__':
     unittest.main()
diff --git a/Lib/test/test_interpreters.py b/Lib/test/test_interpreters.py
index b969ddf33d81..d1bebe471583 100644
--- a/Lib/test/test_interpreters.py
+++ b/Lib/test/test_interpreters.py
@@ -8,6 +8,7 @@
 from test import support
 from test.support import import_helper
 _interpreters = import_helper.import_module('_xxsubinterpreters')
+_channels = import_helper.import_module('_xxinterpchannels')
 from test.support import interpreters
 
 
@@ -533,7 +534,7 @@ class TestRecvChannelAttrs(TestBase):
 
     def test_id_type(self):
         rch, _ = interpreters.create_channel()
-        self.assertIsInstance(rch.id, _interpreters.ChannelID)
+        self.assertIsInstance(rch.id, _channels.ChannelID)
 
     def test_custom_id(self):
         rch = interpreters.RecvChannel(1)
@@ -558,7 +559,7 @@ class TestSendChannelAttrs(TestBase):
 
     def test_id_type(self):
         _, sch = interpreters.create_channel()
-        self.assertIsInstance(sch.id, _interpreters.ChannelID)
+        self.assertIsInstance(sch.id, _channels.ChannelID)
 
     def test_custom_id(self):
         sch = interpreters.SendChannel(1)
diff --git a/Modules/Setup b/Modules/Setup
index e3a82975b5ff..428be0a1bf8f 100644
--- a/Modules/Setup
+++ b/Modules/Setup
@@ -280,6 +280,7 @@ PYTHONPATH=$(COREPYTHONPATH)
 # Testing
 
 #_xxsubinterpreters _xxsubinterpretersmodule.c
+#_xxinterpchannels _xxinterpchannelsmodule.c
 #_xxtestfuzz _xxtestfuzz/_xxtestfuzz.c _xxtestfuzz/fuzzer.c
 #_testbuffer _testbuffer.c
 #_testinternalcapi _testinternalcapi.c
diff --git a/Modules/Setup.stdlib.in b/Modules/Setup.stdlib.in
index d64752e8ca96..3fd591c70d49 100644
--- a/Modules/Setup.stdlib.in
+++ b/Modules/Setup.stdlib.in
@@ -43,6 +43,7 @@
 @MODULE__STRUCT_TRUE at _struct _struct.c
 @MODULE__TYPING_TRUE at _typing _typingmodule.c
 @MODULE__XXSUBINTERPRETERS_TRUE at _xxsubinterpreters _xxsubinterpretersmodule.c
+ at MODULE__XXINTERPCHANNELS_TRUE@_xxinterpchannels _xxinterpchannelsmodule.c
 @MODULE__ZONEINFO_TRUE at _zoneinfo _zoneinfo.c
 
 # needs libm
diff --git a/Modules/_testcapimodule.c b/Modules/_testcapimodule.c
index 46c025bf5340..f0d6e404f54a 100644
--- a/Modules/_testcapimodule.c
+++ b/Modules/_testcapimodule.c
@@ -1639,6 +1639,58 @@ run_in_subinterp_with_config(PyObject *self, PyObject *args, PyObject *kwargs)
     return PyLong_FromLong(r);
 }
 
+static void
+_xid_capsule_destructor(PyObject *capsule)
+{
+    _PyCrossInterpreterData *data = \
+            (_PyCrossInterpreterData *)PyCapsule_GetPointer(capsule, NULL);
+    if (data != NULL) {
+        assert(_PyCrossInterpreterData_Release(data) == 0);
+        PyMem_Free(data);
+    }
+}
+
+static PyObject *
+get_crossinterp_data(PyObject *self, PyObject *args)
+{
+    PyObject *obj = NULL;
+    if (!PyArg_ParseTuple(args, "O:get_crossinterp_data", &obj)) {
+        return NULL;
+    }
+
+    _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
+    if (data == NULL) {
+        PyErr_NoMemory();
+        return NULL;
+    }
+    if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
+        PyMem_Free(data);
+        return NULL;
+    }
+    PyObject *capsule = PyCapsule_New(data, NULL, _xid_capsule_destructor);
+    if (capsule == NULL) {
+        assert(_PyCrossInterpreterData_Release(data) == 0);
+        PyMem_Free(data);
+    }
+    return capsule;
+}
+
+static PyObject *
+restore_crossinterp_data(PyObject *self, PyObject *args)
+{
+    PyObject *capsule = NULL;
+    if (!PyArg_ParseTuple(args, "O:restore_crossinterp_data", &capsule)) {
+        return NULL;
+    }
+
+    _PyCrossInterpreterData *data = \
+            (_PyCrossInterpreterData *)PyCapsule_GetPointer(capsule, NULL);
+    if (data == NULL) {
+        return NULL;
+    }
+    return _PyCrossInterpreterData_NewObject(data);
+}
+
 static void
 slot_tp_del(PyObject *self)
 {
@@ -3306,6 +3358,8 @@ static PyMethodDef TestMethods[] = {
     {"run_in_subinterp_with_config",
      _PyCFunction_CAST(run_in_subinterp_with_config),
      METH_VARARGS | METH_KEYWORDS},
+    {"get_crossinterp_data",    get_crossinterp_data,            METH_VARARGS},
+    {"restore_crossinterp_data", restore_crossinterp_data,       METH_VARARGS},
     {"with_tp_del",             with_tp_del,                     METH_VARARGS},
     {"create_cfunction",        create_cfunction,                METH_NOARGS},
     {"call_in_temporary_c_thread", call_in_temporary_c_thread, METH_VARARGS,
diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c
new file mode 100644
index 000000000000..8601a189e875
--- /dev/null
+++ b/Modules/_xxinterpchannelsmodule.c
@@ -0,0 +1,2325 @@
+
+/* interpreters module */
+/* low-level access to interpreter primitives */
+#ifndef Py_BUILD_CORE_BUILTIN
+#  define Py_BUILD_CORE_MODULE 1
+#endif
+
+#include "Python.h"
+#include "pycore_pystate.h"       // _PyThreadState_GET()
+#include "pycore_interpreteridobject.h"
+
+
+#define MODULE_NAME "_xxinterpchannels"
+
+
+static PyInterpreterState *
+_get_current_interp(void)
+{
+    // PyInterpreterState_Get() aborts if lookup fails, so don't need
+    // to check the result for NULL.
+    return PyInterpreterState_Get();
+}
+
+static PyObject *
+_get_current_module(void)
+{
+    PyObject *name = PyUnicode_FromString(MODULE_NAME);
+    if (name == NULL) {
+        return NULL;
+    }
+    PyObject *mod = PyImport_GetModule(name);
+    Py_DECREF(name);
+    if (mod == NULL) {
+        return NULL;
+    }
+    assert(mod != Py_None);
+    return mod;
+}
+
+static PyObject *
+get_module_from_owned_type(PyTypeObject *cls)
+{
+    assert(cls != NULL);
+    return _get_current_module();
+    // XXX Use the more efficient API now that we use heap types:
+    //return PyType_GetModule(cls);
+}
+
+static struct PyModuleDef moduledef;
+
+static PyObject *
+get_module_from_type(PyTypeObject *cls)
+{
+    assert(cls != NULL);
+    return _get_current_module();
+    // XXX Use the more efficient API now that we use heap types:
+    //return PyType_GetModuleByDef(cls, &moduledef);
+}
+
+static PyObject *
+add_new_exception(PyObject *mod, const char *name, PyObject *base)
+{
+    assert(!PyObject_HasAttrString(mod, name));
+    PyObject *exctype = PyErr_NewException(name, base, NULL);
+    if (exctype == NULL) {
+        return NULL;
+    }
+    int res = PyModule_AddType(mod, (PyTypeObject *)exctype);
+    if (res < 0) {
+        Py_DECREF(exctype);
+        return NULL;
+    }
+    return exctype;
+}
+
+#define ADD_NEW_EXCEPTION(MOD, NAME, BASE) \
+    add_new_exception(MOD, MODULE_NAME "." Py_STRINGIFY(NAME), BASE)
+
+static PyTypeObject *
+add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared)
+{
+    PyTypeObject *cls = (PyTypeObject *)PyType_FromMetaclass(
+                NULL, mod, spec, NULL);
+    if (cls == NULL) {
+        return NULL;
+    }
+    if (PyModule_AddType(mod, cls) < 0) {
+        Py_DECREF(cls);
+        return NULL;
+    }
+    if (shared != NULL) {
+        if (_PyCrossInterpreterData_RegisterClass(cls, shared)) {
+            Py_DECREF(cls);
+            return NULL;
+        }
+    }
+    return cls;
+}
+
+static int
+_release_xid_data(_PyCrossInterpreterData *data, int ignoreexc)
+{
+    PyObject *exctype, *excval, *exctb;
+    if (ignoreexc) {
+        PyErr_Fetch(&exctype, &excval, &exctb);
+    }
+    int res = _PyCrossInterpreterData_Release(data);
+    if (res < 0) {
+        // XXX Fix this!
+        /* The owning interpreter is already destroyed.
+         * Ideally, this shouldn't ever happen.  When an interpreter is
+         * about to be destroyed, we should clear out all of its objects
+         * from every channel associated with that interpreter.
+         * For now we hack around that to resolve refleaks, by decref'ing
+         * the released object here, even if its the wrong interpreter.
+         * The owning interpreter has already been destroyed
+         * so we should be okay, especially since the currently
+         * shareable types are all very basic, with no GC.
+         * That said, it becomes much messier once interpreters
+         * no longer share a GIL, so this needs to be fixed before then. */
+        _PyCrossInterpreterData_Clear(NULL, data);
+        if (ignoreexc) {
+            // XXX Emit a warning?
+            PyErr_Clear();
+        }
+    }
+    if (ignoreexc) {
+        PyErr_Restore(exctype, excval, exctb);
+    }
+    return res;
+}
+
+
+/* module state *************************************************************/
+
+typedef struct {
+    /* heap types */
+    PyTypeObject *ChannelIDType;
+
+    /* exceptions */
+    PyObject *ChannelError;
+    PyObject *ChannelNotFoundError;
+    PyObject *ChannelClosedError;
+    PyObject *ChannelEmptyError;
+    PyObject *ChannelNotEmptyError;
+} module_state;
+
+static inline module_state *
+get_module_state(PyObject *mod)
+{
+    assert(mod != NULL);
+    module_state *state = PyModule_GetState(mod);
+    assert(state != NULL);
+    return state;
+}
+
+static int
+traverse_module_state(module_state *state, visitproc visit, void *arg)
+{
+    /* heap types */
+    Py_VISIT(state->ChannelIDType);
+
+    /* exceptions */
+    Py_VISIT(state->ChannelError);
+    Py_VISIT(state->ChannelNotFoundError);
+    Py_VISIT(state->ChannelClosedError);
+    Py_VISIT(state->ChannelEmptyError);
+    Py_VISIT(state->ChannelNotEmptyError);
+
+    return 0;
+}
+
+static int
+clear_module_state(module_state *state)
+{
+    /* heap types */
+    (void)_PyCrossInterpreterData_UnregisterClass(state->ChannelIDType);
+    Py_CLEAR(state->ChannelIDType);
+
+    /* exceptions */
+    Py_CLEAR(state->ChannelError);
+    Py_CLEAR(state->ChannelNotFoundError);
+    Py_CLEAR(state->ChannelClosedError);
+    Py_CLEAR(state->ChannelEmptyError);
+    Py_CLEAR(state->ChannelNotEmptyError);
+
+    return 0;
+}
+
+
+/* channel-specific code ****************************************************/
+
+#define CHANNEL_SEND 1
+#define CHANNEL_BOTH 0
+#define CHANNEL_RECV -1
+
+/* channel errors */
+
+#define ERR_CHANNEL_NOT_FOUND -2
+#define ERR_CHANNEL_CLOSED -3
+#define ERR_CHANNEL_INTERP_CLOSED -4
+#define ERR_CHANNEL_EMPTY -5
+#define ERR_CHANNEL_NOT_EMPTY -6
+#define ERR_CHANNEL_MUTEX_INIT -7
+#define ERR_CHANNELS_MUTEX_INIT -8
+#define ERR_NO_NEXT_CHANNEL_ID -9
+
+static int
+exceptions_init(PyObject *mod)
+{
+    module_state *state = get_module_state(mod);
+    if (state == NULL) {
+        return -1;
+    }
+
+#define ADD(NAME, BASE) \
+    do { \
+        assert(state->NAME == NULL); \
+        state->NAME = ADD_NEW_EXCEPTION(mod, NAME, BASE); \
+        if (state->NAME == NULL) { \
+            return -1; \
+        } \
+    } while (0)
+
+    // A channel-related operation failed.
+    ADD(ChannelError, PyExc_RuntimeError);
+    // An operation tried to use a channel that doesn't exist.
+    ADD(ChannelNotFoundError, state->ChannelError);
+    // An operation tried to use a closed channel.
+    ADD(ChannelClosedError, state->ChannelError);
+    // An operation tried to pop from an empty channel.
+    ADD(ChannelEmptyError, state->ChannelError);
+    // An operation tried to close a non-empty channel.
+    ADD(ChannelNotEmptyError, state->ChannelError);
+#undef ADD
+
+    return 0;
+}
+
+static int
+handle_channel_error(int err, PyObject *mod, int64_t cid)
+{
+    if (err == 0) {
+        assert(!PyErr_Occurred());
+        return 0;
+    }
+    assert(err < 0);
+    module_state *state = get_module_state(mod);
+    assert(state != NULL);
+    if (err == ERR_CHANNEL_NOT_FOUND) {
+        PyErr_Format(state->ChannelNotFoundError,
+                     "channel %" PRId64 " not found", cid);
+    }
+    else if (err == ERR_CHANNEL_CLOSED) {
+        PyErr_Format(state->ChannelClosedError,
+                     "channel %" PRId64 " is closed", cid);
+    }
+    else if (err == ERR_CHANNEL_INTERP_CLOSED) {
+        PyErr_Format(state->ChannelClosedError,
+                     "channel %" PRId64 " is already closed", cid);
+    }
+    else if (err == ERR_CHANNEL_EMPTY) {
+        PyErr_Format(state->ChannelEmptyError,
+                     "channel %" PRId64 " is empty", cid);
+    }
+    else if (err == ERR_CHANNEL_NOT_EMPTY) {
+        PyErr_Format(state->ChannelNotEmptyError,
+                     "channel %" PRId64 " may not be closed "
+                     "if not empty (try force=True)",
+                     cid);
+    }
+    else if (err == ERR_CHANNEL_MUTEX_INIT) {
+        PyErr_SetString(state->ChannelError,
+                        "can't initialize mutex for new channel");
+    }
+    else if (err == ERR_CHANNELS_MUTEX_INIT) {
+        PyErr_SetString(state->ChannelError,
+                        "can't initialize mutex for channel management");
+    }
+    else if (err == ERR_NO_NEXT_CHANNEL_ID) {
+        PyErr_SetString(state->ChannelError,
+                        "failed to get a channel ID");
+    }
+    else {
+        assert(PyErr_Occurred());
+    }
+    return 1;
+}
+
+/* the channel queue */
+
+struct _channelitem;
+
+typedef struct _channelitem {
+    _PyCrossInterpreterData *data;
+    struct _channelitem *next;
+} _channelitem;
+
+static _channelitem *
+_channelitem_new(void)
+{
+    _channelitem *item = PyMem_NEW(_channelitem, 1);
+    if (item == NULL) {
+        PyErr_NoMemory();
+        return NULL;
+    }
+    item->data = NULL;
+    item->next = NULL;
+    return item;
+}
+
+static void
+_channelitem_clear(_channelitem *item)
+{
+    if (item->data != NULL) {
+        (void)_release_xid_data(item->data, 1);
+        PyMem_Free(item->data);
+        item->data = NULL;
+    }
+    item->next = NULL;
+}
+
+static void
+_channelitem_free(_channelitem *item)
+{
+    _channelitem_clear(item);
+    PyMem_Free(item);
+}
+
+static void
+_channelitem_free_all(_channelitem *item)
+{
+    while (item != NULL) {
+        _channelitem *last = item;
+        item = item->next;
+        _channelitem_free(last);
+    }
+}
+
+static _PyCrossInterpreterData *
+_channelitem_popped(_channelitem *item)
+{
+    _PyCrossInterpreterData *data = item->data;
+    item->data = NULL;
+    _channelitem_free(item);
+    return data;
+}
+
+typedef struct _channelqueue {
+    int64_t count;
+    _channelitem *first;
+    _channelitem *last;
+} _channelqueue;
+
+static _channelqueue *
+_channelqueue_new(void)
+{
+    _channelqueue *queue = PyMem_NEW(_channelqueue, 1);
+    if (queue == NULL) {
+        PyErr_NoMemory();
+        return NULL;
+    }
+    queue->count = 0;
+    queue->first = NULL;
+    queue->last = NULL;
+    return queue;
+}
+
+static void
+_channelqueue_clear(_channelqueue *queue)
+{
+    _channelitem_free_all(queue->first);
+    queue->count = 0;
+    queue->first = NULL;
+    queue->last = NULL;
+}
+
+static void
+_channelqueue_free(_channelqueue *queue)
+{
+    _channelqueue_clear(queue);
+    PyMem_Free(queue);
+}
+
+static int
+_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
+{
+    _channelitem *item = _channelitem_new();
+    if (item == NULL) {
+        return -1;
+    }
+    item->data = data;
+
+    queue->count += 1;
+    if (queue->first == NULL) {
+        queue->first = item;
+    }
+    else {
+        queue->last->next = item;
+    }
+    queue->last = item;
+    return 0;
+}
+
+static _PyCrossInterpreterData *
+_channelqueue_get(_channelqueue *queue)
+{
+    _channelitem *item = queue->first;
+    if (item == NULL) {
+        return NULL;
+    }
+    queue->first = item->next;
+    if (queue->last == item) {
+        queue->last = NULL;
+    }
+    queue->count -= 1;
+
+    return _channelitem_popped(item);
+}
+
+/* channel-interpreter associations */
+
+struct _channelend;
+
+typedef struct _channelend {
+    struct _channelend *next;
+    int64_t interp;
+    int open;
+} _channelend;
+
+static _channelend *
+_channelend_new(int64_t interp)
+{
+    _channelend *end = PyMem_NEW(_channelend, 1);
+    if (end == NULL) {
+        PyErr_NoMemory();
+        return NULL;
+    }
+    end->next = NULL;
+    end->interp = interp;
+    end->open = 1;
+    return end;
+}
+
+static void
+_channelend_free(_channelend *end)
+{
+    PyMem_Free(end);
+}
+
+static void
+_channelend_free_all(_channelend *end)
+{
+    while (end != NULL) {
+        _channelend *last = end;
+        end = end->next;
+        _channelend_free(last);
+    }
+}
+
+static _channelend *
+_channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
+{
+    _channelend *prev = NULL;
+    _channelend *end = first;
+    while (end != NULL) {
+        if (end->interp == interp) {
+            break;
+        }
+        prev = end;
+        end = end->next;
+    }
+    if (pprev != NULL) {
+        *pprev = prev;
+    }
+    return end;
+}
+
+typedef struct _channelassociations {
+    // Note that the list entries are never removed for interpreter
+    // for which the channel is closed.  This should not be a problem in
+    // practice.  Also, a channel isn't automatically closed when an
+    // interpreter is destroyed.
+    int64_t numsendopen;
+    int64_t numrecvopen;
+    _channelend *send;
+    _channelend *recv;
+} _channelends;
+
+static _channelends *
+_channelends_new(void)
+{
+    _channelends *ends = PyMem_NEW(_channelends, 1);
+    if (ends== NULL) {
+        return NULL;
+    }
+    ends->numsendopen = 0;
+    ends->numrecvopen = 0;
+    ends->send = NULL;
+    ends->recv = NULL;
+    return ends;
+}
+
+static void
+_channelends_clear(_channelends *ends)
+{
+    _channelend_free_all(ends->send);
+    ends->send = NULL;
+    ends->numsendopen = 0;
+
+    _channelend_free_all(ends->recv);
+    ends->recv = NULL;
+    ends->numrecvopen = 0;
+}
+
+static void
+_channelends_free(_channelends *ends)
+{
+    _channelends_clear(ends);
+    PyMem_Free(ends);
+}
+
+static _channelend *
+_channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
+                 int send)
+{
+    _channelend *end = _channelend_new(interp);
+    if (end == NULL) {
+        return NULL;
+    }
+
+    if (prev == NULL) {
+        if (send) {
+            ends->send = end;
+        }
+        else {
+            ends->recv = end;
+        }
+    }
+    else {
+        prev->next = end;
+    }
+    if (send) {
+        ends->numsendopen += 1;
+    }
+    else {
+        ends->numrecvopen += 1;
+    }
+    return end;
+}
+
+static int
+_channelends_associate(_channelends *ends, int64_t interp, int send)
+{
+    _channelend *prev;
+    _channelend *end = _channelend_find(send ? ends->send : ends->recv,
+                                        interp, &prev);
+    if (end != NULL) {
+        if (!end->open) {
+            return ERR_CHANNEL_CLOSED;
+        }
+        // already associated
+        return 0;
+    }
+    if (_channelends_add(ends, prev, interp, send) == NULL) {
+        return -1;
+    }
+    return 0;
+}
+
+static int
+_channelends_is_open(_channelends *ends)
+{
+    if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
+        return 1;
+    }
+    if (ends->send == NULL && ends->recv == NULL) {
+        return 1;
+    }
+    return 0;
+}
+
+static void
+_channelends_close_end(_channelends *ends, _channelend *end, int send)
+{
+    end->open = 0;
+    if (send) {
+        ends->numsendopen -= 1;
+    }
+    else {
+        ends->numrecvopen -= 1;
+    }
+}
+
+static int
+_channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
+{
+    _channelend *prev;
+    _channelend *end;
+    if (which >= 0) {  // send/both
+        end = _channelend_find(ends->send, interp, &prev);
+        if (end == NULL) {
+            // never associated so add it
+            end = _channelends_add(ends, prev, interp, 1);
+            if (end == NULL) {
+                return -1;
+            }
+        }
+        _channelends_close_end(ends, end, 1);
+    }
+    if (which <= 0) {  // recv/both
+        end = _channelend_find(ends->recv, interp, &prev);
+        if (end == NULL) {
+            // never associated so add it
+            end = _channelends_add(ends, prev, interp, 0);
+            if (end == NULL) {
+                return -1;
+            }
+        }
+        _channelends_close_end(ends, end, 0);
+    }
+    return 0;
+}
+
+static void
+_channelends_close_all(_channelends *ends, int which, int force)
+{
+    // XXX Handle the ends.
+    // XXX Handle force is True.
+
+    // Ensure all the "send"-associated interpreters are closed.
+    _channelend *end;
+    for (end = ends->send; end != NULL; end = end->next) {
+        _channelends_close_end(ends, end, 1);
+    }
+
+    // Ensure all the "recv"-associated interpreters are closed.
+    for (end = ends->recv; end != NULL; end = end->next) {
+        _channelends_close_end(ends, end, 0);
+    }
+}
+
+/* channels */
+
+struct _channel;
+struct _channel_closing;
+static void _channel_clear_closing(struct _channel *);
+static void _channel_finish_closing(struct _channel *);
+
+typedef struct _channel {
+    PyThread_type_lock mutex;
+    _channelqueue *queue;
+    _channelends *ends;
+    int open;
+    struct _channel_closing *closing;
+} _PyChannelState;
+
+static _PyChannelState *
+_channel_new(PyThread_type_lock mutex)
+{
+    _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
+    if (chan == NULL) {
+        return NULL;
+    }
+    chan->mutex = mutex;
+    chan->queue = _channelqueue_new();
+    if (chan->queue == NULL) {
+        PyMem_Free(chan);
+        return NULL;
+    }
+    chan->ends = _channelends_new();
+    if (chan->ends == NULL) {
+        _channelqueue_free(chan->queue);
+        PyMem_Free(chan);
+        return NULL;
+    }
+    chan->open = 1;
+    chan->closing = NULL;
+    return chan;
+}
+
+static void
+_channel_free(_PyChannelState *chan)
+{
+    _channel_clear_closing(chan);
+    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+    _channelqueue_free(chan->queue);
+    _channelends_free(chan->ends);
+    PyThread_release_lock(chan->mutex);
+
+    PyThread_free_lock(chan->mutex);
+    PyMem_Free(chan);
+}
+
+static int
+_channel_add(_PyChannelState *chan, int64_t interp,
+             _PyCrossInterpreterData *data)
+{
+    int res = -1;
+    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+
+    if (!chan->open) {
+        res = ERR_CHANNEL_CLOSED;
+        goto done;
+    }
+    if (_channelends_associate(chan->ends, interp, 1) != 0) {
+        res = ERR_CHANNEL_INTERP_CLOSED;
+        goto done;
+    }
+
+    if (_channelqueue_put(chan->queue, data) != 0) {
+        goto done;
+    }
+
+    res = 0;
+done:
+    PyThread_release_lock(chan->mutex);
+    return res;
+}
+
+static int
+_channel_next(_PyChannelState *chan, int64_t interp,
+              _PyCrossInterpreterData **res)
+{
+    int err = 0;
+    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+
+    if (!chan->open) {
+        err = ERR_CHANNEL_CLOSED;
+        goto done;
+    }
+    if (_channelends_associate(chan->ends, interp, 0) != 0) {
+        err = ERR_CHANNEL_INTERP_CLOSED;
+        goto done;
+    }
+
+    _PyCrossInterpreterData *data = _channelqueue_get(chan->queue);
+    if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
+        chan->open = 0;
+    }
+    *res = data;
+
+done:
+    PyThread_release_lock(chan->mutex);
+    if (chan->queue->count == 0) {
+        _channel_finish_closing(chan);
+    }
+    return err;
+}
+
+static int
+_channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
+{
+    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+
+    int res = -1;
+    if (!chan->open) {
+        res = ERR_CHANNEL_CLOSED;
+        goto done;
+    }
+
+    if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
+        goto done;
+    }
+    chan->open = _channelends_is_open(chan->ends);
+
+    res = 0;
+done:
+    PyThread_release_lock(chan->mutex);
+    return res;
+}
+
+static int
+_channel_close_all(_PyChannelState *chan, int end, int force)
+{
+    int res = -1;
+    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+
+    if (!chan->open) {
+        res = ERR_CHANNEL_CLOSED;
+        goto done;
+    }
+
+    if (!force && chan->queue->count > 0) {
+        res = ERR_CHANNEL_NOT_EMPTY;
+        goto done;
+    }
+
+    chan->open = 0;
+
+    // We *could* also just leave these in place, since we've marked
+    // the channel as closed already.
+    _channelends_close_all(chan->ends, end, force);
+
+    res = 0;
+done:
+    PyThread_release_lock(chan->mutex);
+    return res;
+}
+
+/* the set of channels */
+
+struct _channelref;
+
+typedef struct _channelref {
+    int64_t id;
+    _PyChannelState *chan;
+    struct _channelref *next;
+    Py_ssize_t objcount;
+} _channelref;
+
+static _channelref *
+_channelref_new(int64_t id, _PyChannelState *chan)
+{
+    _channelref *ref = PyMem_NEW(_channelref, 1);
+    if (ref == NULL) {
+        return NULL;
+    }
+    ref->id = id;
+    ref->chan = chan;
+    ref->next = NULL;
+    ref->objcount = 0;
+    return ref;
+}
+
+//static void
+//_channelref_clear(_channelref *ref)
+//{
+//    ref->id = -1;
+//    ref->chan = NULL;
+//    ref->next = NULL;
+//    ref->objcount = 0;
+//}
+
+static void
+_channelref_free(_channelref *ref)
+{
+    if (ref->chan != NULL) {
+        _channel_clear_closing(ref->chan);
+    }
+    //_channelref_clear(ref);
+    PyMem_Free(ref);
+}
+
+static _channelref *
+_channelref_find(_channelref *first, int64_t id, _channelref **pprev)
+{
+    _channelref *prev = NULL;
+    _channelref *ref = first;
+    while (ref != NULL) {
+        if (ref->id == id) {
+            break;
+        }
+        prev = ref;
+        ref = ref->next;
+    }
+    if (pprev != NULL) {
+        *pprev = prev;
+    }
+    return ref;
+}
+
+typedef struct _channels {
+    PyThread_type_lock mutex;
+    _channelref *head;
+    int64_t numopen;
+    int64_t next_id;
+} _channels;
+
+static void
+_channels_init(_channels *channels, PyThread_type_lock mutex)
+{
+    channels->mutex = mutex;
+    channels->head = NULL;
+    channels->numopen = 0;
+    channels->next_id = 0;
+}
+
+static void
+_channels_fini(_channels *channels)
+{
+    assert(channels->numopen == 0);
+    assert(channels->head == NULL);
+    if (channels->mutex != NULL) {
+        PyThread_free_lock(channels->mutex);
+        channels->mutex = NULL;
+    }
+}
+
+static int64_t
+_channels_next_id(_channels *channels)  // needs lock
+{
+    int64_t id = channels->next_id;
+    if (id < 0) {
+        /* overflow */
+        return -1;
+    }
+    channels->next_id += 1;
+    return id;
+}
+
+static int
+_channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex,
+                 _PyChannelState **res)
+{
+    int err = -1;
+    _PyChannelState *chan = NULL;
+    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+    if (pmutex != NULL) {
+        *pmutex = NULL;
+    }
+
+    _channelref *ref = _channelref_find(channels->head, id, NULL);
+    if (ref == NULL) {
+        err = ERR_CHANNEL_NOT_FOUND;
+        goto done;
+    }
+    if (ref->chan == NULL || !ref->chan->open) {
+        err = ERR_CHANNEL_CLOSED;
+        goto done;
+    }
+
+    if (pmutex != NULL) {
+        // The mutex will be closed by the caller.
+        *pmutex = channels->mutex;
+    }
+
+    chan = ref->chan;
+    err = 0;
+
+done:
+    if (pmutex == NULL || *pmutex == NULL) {
+        PyThread_release_lock(channels->mutex);
+    }
+    *res = chan;
+    return err;
+}
+
+static int64_t
+_channels_add(_channels *channels, _PyChannelState *chan)
+{
+    int64_t cid = -1;
+    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+
+    // Create a new ref.
+    int64_t id = _channels_next_id(channels);
+    if (id < 0) {
+        cid = ERR_NO_NEXT_CHANNEL_ID;
+        goto done;
+    }
+    _channelref *ref = _channelref_new(id, chan);
+    if (ref == NULL) {
+        goto done;
+    }
+
+    // Add it to the list.
+    // We assume that the channel is a new one (not already in the list).
+    ref->next = channels->head;
+    channels->head = ref;
+    channels->numopen += 1;
+
+    cid = id;
+done:
+    PyThread_release_lock(channels->mutex);
+    return cid;
+}
+
+/* forward */
+static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
+
+static int
+_channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
+                int end, int force)
+{
+    int res = -1;
+    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+    if (pchan != NULL) {
+        *pchan = NULL;
+    }
+
+    _channelref *ref = _channelref_find(channels->head, cid, NULL);
+    if (ref == NULL) {
+        res = ERR_CHANNEL_NOT_FOUND;
+        goto done;
+    }
+
+    if (ref->chan == NULL) {
+        res = ERR_CHANNEL_CLOSED;
+        goto done;
+    }
+    else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
+        res = ERR_CHANNEL_CLOSED;
+        goto done;
+    }
+    else {
+        int err = _channel_close_all(ref->chan, end, force);
+        if (err != 0) {
+            if (end == CHANNEL_SEND && err == ERR_CHANNEL_NOT_EMPTY) {
+                if (ref->chan->closing != NULL) {
+                    res = ERR_CHANNEL_CLOSED;
+                    goto done;
+                }
+                // Mark the channel as closing and return.  The channel
+                // will be cleaned up in _channel_next().
+                PyErr_Clear();
+                int err = _channel_set_closing(ref, channels->mutex);
+                if (err != 0) {
+                    res = err;
+                    goto done;
+                }
+                if (pchan != NULL) {
+                    *pchan = ref->chan;
+                }
+                res = 0;
+            }
+            else {
+                res = err;
+            }
+            goto done;
+        }
+        if (pchan != NULL) {
+            *pchan = ref->chan;
+        }
+        else  {
+            _channel_free(ref->chan);
+        }
+        ref->chan = NULL;
+    }
+
+    res = 0;
+done:
+    PyThread_release_lock(channels->mutex);
+    return res;
+}
+
+static void
+_channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
+                     _PyChannelState **pchan)
+{
+    if (ref == channels->head) {
+        channels->head = ref->next;
+    }
+    else {
+        prev->next = ref->next;
+    }
+    channels->numopen -= 1;
+
+    if (pchan != NULL) {
+        *pchan = ref->chan;
+    }
+    _channelref_free(ref);
+}
+
+static int
+_channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
+{
+    int res = -1;
+    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+
+    if (pchan != NULL) {
+        *pchan = NULL;
+    }
+
+    _channelref *prev = NULL;
+    _channelref *ref = _channelref_find(channels->head, id, &prev);
+    if (ref == NULL) {
+        res = ERR_CHANNEL_NOT_FOUND;
+        goto done;
+    }
+
+    _channels_remove_ref(channels, ref, prev, pchan);
+
+    res = 0;
+done:
+    PyThread_release_lock(channels->mutex);
+    return res;
+}
+
+static int
+_channels_add_id_object(_channels *channels, int64_t id)
+{
+    int res = -1;
+    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+
+    _channelref *ref = _channelref_find(channels->head, id, NULL);
+    if (ref == NULL) {
+        res = ERR_CHANNEL_NOT_FOUND;
+        goto done;
+    }
+    ref->objcount += 1;
+
+    res = 0;
+done:
+    PyThread_release_lock(channels->mutex);
+    return res;
+}
+
+static void
+_channels_drop_id_object(_channels *channels, int64_t id)
+{
+    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+
+    _channelref *prev = NULL;
+    _channelref *ref = _channelref_find(channels->head, id, &prev);
+    if (ref == NULL) {
+        // Already destroyed.
+        goto done;
+    }
+    ref->objcount -= 1;
+
+    // Destroy if no longer used.
+    if (ref->objcount == 0) {
+        _PyChannelState *chan = NULL;
+        _channels_remove_ref(channels, ref, prev, &chan);
+        if (chan != NULL) {
+            _channel_free(chan);
+        }
+    }
+
+done:
+    PyThread_release_lock(channels->mutex);
+}
+
+static int64_t *
+_channels_list_all(_channels *channels, int64_t *count)
+{
+    int64_t *cids = NULL;
+    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
+    int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
+    if (ids == NULL) {
+        goto done;
+    }
+    _channelref *ref = channels->head;
+    for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
+        ids[i] = ref->id;
+    }
+    *count = channels->numopen;
+
+    cids = ids;
+done:
+    PyThread_release_lock(channels->mutex);
+    return cids;
+}
+
+/* support for closing non-empty channels */
+
+struct _channel_closing {
+    struct _channelref *ref;
+};
+
+static int
+_channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
+    struct _channel *chan = ref->chan;
+    if (chan == NULL) {
+        // already closed
+        return 0;
+    }
+    int res = -1;
+    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+    if (chan->closing != NULL) {
+        res = ERR_CHANNEL_CLOSED;
+        goto done;
+    }
+    chan->closing = PyMem_NEW(struct _channel_closing, 1);
+    if (chan->closing == NULL) {
+        goto done;
+    }
+    chan->closing->ref = ref;
+
+    res = 0;
+done:
+    PyThread_release_lock(chan->mutex);
+    return res;
+}
+
+static void
+_channel_clear_closing(struct _channel *chan) {
+    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+    if (chan->closing != NULL) {
+        PyMem_Free(chan->closing);
+        chan->closing = NULL;
+    }
+    PyThread_release_lock(chan->mutex);
+}
+
+static void
+_channel_finish_closing(struct _channel *chan) {
+    struct _channel_closing *closing = chan->closing;
+    if (closing == NULL) {
+        return;
+    }
+    _channelref *ref = closing->ref;
+    _channel_clear_closing(chan);
+    // Do the things that would have been done in _channels_close().
+    ref->chan = NULL;
+    _channel_free(chan);
+}
+
+/* "high"-level channel-related functions */
+
+static int64_t
+_channel_create(_channels *channels)
+{
+    PyThread_type_lock mutex = PyThread_allocate_lock();
+    if (mutex == NULL) {
+        return ERR_CHANNEL_MUTEX_INIT;
+    }
+    _PyChannelState *chan = _channel_new(mutex);
+    if (chan == NULL) {
+        PyThread_free_lock(mutex);
+        return -1;
+    }
+    int64_t id = _channels_add(channels, chan);
+    if (id < 0) {
+        _channel_free(chan);
+    }
+    return id;
+}
+
+static int
+_channel_destroy(_channels *channels, int64_t id)
+{
+    _PyChannelState *chan = NULL;
+    int err = _channels_remove(channels, id, &chan);
+    if (err != 0) {
+        return err;
+    }
+    if (chan != NULL) {
+        _channel_free(chan);
+    }
+    return 0;
+}
+
+static int
+_channel_send(_channels *channels, int64_t id, PyObject *obj)
+{
+    PyInterpreterState *interp = _get_current_interp();
+    if (interp == NULL) {
+        return -1;
+    }
+
+    // Look up the channel.
+    PyThread_type_lock mutex = NULL;
+    _PyChannelState *chan = NULL;
+    int err = _channels_lookup(channels, id, &mutex, &chan);
+    if (err != 0) {
+        return err;
+    }
+    assert(chan != NULL);
+    // Past this point we are responsible for releasing the mutex.
+
+    if (chan->closing != NULL) {
+        PyThread_release_lock(mutex);
+        return ERR_CHANNEL_CLOSED;
+    }
+
+    // Convert the object to cross-interpreter data.
+    _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
+    if (data == NULL) {
+        PyThread_release_lock(mutex);
+        return -1;
+    }
+    if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
+        PyThread_release_lock(mutex);
+        PyMem_Free(data);
+        return -1;
+    }
+
+    // Add the data to the channel.
+    int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
+    PyThread_release_lock(mutex);
+    if (res != 0) {
+        // We may chain an exception here:
+        (void)_release_xid_data(data, 0);
+        PyMem_Free(data);
+        return res;
+    }
+
+    return 0;
+}
+
+static int
+_channel_recv(_channels *channels, int64_t id, PyObject **res)
+{
+    int err;
+    *res = NULL;
+
+    PyInterpreterState *interp = _get_current_interp();
+    if (interp == NULL) {
+        // XXX Is this always an error?
+        if (PyErr_Occurred()) {
+            return -1;
+        }
+        return 0;
+    }
+
+    // Look up the channel.
+    PyThread_type_lock mutex = NULL;
+    _PyChannelState *chan = NULL;
+    err = _channels_lookup(channels, id, &mutex, &chan);
+    if (err != 0) {
+        return err;
+    }
+    assert(chan != NULL);
+    // Past this point we are responsible for releasing the mutex.
+
+    // Pop off the next item from the channel.
+    _PyCrossInterpreterData *data = NULL;
+    err = _channel_next(chan, PyInterpreterState_GetID(interp), &data);
+    PyThread_release_lock(mutex);
+    if (err != 0) {
+        return err;
+    }
+    else if (data == NULL) {
+        assert(!PyErr_Occurred());
+        return 0;
+    }
+
+    // Convert the data back to an object.
+    PyObject *obj = _PyCrossInterpreterData_NewObject(data);
+    if (obj == NULL) {
+        assert(PyErr_Occurred());
+        (void)_release_xid_data(data, 1);
+        PyMem_Free(data);
+        return -1;
+    }
+    int release_res = _release_xid_data(data, 0);
+    PyMem_Free(data);
+    if (release_res < 0) {
+        // The source interpreter has been destroyed already.
+        assert(PyErr_Occurred());
+        Py_DECREF(obj);
+        return -1;
+    }
+
+    *res = obj;
+    return 0;
+}
+
+static int
+_channel_drop(_channels *channels, int64_t id, int send, int recv)
+{
+    PyInterpreterState *interp = _get_current_interp();
+    if (interp == NULL) {
+        return -1;
+    }
+
+    // Look up the channel.
+    PyThread_type_lock mutex = NULL;
+    _PyChannelState *chan = NULL;
+    int err = _channels_lookup(channels, id, &mutex, &chan);
+    if (err != 0) {
+        return err;
+    }
+    // Past this point we are responsible for releasing the mutex.
+
+    // Close one or both of the two ends.
+    int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
+    PyThread_release_lock(mutex);
+    return res;
+}
+
+static int
+_channel_close(_channels *channels, int64_t id, int end, int force)
+{
+    return _channels_close(channels, id, NULL, end, force);
+}
+
+static int
+_channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
+                       int send)
+{
+    _PyChannelState *chan = NULL;
+    int err = _channels_lookup(channels, cid, NULL, &chan);
+    if (err != 0) {
+        return err;
+    }
+    else if (send && chan->closing != NULL) {
+        return ERR_CHANNEL_CLOSED;
+    }
+
+    _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
+                                        interp, NULL);
+
+    return (end != NULL && end->open);
+}
+
+/* ChannelID class */
+
+typedef struct channelid {
+    PyObject_HEAD
+    int64_t id;
+    int end;
+    int resolve;
+    _channels *channels;
+} channelid;
+
+struct channel_id_converter_data {
+    PyObject *module;
+    int64_t cid;
+};
+
+static int
+channel_id_converter(PyObject *arg, void *ptr)
+{
+    int64_t cid;
+    struct channel_id_converter_data *data = ptr;
+    module_state *state = get_module_state(data->module);
+    assert(state != NULL);
+    if (PyObject_TypeCheck(arg, state->ChannelIDType)) {
+        cid = ((channelid *)arg)->id;
+    }
+    else if (PyIndex_Check(arg)) {
+        cid = PyLong_AsLongLong(arg);
+        if (cid == -1 && PyErr_Occurred()) {
+            return 0;
+        }
+        if (cid < 0) {
+            PyErr_Format(PyExc_ValueError,
+                        "channel ID must be a non-negative int, got %R", arg);
+            return 0;
+        }
+    }
+    else {
+        PyErr_Format(PyExc_TypeError,
+                     "channel ID must be an int, got %.100s",
+                     Py_TYPE(arg)->tp_name);
+        return 0;
+    }
+    data->cid = cid;
+    return 1;
+}
+
+static int
+newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
+             int force, int resolve, channelid **res)
+{
+    *res = NULL;
+
+    channelid *self = PyObject_New(channelid, cls);
+    if (self == NULL) {
+        return -1;
+    }
+    self->id = cid;
+    self->end = end;
+    self->resolve = resolve;
+    self->channels = channels;
+
+    int err = _channels_add_id_object(channels, cid);
+    if (err != 0) {
+        if (force && err == ERR_CHANNEL_NOT_FOUND) {
+            assert(!PyErr_Occurred());
+        }
+        else {
+            Py_DECREF((PyObject *)self);
+            return err;
+        }
+    }
+
+    *res = self;
+    return 0;
+}
+
+static _channels * _global_channels(void);
+
+static PyObject *
+_channelid_new(PyObject *mod, PyTypeObject *cls,
+               PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
+    int64_t cid;
+    struct channel_id_converter_data cid_data = {
+        .module = mod,
+    };
+    int send = -1;
+    int recv = -1;
+    int force = 0;
+    int resolve = 0;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     "O&|$pppp:ChannelID.__new__", kwlist,
+                                     channel_id_converter, &cid_data,
+                                     &send, &recv, &force, &resolve)) {
+        return NULL;
+    }
+    cid = cid_data.cid;
+
+    // Handle "send" and "recv".
+    if (send == 0 && recv == 0) {
+        PyErr_SetString(PyExc_ValueError,
+                        "'send' and 'recv' cannot both be False");
+        return NULL;
+    }
+
+    int end = 0;
+    if (send == 1) {
+        if (recv == 0 || recv == -1) {
+            end = CHANNEL_SEND;
+        }
+    }
+    else if (recv == 1) {
+        end = CHANNEL_RECV;
+    }
+
+    PyObject *id = NULL;
+    int err = newchannelid(cls, cid, end, _global_channels(),
+                           force, resolve,
+                           (channelid **)&id);
+    if (handle_channel_error(err, mod, cid)) {
+        assert(id == NULL);
+        return NULL;
+    }
+    assert(id != NULL);
+    return id;
+}
+
+static void
+channelid_dealloc(PyObject *self)
+{
+    int64_t cid = ((channelid *)self)->id;
+    _channels *channels = ((channelid *)self)->channels;
+
+    PyTypeObject *tp = Py_TYPE(self);
+    tp->tp_free(self);
+    /* "Instances of heap-allocated types hold a reference to their type."
+     * See: https://docs.python.org/3.11/howto/isolating-extensions.html#garbage-collection-protocol
+     * See: https://docs.python.org/3.11/c-api/typeobj.html#c.PyTypeObject.tp_traverse
+    */
+    // XXX Why don't we implement Py_TPFLAGS_HAVE_GC, e.g. Py_tp_traverse,
+    // like we do for _abc._abc_data?
+    Py_DECREF(tp);
+
+    _channels_drop_id_object(channels, cid);
+}
+
+static PyObject *
+channelid_repr(PyObject *self)
+{
+    PyTypeObject *type = Py_TYPE(self);
+    const char *name = _PyType_Name(type);
+
+    channelid *cid = (channelid *)self;
+    const char *fmt;
+    if (cid->end == CHANNEL_SEND) {
+        fmt = "%s(%" PRId64 ", send=True)";
+    }
+    else if (cid->end == CHANNEL_RECV) {
+        fmt = "%s(%" PRId64 ", recv=True)";
+    }
+    else {
+        fmt = "%s(%" PRId64 ")";
+    }
+    return PyUnicode_FromFormat(fmt, name, cid->id);
+}
+
+static PyObject *
+channelid_str(PyObject *self)
+{
+    channelid *cid = (channelid *)self;
+    return PyUnicode_FromFormat("%" PRId64 "", cid->id);
+}
+
+static PyObject *
+channelid_int(PyObject *self)
+{
+    channelid *cid = (channelid *)self;
+    return PyLong_FromLongLong(cid->id);
+}
+
+static Py_hash_t
+channelid_hash(PyObject *self)
+{
+    channelid *cid = (channelid *)self;
+    PyObject *id = PyLong_FromLongLong(cid->id);
+    if (id == NULL) {
+        return -1;
+    }
+    Py_hash_t hash = PyObject_Hash(id);
+    Py_DECREF(id);
+    return hash;
+}
+
+static PyObject *
+channelid_richcompare(PyObject *self, PyObject *other, int op)
+{
+    PyObject *res = NULL;
+    if (op != Py_EQ && op != Py_NE) {
+        Py_RETURN_NOTIMPLEMENTED;
+    }
+
+    PyObject *mod = get_module_from_type(Py_TYPE(self));
+    if (mod == NULL) {
+        return NULL;
+    }
+    module_state *state = get_module_state(mod);
+    if (state == NULL) {
+        goto done;
+    }
+
+    if (!PyObject_TypeCheck(self, state->ChannelIDType)) {
+        res = Py_NewRef(Py_NotImplemented);
+        goto done;
+    }
+
+    channelid *cid = (channelid *)self;
+    int equal;
+    if (PyObject_TypeCheck(other, state->ChannelIDType)) {
+        channelid *othercid = (channelid *)other;
+        equal = (cid->end == othercid->end) && (cid->id == othercid->id);
+    }
+    else if (PyLong_Check(other)) {
+        /* Fast path */
+        int overflow;
+        long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
+        if (othercid == -1 && PyErr_Occurred()) {
+            goto done;
+        }
+        equal = !overflow && (othercid >= 0) && (cid->id == othercid);
+    }
+    else if (PyNumber_Check(other)) {
+        PyObject *pyid = PyLong_FromLongLong(cid->id);
+        if (pyid == NULL) {
+            goto done;
+        }
+        res = PyObject_RichCompare(pyid, other, op);
+        Py_DECREF(pyid);
+        goto done;
+    }
+    else {
+        res = Py_NewRef(Py_NotImplemented);
+        goto done;
+    }
+
+    if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
+        res = Py_NewRef(Py_True);
+    }
+    else {
+        res = Py_NewRef(Py_False);
+    }
+
+done:
+    Py_DECREF(mod);
+    return res;
+}
+
+static PyObject *
+_channel_from_cid(PyObject *cid, int end)
+{
+    PyObject *highlevel = PyImport_ImportModule("interpreters");
+    if (highlevel == NULL) {
+        PyErr_Clear();
+        highlevel = PyImport_ImportModule("test.support.interpreters");
+        if (highlevel == NULL) {
+            return NULL;
+        }
+    }
+    const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
+                                                  "SendChannel";
+    PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
+    Py_DECREF(highlevel);
+    if (cls == NULL) {
+        return NULL;
+    }
+    PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
+    Py_DECREF(cls);
+    if (chan == NULL) {
+        return NULL;
+    }
+    return chan;
+}
+
+struct _channelid_xid {
+    int64_t id;
+    int end;
+    int resolve;
+};
+
+static PyObject *
+_channelid_from_xid(_PyCrossInterpreterData *data)
+{
+    struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
+
+    // It might not be imported yet, so we can't use _get_current_module().
+    PyObject *mod = PyImport_ImportModule(MODULE_NAME);
+    if (mod == NULL) {
+        return NULL;
+    }
+    assert(mod != Py_None);
+    module_state *state = get_module_state(mod);
+    if (state == NULL) {
+        return NULL;
+    }
+
+    // Note that we do not preserve the "resolve" flag.
+    PyObject *cid = NULL;
+    int err = newchannelid(state->ChannelIDType, xid->id, xid->end,
+                           _global_channels(), 0, 0,
+                           (channelid **)&cid);
+    if (err != 0) {
+        assert(cid == NULL);
+        (void)handle_channel_error(err, mod, xid->id);
+        goto done;
+    }
+    assert(cid != NULL);
+    if (xid->end == 0) {
+        goto done;
+    }
+    if (!xid->resolve) {
+        goto done;
+    }
+
+    /* Try returning a high-level channel end but fall back to the ID. */
+    PyObject *chan = _channel_from_cid(cid, xid->end);
+    if (chan == NULL) {
+        PyErr_Clear();
+        goto done;
+    }
+    Py_DECREF(cid);
+    cid = chan;
+
+done:
+    Py_DECREF(mod);
+    return cid;
+}
+
+static int
+_channelid_shared(PyThreadState *tstate, PyObject *obj,
+                  _PyCrossInterpreterData *data)
+{
+    if (_PyCrossInterpreterData_InitWithSize(
+            data, tstate->interp, sizeof(struct _channelid_xid), obj,
+            _channelid_from_xid
+            ) < 0)
+    {
+        return -1;
+    }
+    struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
+    xid->id = ((channelid *)obj)->id;
+    xid->end = ((channelid *)obj)->end;
+    xid->resolve = ((channelid *)obj)->resolve;
+    return 0;
+}
+
+static PyObject *
+channelid_end(PyObject *self, void *end)
+{
+    int force = 1;
+    channelid *cid = (channelid *)self;
+    if (end != NULL) {
+        PyObject *id = NULL;
+        int err = newchannelid(Py_TYPE(self), cid->id, *(int *)end,
+                               cid->channels, force, cid->resolve,
+                               (channelid **)&id);
+        if (err != 0) {
+            assert(id == NULL);
+            PyObject *mod = get_module_from_type(Py_TYPE(self));
+            if (mod == NULL) {
+                return NULL;
+            }
+            (void)handle_channel_error(err, mod, cid->id);
+            Py_DECREF(mod);
+            return NULL;
+        }
+        assert(id != NULL);
+        return id;
+    }
+
+    if (cid->end == CHANNEL_SEND) {
+        return PyUnicode_InternFromString("send");
+    }
+    if (cid->end == CHANNEL_RECV) {
+        return PyUnicode_InternFromString("recv");
+    }
+    return PyUnicode_InternFromString("both");
+}
+
+static int _channelid_end_send = CHANNEL_SEND;
+static int _channelid_end_recv = CHANNEL_RECV;
+
+static PyGetSetDef channelid_getsets[] = {
+    {"end", (getter)channelid_end, NULL,
+     PyDoc_STR("'send', 'recv', or 'both'")},
+    {"send", (getter)channelid_end, NULL,
+     PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
+    {"recv", (getter)channelid_end, NULL,
+     PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
+    {NULL}
+};
+
+PyDoc_STRVAR(channelid_doc,
+"A channel ID identifies a channel and may be used as an int.");
+
+static PyType_Slot ChannelIDType_slots[] = {
+    {Py_tp_dealloc, (destructor)channelid_dealloc},
+    {Py_tp_doc, (void *)channelid_doc},
+    {Py_tp_repr, (reprfunc)channelid_repr},
+    {Py_tp_str, (reprfunc)channelid_str},
+    {Py_tp_hash, channelid_hash},
+    {Py_tp_richcompare, channelid_richcompare},
+    {Py_tp_getset, channelid_getsets},
+    // number slots
+    {Py_nb_int, (unaryfunc)channelid_int},
+    {Py_nb_index,  (unaryfunc)channelid_int},
+    {0, NULL},
+};
+
+static PyType_Spec ChannelIDType_spec = {
+    .name = "_xxsubinterpreters.ChannelID",
+    .basicsize = sizeof(channelid),
+    .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
+              Py_TPFLAGS_DISALLOW_INSTANTIATION | Py_TPFLAGS_IMMUTABLETYPE),
+    .slots = ChannelIDType_slots,
+};
+
+
+/* module level code ********************************************************/
+
+/* globals is the process-global state for the module.  It holds all
+   the data that we need to share between interpreters, so it cannot
+   hold PyObject values. */
+static struct globals {
+    int module_count;
+    _channels channels;
+} _globals = {0};
+
+static int
+_globals_init(void)
+{
+    // XXX This isn't thread-safe.
+    _globals.module_count++;
+    if (_globals.module_count > 1) {
+        // Already initialized.
+        return 0;
+    }
+
+    assert(_globals.channels.mutex == NULL);
+    PyThread_type_lock mutex = PyThread_allocate_lock();
+    if (mutex == NULL) {
+        return ERR_CHANNELS_MUTEX_INIT;
+    }
+    _channels_init(&_globals.channels, mutex);
+    return 0;
+}
+
+static void
+_globals_fini(void)
+{
+    // XXX This isn't thread-safe.
+    _globals.module_count--;
+    if (_globals.module_count > 0) {
+        return;
+    }
+
+    _channels_fini(&_globals.channels);
+}
+
+static _channels *
+_global_channels(void) {
+    return &_globals.channels;
+}
+
+
+static PyObject *
+channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
+{
+    int64_t cid = _channel_create(&_globals.channels);
+    if (cid < 0) {
+        (void)handle_channel_error(-1, self, cid);
+        return NULL;
+    }
+    module_state *state = get_module_state(self);
+    if (state == NULL) {
+        return NULL;
+    }
+    PyObject *id = NULL;
+    int err = newchannelid(state->ChannelIDType, cid, 0,
+                           &_globals.channels, 0, 0,
+                           (channelid **)&id);
+    if (handle_channel_error(err, self, cid)) {
+        assert(id == NULL);
+        err = _channel_destroy(&_globals.channels, cid);
+        if (handle_channel_error(err, self, cid)) {
+            // XXX issue a warning?
+        }
+        return NULL;
+    }
+    assert(id != NULL);
+    assert(((channelid *)id)->channels != NULL);
+    return id;
+}
+
+PyDoc_STRVAR(channel_create_doc,
+"channel_create() -> cid\n\
+\n\
+Create a new cross-interpreter channel and return a unique generated ID.");
+
+static PyObject *
+channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"cid", NULL};
+    int64_t cid;
+    struct channel_id_converter_data cid_data = {
+        .module = self,
+    };
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
+                                     channel_id_converter, &cid_data)) {
+        return NULL;
+    }
+    cid = cid_data.cid;
+
+    int err = _channel_destroy(&_globals.channels, cid);
+    if (handle_channel_error(err, self, cid)) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(channel_destroy_doc,
+"channel_destroy(cid)\n\
+\n\
+Close and finalize the channel.  Afterward attempts to use the channel\n\
+will behave as though it never existed.");
+
+static PyObject *
+channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
+{
+    int64_t count = 0;
+    int64_t *cids = _channels_list_all(&_globals.channels, &count);
+    if (cids == NULL) {
+        if (count == 0) {
+            return PyList_New(0);
+        }
+        return NULL;
+    }
+    PyObject *ids = PyList_New((Py_ssize_t)count);
+    if (ids == NULL) {
+        goto finally;
+    }
+    module_state *state = get_module_state(self);
+    if (state == NULL) {
+        Py_DECREF(ids);
+        ids = NULL;
+        goto finally;
+    }
+    int64_t *cur = cids;
+    for (int64_t i=0; i < count; cur++, i++) {
+        PyObject *id = NULL;
+        int err = newchannelid(state->ChannelIDType, *cur, 0,
+                               &_globals.channels, 0, 0,
+                               (channelid **)&id);
+        if (handle_channel_error(err, self, *cur)) {
+            assert(id == NULL);
+            Py_SETREF(ids, NULL);
+            break;
+        }
+        assert(id != NULL);
+        PyList_SET_ITEM(ids, (Py_ssize_t)i, id);
+    }
+
+finally:
+    PyMem_Free(cids);
+    return ids;
+}
+
+PyDoc_STRVAR(channel_list_all_doc,
+"channel_list_all() -> [cid]\n\
+\n\
+Return the list of all IDs for active channels.");
+
+static PyObject *
+channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"cid", "send", NULL};
+    int64_t cid;            /* Channel ID */
+    struct channel_id_converter_data cid_data = {
+        .module = self,
+    };
+    int send = 0;           /* Send or receive end? */
+    int64_t id;
+    PyObject *ids, *id_obj;
+    PyInterpreterState *interp;
+
+    if (!PyArg_ParseTupleAndKeywords(
+            args, kwds, "O&$p:channel_list_interpreters",
+            kwlist, channel_id_converter, &cid_data, &send)) {
+        return NULL;
+    }
+    cid = cid_data.cid;
+
+    ids = PyList_New(0);
+    if (ids == NULL) {
+        goto except;
+    }
+
+    interp = PyInterpreterState_Head();
+    while (interp != NULL) {
+        id = PyInterpreterState_GetID(interp);
+        assert(id >= 0);
+        int res = _channel_is_associated(&_globals.channels, cid, id, send);
+        if (res < 0) {
+            (void)handle_channel_error(res, self, cid);
+            goto except;
+        }
+        if (res) {
+            id_obj = _PyInterpreterState_GetIDObject(interp);
+            if (id_obj == NULL) {
+                goto except;
+            }
+            res = PyList_Insert(ids, 0, id_obj);
+            Py_DECREF(id_obj);
+            if (res < 0) {
+                goto except;
+            }
+        }
+        interp = PyInterpreterState_Next(interp);
+    }
+
+    goto finally;
+
+except:
+    Py_CLEAR(ids);
+
+finally:
+    return ids;
+}
+
+PyDoc_STRVAR(channel_list_interpreters_doc,
+"channel_list_interpreters(cid, *, send) -> [id]\n\
+\n\
+Return the list of all interpreter IDs associated with an end of the channel.\n\
+\n\
+The 'send' argument should be a boolean indicating whether to use the send or\n\
+receive end.");
+
+
+static PyObject *
+channel_send(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"cid", "obj", NULL};
+    int64_t cid;
+    struct channel_id_converter_data cid_data = {
+        .module = self,
+    };
+    PyObject *obj;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
+                                     channel_id_converter, &cid_data, &obj)) {
+        return NULL;
+    }
+    cid = cid_data.cid;
+
+    int err = _channel_send(&_globals.channels, cid, obj);
+    if (handle_channel_error(err, self, cid)) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(channel_send_doc,
+"channel_send(cid, obj)\n\
+\n\
+Add the object's data to the channel's queue.");
+
+static PyObject *
+channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"cid", "default", NULL};
+    int64_t cid;
+    struct channel_id_converter_data cid_data = {
+        .module = self,
+    };
+    PyObject *dflt = NULL;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist,
+                                     channel_id_converter, &cid_data, &dflt)) {
+        return NULL;
+    }
+    cid = cid_data.cid;
+
+    PyObject *obj = NULL;
+    int err = _channel_recv(&_globals.channels, cid, &obj);
+    if (handle_channel_error(err, self, cid)) {
+        return NULL;
+    }
+    Py_XINCREF(dflt);
+    if (obj == NULL) {
+        // Use the default.
+        if (dflt == NULL) {
+            (void)handle_channel_error(ERR_CHANNEL_EMPTY, self, cid);
+            return NULL;
+        }
+        obj = Py_NewRef(dflt);
+    }
+    Py_XDECREF(dflt);
+    return obj;
+}
+
+PyDoc_STRVAR(channel_recv_doc,
+"channel_recv(cid, [default]) -> obj\n\
+\n\
+Return a new object from the data at the front of the channel's queue.\n\
+\n\
+If there is nothing to receive then raise ChannelEmptyError, unless\n\
+a default value is provided.  In that case return it.");
+
+static PyObject *
+channel_close(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
+    int64_t cid;
+    struct channel_id_converter_data cid_data = {
+        .module = self,
+    };
+    int send = 0;
+    int recv = 0;
+    int force = 0;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     "O&|$ppp:channel_close", kwlist,
+                                     channel_id_converter, &cid_data,
+                                     &send, &recv, &force)) {
+        return NULL;
+    }
+    cid = cid_data.cid;
+
+    int err = _channel_close(&_globals.channels, cid, send-recv, force);
+    if (handle_channel_error(err, self, cid)) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(channel_close_doc,
+"channel_close(cid, *, send=None, recv=None, force=False)\n\
+\n\
+Close the channel for all interpreters.\n\
+\n\
+If the channel is empty then the keyword args are ignored and both\n\
+ends are immediately closed.  Otherwise, if 'force' is True then\n\
+all queued items are released and both ends are immediately\n\
+closed.\n\
+\n\
+If the channel is not empty *and* 'force' is False then following\n\
+happens:\n\
+\n\
+ * recv is True (regardless of send):\n\
+   - raise ChannelNotEmptyError\n\
+ * recv is None and send is None:\n\
+   - raise ChannelNotEmptyError\n\
+ * send is True and recv is not True:\n\
+   - fully close the 'send' end\n\
+   - close the 'recv' end to interpreters not already receiving\n\
+   - fully close it once empty\n\
+\n\
+Closing an already closed channel results in a ChannelClosedError.\n\
+\n\
+Once the channel's ID has no more ref counts in any interpreter\n\
+the channel will be destroyed.");
+
+static PyObject *
+channel_release(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    // Note that only the current interpreter is affected.
+    static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
+    int64_t cid;
+    struct channel_id_converter_data cid_data = {
+        .module = self,
+    };
+    int send = 0;
+    int recv = 0;
+    int force = 0;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     "O&|$ppp:channel_release", kwlist,
+                                     channel_id_converter, &cid_data,
+                                     &send, &recv, &force)) {
+        return NULL;
+    }
+    cid = cid_data.cid;
+    if (send == 0 && recv == 0) {
+        send = 1;
+        recv = 1;
+    }
+
+    // XXX Handle force is True.
+    // XXX Fix implicit release.
+
+    int err = _channel_drop(&_globals.channels, cid, send, recv);
+    if (handle_channel_error(err, self, cid)) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(channel_release_doc,
+"channel_release(cid, *, send=None, recv=None, force=True)\n\
+\n\
+Close the channel for the current interpreter.  'send' and 'recv'\n\
+(bool) may be used to indicate the ends to close.  By default both\n\
+ends are closed.  Closing an already closed end is a noop.");
+
+static PyObject *
+channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    module_state *state = get_module_state(self);
+    if (state == NULL) {
+        return NULL;
+    }
+    PyTypeObject *cls = state->ChannelIDType;
+    PyObject *mod = get_module_from_owned_type(cls);
+    if (mod == NULL) {
+        return NULL;
+    }
+    PyObject *cid = _channelid_new(mod, cls, args, kwds);
+    Py_DECREF(mod);
+    return cid;
+}
+
+static PyMethodDef module_functions[] = {
+    {"create",                    channel_create,
+     METH_NOARGS, channel_create_doc},
+    {"destroy",                   _PyCFunction_CAST(channel_destroy),
+     METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
+    {"list_all",                  channel_list_all,
+     METH_NOARGS, channel_list_all_doc},
+    {"list_interpreters",         _PyCFunction_CAST(channel_list_interpreters),
+     METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
+    {"send",                      _PyCFunction_CAST(channel_send),
+     METH_VARARGS | METH_KEYWORDS, channel_send_doc},
+    {"recv",                      _PyCFunction_CAST(channel_recv),
+     METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
+    {"close",                     _PyCFunction_CAST(channel_close),
+     METH_VARARGS | METH_KEYWORDS, channel_close_doc},
+    {"release",                   _PyCFunction_CAST(channel_release),
+     METH_VARARGS | METH_KEYWORDS, channel_release_doc},
+    {"_channel_id",               _PyCFunction_CAST(channel__channel_id),
+     METH_VARARGS | METH_KEYWORDS, NULL},
+
+    {NULL,                        NULL}           /* sentinel */
+};
+
+
+/* initialization function */
+
+PyDoc_STRVAR(module_doc,
+"This module provides primitive operations to manage Python interpreters.\n\
+The 'interpreters' module provides a more convenient interface.");
+
+static int
+module_exec(PyObject *mod)
+{
+    if (_globals_init() != 0) {
+        return -1;
+    }
+
+    /* Add exception types */
+    if (exceptions_init(mod) != 0) {
+        goto error;
+    }
+
+    /* Add other types */
+    module_state *state = get_module_state(mod);
+    if (state == NULL) {
+        goto error;
+    }
+
+    // ChannelID
+    state->ChannelIDType = add_new_type(
+            mod, &ChannelIDType_spec, _channelid_shared);
+    if (state->ChannelIDType == NULL) {
+        goto error;
+    }
+
+    return 0;
+
+error:
+    (void)_PyCrossInterpreterData_UnregisterClass(state->ChannelIDType);
+    _globals_fini();
+    return -1;
+}
+
+static struct PyModuleDef_Slot module_slots[] = {
+    {Py_mod_exec, module_exec},
+    {0, NULL},
+};
+
+static int
+module_traverse(PyObject *mod, visitproc visit, void *arg)
+{
+    module_state *state = get_module_state(mod);
+    assert(state != NULL);
+    traverse_module_state(state, visit, arg);
+    return 0;
+}
+
+static int
+module_clear(PyObject *mod)
+{
+    module_state *state = get_module_state(mod);
+    assert(state != NULL);
+    clear_module_state(state);
+    return 0;
+}
+
+static void
+module_free(void *mod)
+{
+    module_state *state = get_module_state(mod);
+    assert(state != NULL);
+    clear_module_state(state);
+    _globals_fini();
+}
+
+static struct PyModuleDef moduledef = {
+    .m_base = PyModuleDef_HEAD_INIT,
+    .m_name = MODULE_NAME,
+    .m_doc = module_doc,
+    .m_size = sizeof(module_state),
+    .m_methods = module_functions,
+    .m_slots = module_slots,
+    .m_traverse = module_traverse,
+    .m_clear = module_clear,
+    .m_free = (freefunc)module_free,
+};
+
+PyMODINIT_FUNC
+PyInit__xxinterpchannels(void)
+{
+    return PyModuleDef_Init(&moduledef);
+}
diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c
index 0892fa3a9595..461c505c092c 100644
--- a/Modules/_xxsubinterpretersmodule.c
+++ b/Modules/_xxsubinterpretersmodule.c
@@ -39,43 +39,6 @@ _get_current_interp(void)
     return PyInterpreterState_Get();
 }
 
-static PyObject *
-_get_current_module(void)
-{
-    // We ensured it was imported in _run_script().
-    PyObject *name = PyUnicode_FromString(MODULE_NAME);
-    if (name == NULL) {
-        return NULL;
-    }
-    PyObject *mod = PyImport_GetModule(name);
-    Py_DECREF(name);
-    if (mod == NULL) {
-        return NULL;
-    }
-    assert(mod != Py_None);
-    return mod;
-}
-
-static PyObject *
-get_module_from_owned_type(PyTypeObject *cls)
-{
-    assert(cls != NULL);
-    return _get_current_module();
-    // XXX Use the more efficient API now that we use heap types:
-    //return PyType_GetModule(cls);
-}
-
-static struct PyModuleDef moduledef;
-
-static PyObject *
-get_module_from_type(PyTypeObject *cls)
-{
-    assert(cls != NULL);
-    return _get_current_module();
-    // XXX Use the more efficient API now that we use heap types:
-    //return PyType_GetModuleByDef(cls, &moduledef);
-}
-
 static PyObject *
 add_new_exception(PyObject *mod, const char *name, PyObject *base)
 {
@@ -95,27 +58,6 @@ add_new_exception(PyObject *mod, const char *name, PyObject *base)
 #define ADD_NEW_EXCEPTION(MOD, NAME, BASE) \
     add_new_exception(MOD, MODULE_NAME "." Py_STRINGIFY(NAME), BASE)
 
-static PyTypeObject *
-add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared)
-{
-    PyTypeObject *cls = (PyTypeObject *)PyType_FromMetaclass(
-                NULL, mod, spec, NULL);
-    if (cls == NULL) {
-        return NULL;
-    }
-    if (PyModule_AddType(mod, cls) < 0) {
-        Py_DECREF(cls);
-        return NULL;
-    }
-    if (shared != NULL) {
-        if (_PyCrossInterpreterData_RegisterClass(cls, shared)) {
-            Py_DECREF(cls);
-            return NULL;
-        }
-    }
-    return cls;
-}
-
 static int
 _release_xid_data(_PyCrossInterpreterData *data, int ignoreexc)
 {
@@ -127,9 +69,7 @@ _release_xid_data(_PyCrossInterpreterData *data, int ignoreexc)
     if (res < 0) {
         // XXX Fix this!
         /* The owning interpreter is already destroyed.
-         * Ideally, this shouldn't ever happen.  When an interpreter is
-         * about to be destroyed, we should clear out all of its objects
-         * from every channel associated with that interpreter.
+         * Ideally, this shouldn't ever happen.  (It's highly unlikely.)
          * For now we hack around that to resolve refleaks, by decref'ing
          * the released object here, even if its the wrong interpreter.
          * The owning interpreter has already been destroyed
@@ -153,17 +93,8 @@ _release_xid_data(_PyCrossInterpreterData *data, int ignoreexc)
 /* module state *************************************************************/
 
 typedef struct {
-    PyTypeObject *ChannelIDType;
-
-    /* interpreter exceptions */
+    /* exceptions */
     PyObject *RunFailedError;
-
-    /* channel exceptions */
-    PyObject *ChannelError;
-    PyObject *ChannelNotFoundError;
-    PyObject *ChannelClosedError;
-    PyObject *ChannelEmptyError;
-    PyObject *ChannelNotEmptyError;
 } module_state;
 
 static inline module_state *
@@ -178,37 +109,18 @@ get_module_state(PyObject *mod)
 static int
 traverse_module_state(module_state *state, visitproc visit, void *arg)
 {
-    /* heap types */
-    Py_VISIT(state->ChannelIDType);
-
-    /* interpreter exceptions */
+    /* exceptions */
     Py_VISIT(state->RunFailedError);
 
-    /* channel exceptions */
-    Py_VISIT(state->ChannelError);
-    Py_VISIT(state->ChannelNotFoundError);
-    Py_VISIT(state->ChannelClosedError);
-    Py_VISIT(state->ChannelEmptyError);
-    Py_VISIT(state->ChannelNotEmptyError);
     return 0;
 }
 
 static int
 clear_module_state(module_state *state)
 {
-    /* heap types */
-    (void)_PyCrossInterpreterData_UnregisterClass(state->ChannelIDType);
-    Py_CLEAR(state->ChannelIDType);
-
-    /* interpreter exceptions */
+    /* exceptions */
     Py_CLEAR(state->RunFailedError);
 
-    /* channel exceptions */
-    Py_CLEAR(state->ChannelError);
-    Py_CLEAR(state->ChannelNotFoundError);
-    Py_CLEAR(state->ChannelClosedError);
-    Py_CLEAR(state->ChannelEmptyError);
-    Py_CLEAR(state->ChannelNotEmptyError);
     return 0;
 }
 
@@ -298,10 +210,8 @@ _sharedns_free(_sharedns *shared)
 }
 
 static _sharedns *
-_get_shared_ns(PyObject *shareable, PyTypeObject *channelidtype,
-               int *needs_import)
+_get_shared_ns(PyObject *shareable)
 {
-    *needs_import = 0;
     if (shareable == NULL || shareable == Py_None) {
         return NULL;
     }
@@ -323,9 +233,6 @@ _get_shared_ns(PyObject *shareable, PyTypeObject *channelidtype,
         if (_sharednsitem_init(&shared->items[i], key, value) != 0) {
             break;
         }
-        if (Py_TYPE(value) == channelidtype) {
-            *needs_import = 1;
-        }
     }
     if (PyErr_Occurred()) {
         _sharedns_free(shared);
@@ -394,1701 +301,79 @@ _sharedexception_bind(PyObject *exctype, PyObject *exc, PyObject *tb)
 
     _sharedexception *err = _sharedexception_new();
     if (err == NULL) {
-        goto finally;
-    }
-
-    PyObject *name = PyUnicode_FromFormat("%S", exctype);
-    if (name == NULL) {
-        failure = "unable to format exception type name";
-        goto finally;
-    }
-    err->name = _copy_raw_string(name);
-    Py_DECREF(name);
-    if (err->name == NULL) {
-        if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
-            failure = "out of memory copying exception type name";
-        } else {
-            failure = "unable to encode and copy exception type name";
-        }
-        goto finally;
-    }
-
-    if (exc != NULL) {
-        PyObject *msg = PyUnicode_FromFormat("%S", exc);
-        if (msg == NULL) {
-            failure = "unable to format exception message";
-            goto finally;
-        }
-        err->msg = _copy_raw_string(msg);
-        Py_DECREF(msg);
-        if (err->msg == NULL) {
-            if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
-                failure = "out of memory copying exception message";
-            } else {
-                failure = "unable to encode and copy exception message";
-            }
-            goto finally;
-        }
-    }
-
-finally:
-    if (failure != NULL) {
-        PyErr_Clear();
-        if (err->name != NULL) {
-            PyMem_Free(err->name);
-            err->name = NULL;
-        }
-        err->msg = failure;
-    }
-    return err;
-}
-
-static void
-_sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass)
-{
-    if (exc->name != NULL) {
-        if (exc->msg != NULL) {
-            PyErr_Format(wrapperclass, "%s: %s",  exc->name, exc->msg);
-        }
-        else {
-            PyErr_SetString(wrapperclass, exc->name);
-        }
-    }
-    else if (exc->msg != NULL) {
-        PyErr_SetString(wrapperclass, exc->msg);
-    }
-    else {
-        PyErr_SetNone(wrapperclass);
-    }
-}
-
-
-/* channel-specific code ****************************************************/
-
-#define CHANNEL_SEND 1
-#define CHANNEL_BOTH 0
-#define CHANNEL_RECV -1
-
-/* channel errors */
-
-#define ERR_CHANNEL_NOT_FOUND -2
-#define ERR_CHANNEL_CLOSED -3
-#define ERR_CHANNEL_INTERP_CLOSED -4
-#define ERR_CHANNEL_EMPTY -5
-#define ERR_CHANNEL_NOT_EMPTY -6
-#define ERR_CHANNEL_MUTEX_INIT -7
-#define ERR_CHANNELS_MUTEX_INIT -8
-#define ERR_NO_NEXT_CHANNEL_ID -9
-
-static int
-channel_exceptions_init(PyObject *mod)
-{
-    module_state *state = get_module_state(mod);
-    if (state == NULL) {
-        return -1;
-    }
-
-#define ADD(NAME, BASE) \
-    do { \
-        assert(state->NAME == NULL); \
-        state->NAME = ADD_NEW_EXCEPTION(mod, NAME, BASE); \
-        if (state->NAME == NULL) { \
-            return -1; \
-        } \
-    } while (0)
-
-    // A channel-related operation failed.
-    ADD(ChannelError, PyExc_RuntimeError);
-    // An operation tried to use a channel that doesn't exist.
-    ADD(ChannelNotFoundError, state->ChannelError);
-    // An operation tried to use a closed channel.
-    ADD(ChannelClosedError, state->ChannelError);
-    // An operation tried to pop from an empty channel.
-    ADD(ChannelEmptyError, state->ChannelError);
-    // An operation tried to close a non-empty channel.
-    ADD(ChannelNotEmptyError, state->ChannelError);
-#undef ADD
-
-    return 0;
-}
-
-static int
-handle_channel_error(int err, PyObject *mod, int64_t cid)
-{
-    if (err == 0) {
-        assert(!PyErr_Occurred());
-        return 0;
-    }
-    assert(err < 0);
-    module_state *state = get_module_state(mod);
-    assert(state != NULL);
-    if (err == ERR_CHANNEL_NOT_FOUND) {
-        PyErr_Format(state->ChannelNotFoundError,
-                     "channel %" PRId64 " not found", cid);
-    }
-    else if (err == ERR_CHANNEL_CLOSED) {
-        PyErr_Format(state->ChannelClosedError,
-                     "channel %" PRId64 " is closed", cid);
-    }
-    else if (err == ERR_CHANNEL_INTERP_CLOSED) {
-        PyErr_Format(state->ChannelClosedError,
-                     "channel %" PRId64 " is already closed", cid);
-    }
-    else if (err == ERR_CHANNEL_EMPTY) {
-        PyErr_Format(state->ChannelEmptyError,
-                     "channel %" PRId64 " is empty", cid);
-    }
-    else if (err == ERR_CHANNEL_NOT_EMPTY) {
-        PyErr_Format(state->ChannelNotEmptyError,
-                     "channel %" PRId64 " may not be closed "
-                     "if not empty (try force=True)",
-                     cid);
-    }
-    else if (err == ERR_CHANNEL_MUTEX_INIT) {
-        PyErr_SetString(state->ChannelError,
-                        "can't initialize mutex for new channel");
-    }
-    else if (err == ERR_CHANNELS_MUTEX_INIT) {
-        PyErr_SetString(state->ChannelError,
-                        "can't initialize mutex for channel management");
-    }
-    else if (err == ERR_NO_NEXT_CHANNEL_ID) {
-        PyErr_SetString(state->ChannelError,
-                        "failed to get a channel ID");
-    }
-    else {
-        assert(PyErr_Occurred());
-    }
-    return 1;
-}
-
-/* the channel queue */
-
-struct _channelitem;
-
-typedef struct _channelitem {
-    _PyCrossInterpreterData *data;
-    struct _channelitem *next;
-} _channelitem;
-
-static _channelitem *
-_channelitem_new(void)
-{
-    _channelitem *item = PyMem_NEW(_channelitem, 1);
-    if (item == NULL) {
-        PyErr_NoMemory();
-        return NULL;
-    }
-    item->data = NULL;
-    item->next = NULL;
-    return item;
-}
-
-static void
-_channelitem_clear(_channelitem *item)
-{
-    if (item->data != NULL) {
-        (void)_release_xid_data(item->data, 1);
-        PyMem_Free(item->data);
-        item->data = NULL;
-    }
-    item->next = NULL;
-}
-
-static void
-_channelitem_free(_channelitem *item)
-{
-    _channelitem_clear(item);
-    PyMem_Free(item);
-}
-
-static void
-_channelitem_free_all(_channelitem *item)
-{
-    while (item != NULL) {
-        _channelitem *last = item;
-        item = item->next;
-        _channelitem_free(last);
-    }
-}
-
-static _PyCrossInterpreterData *
-_channelitem_popped(_channelitem *item)
-{
-    _PyCrossInterpreterData *data = item->data;
-    item->data = NULL;
-    _channelitem_free(item);
-    return data;
-}
-
-typedef struct _channelqueue {
-    int64_t count;
-    _channelitem *first;
-    _channelitem *last;
-} _channelqueue;
-
-static _channelqueue *
-_channelqueue_new(void)
-{
-    _channelqueue *queue = PyMem_NEW(_channelqueue, 1);
-    if (queue == NULL) {
-        PyErr_NoMemory();
-        return NULL;
-    }
-    queue->count = 0;
-    queue->first = NULL;
-    queue->last = NULL;
-    return queue;
-}
-
-static void
-_channelqueue_clear(_channelqueue *queue)
-{
-    _channelitem_free_all(queue->first);
-    queue->count = 0;
-    queue->first = NULL;
-    queue->last = NULL;
-}
-
-static void
-_channelqueue_free(_channelqueue *queue)
-{
-    _channelqueue_clear(queue);
-    PyMem_Free(queue);
-}
-
-static int
-_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
-{
-    _channelitem *item = _channelitem_new();
-    if (item == NULL) {
-        return -1;
-    }
-    item->data = data;
-
-    queue->count += 1;
-    if (queue->first == NULL) {
-        queue->first = item;
-    }
-    else {
-        queue->last->next = item;
-    }
-    queue->last = item;
-    return 0;
-}
-
-static _PyCrossInterpreterData *
-_channelqueue_get(_channelqueue *queue)
-{
-    _channelitem *item = queue->first;
-    if (item == NULL) {
-        return NULL;
-    }
-    queue->first = item->next;
-    if (queue->last == item) {
-        queue->last = NULL;
-    }
-    queue->count -= 1;
-
-    return _channelitem_popped(item);
-}
-
-/* channel-interpreter associations */
-
-struct _channelend;
-
-typedef struct _channelend {
-    struct _channelend *next;
-    int64_t interp;
-    int open;
-} _channelend;
-
-static _channelend *
-_channelend_new(int64_t interp)
-{
-    _channelend *end = PyMem_NEW(_channelend, 1);
-    if (end == NULL) {
-        PyErr_NoMemory();
-        return NULL;
-    }
-    end->next = NULL;
-    end->interp = interp;
-    end->open = 1;
-    return end;
-}
-
-static void
-_channelend_free(_channelend *end)
-{
-    PyMem_Free(end);
-}
-
-static void
-_channelend_free_all(_channelend *end)
-{
-    while (end != NULL) {
-        _channelend *last = end;
-        end = end->next;
-        _channelend_free(last);
-    }
-}
-
-static _channelend *
-_channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
-{
-    _channelend *prev = NULL;
-    _channelend *end = first;
-    while (end != NULL) {
-        if (end->interp == interp) {
-            break;
-        }
-        prev = end;
-        end = end->next;
-    }
-    if (pprev != NULL) {
-        *pprev = prev;
-    }
-    return end;
-}
-
-typedef struct _channelassociations {
-    // Note that the list entries are never removed for interpreter
-    // for which the channel is closed.  This should not be a problem in
-    // practice.  Also, a channel isn't automatically closed when an
-    // interpreter is destroyed.
-    int64_t numsendopen;
-    int64_t numrecvopen;
-    _channelend *send;
-    _channelend *recv;
-} _channelends;
-
-static _channelends *
-_channelends_new(void)
-{
-    _channelends *ends = PyMem_NEW(_channelends, 1);
-    if (ends== NULL) {
-        return NULL;
-    }
-    ends->numsendopen = 0;
-    ends->numrecvopen = 0;
-    ends->send = NULL;
-    ends->recv = NULL;
-    return ends;
-}
-
-static void
-_channelends_clear(_channelends *ends)
-{
-    _channelend_free_all(ends->send);
-    ends->send = NULL;
-    ends->numsendopen = 0;
-
-    _channelend_free_all(ends->recv);
-    ends->recv = NULL;
-    ends->numrecvopen = 0;
-}
-
-static void
-_channelends_free(_channelends *ends)
-{
-    _channelends_clear(ends);
-    PyMem_Free(ends);
-}
-
-static _channelend *
-_channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
-                 int send)
-{
-    _channelend *end = _channelend_new(interp);
-    if (end == NULL) {
-        return NULL;
-    }
-
-    if (prev == NULL) {
-        if (send) {
-            ends->send = end;
-        }
-        else {
-            ends->recv = end;
-        }
-    }
-    else {
-        prev->next = end;
-    }
-    if (send) {
-        ends->numsendopen += 1;
-    }
-    else {
-        ends->numrecvopen += 1;
-    }
-    return end;
-}
-
-static int
-_channelends_associate(_channelends *ends, int64_t interp, int send)
-{
-    _channelend *prev;
-    _channelend *end = _channelend_find(send ? ends->send : ends->recv,
-                                        interp, &prev);
-    if (end != NULL) {
-        if (!end->open) {
-            return ERR_CHANNEL_CLOSED;
-        }
-        // already associated
-        return 0;
-    }
-    if (_channelends_add(ends, prev, interp, send) == NULL) {
-        return -1;
-    }
-    return 0;
-}
-
-static int
-_channelends_is_open(_channelends *ends)
-{
-    if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
-        return 1;
-    }
-    if (ends->send == NULL && ends->recv == NULL) {
-        return 1;
-    }
-    return 0;
-}
-
-static void
-_channelends_close_end(_channelends *ends, _channelend *end, int send)
-{
-    end->open = 0;
-    if (send) {
-        ends->numsendopen -= 1;
-    }
-    else {
-        ends->numrecvopen -= 1;
-    }
-}
-
-static int
-_channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
-{
-    _channelend *prev;
-    _channelend *end;
-    if (which >= 0) {  // send/both
-        end = _channelend_find(ends->send, interp, &prev);
-        if (end == NULL) {
-            // never associated so add it
-            end = _channelends_add(ends, prev, interp, 1);
-            if (end == NULL) {
-                return -1;
-            }
-        }
-        _channelends_close_end(ends, end, 1);
-    }
-    if (which <= 0) {  // recv/both
-        end = _channelend_find(ends->recv, interp, &prev);
-        if (end == NULL) {
-            // never associated so add it
-            end = _channelends_add(ends, prev, interp, 0);
-            if (end == NULL) {
-                return -1;
-            }
-        }
-        _channelends_close_end(ends, end, 0);
-    }
-    return 0;
-}
-
-static void
-_channelends_close_all(_channelends *ends, int which, int force)
-{
-    // XXX Handle the ends.
-    // XXX Handle force is True.
-
-    // Ensure all the "send"-associated interpreters are closed.
-    _channelend *end;
-    for (end = ends->send; end != NULL; end = end->next) {
-        _channelends_close_end(ends, end, 1);
-    }
-
-    // Ensure all the "recv"-associated interpreters are closed.
-    for (end = ends->recv; end != NULL; end = end->next) {
-        _channelends_close_end(ends, end, 0);
-    }
-}
-
-/* channels */
-
-struct _channel;
-struct _channel_closing;
-static void _channel_clear_closing(struct _channel *);
-static void _channel_finish_closing(struct _channel *);
-
-typedef struct _channel {
-    PyThread_type_lock mutex;
-    _channelqueue *queue;
-    _channelends *ends;
-    int open;
-    struct _channel_closing *closing;
-} _PyChannelState;
-
-static _PyChannelState *
-_channel_new(PyThread_type_lock mutex)
-{
-    _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
-    if (chan == NULL) {
-        return NULL;
-    }
-    chan->mutex = mutex;
-    chan->queue = _channelqueue_new();
-    if (chan->queue == NULL) {
-        PyMem_Free(chan);
-        return NULL;
-    }
-    chan->ends = _channelends_new();
-    if (chan->ends == NULL) {
-        _channelqueue_free(chan->queue);
-        PyMem_Free(chan);
-        return NULL;
-    }
-    chan->open = 1;
-    chan->closing = NULL;
-    return chan;
-}
-
-static void
-_channel_free(_PyChannelState *chan)
-{
-    _channel_clear_closing(chan);
-    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
-    _channelqueue_free(chan->queue);
-    _channelends_free(chan->ends);
-    PyThread_release_lock(chan->mutex);
-
-    PyThread_free_lock(chan->mutex);
-    PyMem_Free(chan);
-}
-
-static int
-_channel_add(_PyChannelState *chan, int64_t interp,
-             _PyCrossInterpreterData *data)
-{
-    int res = -1;
-    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
-
-    if (!chan->open) {
-        res = ERR_CHANNEL_CLOSED;
-        goto done;
-    }
-    if (_channelends_associate(chan->ends, interp, 1) != 0) {
-        res = ERR_CHANNEL_INTERP_CLOSED;
-        goto done;
-    }
-
-    if (_channelqueue_put(chan->queue, data) != 0) {
-        goto done;
-    }
-
-    res = 0;
-done:
-    PyThread_release_lock(chan->mutex);
-    return res;
-}
-
-static int
-_channel_next(_PyChannelState *chan, int64_t interp,
-              _PyCrossInterpreterData **res)
-{
-    int err = 0;
-    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
-
-    if (!chan->open) {
-        err = ERR_CHANNEL_CLOSED;
-        goto done;
-    }
-    if (_channelends_associate(chan->ends, interp, 0) != 0) {
-        err = ERR_CHANNEL_INTERP_CLOSED;
-        goto done;
-    }
-
-    _PyCrossInterpreterData *data = _channelqueue_get(chan->queue);
-    if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
-        chan->open = 0;
-    }
-    *res = data;
-
-done:
-    PyThread_release_lock(chan->mutex);
-    if (chan->queue->count == 0) {
-        _channel_finish_closing(chan);
-    }
-    return err;
-}
-
-static int
-_channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
-{
-    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
-
-    int res = -1;
-    if (!chan->open) {
-        res = ERR_CHANNEL_CLOSED;
-        goto done;
-    }
-
-    if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
-        goto done;
-    }
-    chan->open = _channelends_is_open(chan->ends);
-
-    res = 0;
-done:
-    PyThread_release_lock(chan->mutex);
-    return res;
-}
-
-static int
-_channel_close_all(_PyChannelState *chan, int end, int force)
-{
-    int res = -1;
-    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
-
-    if (!chan->open) {
-        res = ERR_CHANNEL_CLOSED;
-        goto done;
-    }
-
-    if (!force && chan->queue->count > 0) {
-        res = ERR_CHANNEL_NOT_EMPTY;
-        goto done;
-    }
-
-    chan->open = 0;
-
-    // We *could* also just leave these in place, since we've marked
-    // the channel as closed already.
-    _channelends_close_all(chan->ends, end, force);
-
-    res = 0;
-done:
-    PyThread_release_lock(chan->mutex);
-    return res;
-}
-
-/* the set of channels */
-
-struct _channelref;
-
-typedef struct _channelref {
-    int64_t id;
-    _PyChannelState *chan;
-    struct _channelref *next;
-    Py_ssize_t objcount;
-} _channelref;
-
-static _channelref *
-_channelref_new(int64_t id, _PyChannelState *chan)
-{
-    _channelref *ref = PyMem_NEW(_channelref, 1);
-    if (ref == NULL) {
-        return NULL;
-    }
-    ref->id = id;
-    ref->chan = chan;
-    ref->next = NULL;
-    ref->objcount = 0;
-    return ref;
-}
-
-//static void
-//_channelref_clear(_channelref *ref)
-//{
-//    ref->id = -1;
-//    ref->chan = NULL;
-//    ref->next = NULL;
-//    ref->objcount = 0;
-//}
-
-static void
-_channelref_free(_channelref *ref)
-{
-    if (ref->chan != NULL) {
-        _channel_clear_closing(ref->chan);
-    }
-    //_channelref_clear(ref);
-    PyMem_Free(ref);
-}
-
-static _channelref *
-_channelref_find(_channelref *first, int64_t id, _channelref **pprev)
-{
-    _channelref *prev = NULL;
-    _channelref *ref = first;
-    while (ref != NULL) {
-        if (ref->id == id) {
-            break;
-        }
-        prev = ref;
-        ref = ref->next;
-    }
-    if (pprev != NULL) {
-        *pprev = prev;
-    }
-    return ref;
-}
-
-typedef struct _channels {
-    PyThread_type_lock mutex;
-    _channelref *head;
-    int64_t numopen;
-    int64_t next_id;
-} _channels;
-
-static void
-_channels_init(_channels *channels, PyThread_type_lock mutex)
-{
-    channels->mutex = mutex;
-    channels->head = NULL;
-    channels->numopen = 0;
-    channels->next_id = 0;
-}
-
-static void
-_channels_fini(_channels *channels)
-{
-    assert(channels->numopen == 0);
-    assert(channels->head == NULL);
-    if (channels->mutex != NULL) {
-        PyThread_free_lock(channels->mutex);
-        channels->mutex = NULL;
-    }
-}
-
-static int64_t
-_channels_next_id(_channels *channels)  // needs lock
-{
-    int64_t id = channels->next_id;
-    if (id < 0) {
-        /* overflow */
-        return -1;
-    }
-    channels->next_id += 1;
-    return id;
-}
-
-static int
-_channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex,
-                 _PyChannelState **res)
-{
-    int err = -1;
-    _PyChannelState *chan = NULL;
-    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
-    if (pmutex != NULL) {
-        *pmutex = NULL;
-    }
-
-    _channelref *ref = _channelref_find(channels->head, id, NULL);
-    if (ref == NULL) {
-        err = ERR_CHANNEL_NOT_FOUND;
-        goto done;
-    }
-    if (ref->chan == NULL || !ref->chan->open) {
-        err = ERR_CHANNEL_CLOSED;
-        goto done;
-    }
-
-    if (pmutex != NULL) {
-        // The mutex will be closed by the caller.
-        *pmutex = channels->mutex;
-    }
-
-    chan = ref->chan;
-    err = 0;
-
-done:
-    if (pmutex == NULL || *pmutex == NULL) {
-        PyThread_release_lock(channels->mutex);
-    }
-    *res = chan;
-    return err;
-}
-
-static int64_t
-_channels_add(_channels *channels, _PyChannelState *chan)
-{
-    int64_t cid = -1;
-    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
-
-    // Create a new ref.
-    int64_t id = _channels_next_id(channels);
-    if (id < 0) {
-        cid = ERR_NO_NEXT_CHANNEL_ID;
-        goto done;
-    }
-    _channelref *ref = _channelref_new(id, chan);
-    if (ref == NULL) {
-        goto done;
-    }
-
-    // Add it to the list.
-    // We assume that the channel is a new one (not already in the list).
-    ref->next = channels->head;
-    channels->head = ref;
-    channels->numopen += 1;
-
-    cid = id;
-done:
-    PyThread_release_lock(channels->mutex);
-    return cid;
-}
-
-/* forward */
-static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
-
-static int
-_channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
-                int end, int force)
-{
-    int res = -1;
-    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
-    if (pchan != NULL) {
-        *pchan = NULL;
-    }
-
-    _channelref *ref = _channelref_find(channels->head, cid, NULL);
-    if (ref == NULL) {
-        res = ERR_CHANNEL_NOT_FOUND;
-        goto done;
-    }
-
-    if (ref->chan == NULL) {
-        res = ERR_CHANNEL_CLOSED;
-        goto done;
-    }
-    else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
-        res = ERR_CHANNEL_CLOSED;
-        goto done;
-    }
-    else {
-        int err = _channel_close_all(ref->chan, end, force);
-        if (err != 0) {
-            if (end == CHANNEL_SEND && err == ERR_CHANNEL_NOT_EMPTY) {
-                if (ref->chan->closing != NULL) {
-                    res = ERR_CHANNEL_CLOSED;
-                    goto done;
-                }
-                // Mark the channel as closing and return.  The channel
-                // will be cleaned up in _channel_next().
-                PyErr_Clear();
-                int err = _channel_set_closing(ref, channels->mutex);
-                if (err != 0) {
-                    res = err;
-                    goto done;
-                }
-                if (pchan != NULL) {
-                    *pchan = ref->chan;
-                }
-                res = 0;
-            }
-            else {
-                res = err;
-            }
-            goto done;
-        }
-        if (pchan != NULL) {
-            *pchan = ref->chan;
-        }
-        else  {
-            _channel_free(ref->chan);
-        }
-        ref->chan = NULL;
-    }
-
-    res = 0;
-done:
-    PyThread_release_lock(channels->mutex);
-    return res;
-}
-
-static void
-_channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
-                     _PyChannelState **pchan)
-{
-    if (ref == channels->head) {
-        channels->head = ref->next;
-    }
-    else {
-        prev->next = ref->next;
-    }
-    channels->numopen -= 1;
-
-    if (pchan != NULL) {
-        *pchan = ref->chan;
-    }
-    _channelref_free(ref);
-}
-
-static int
-_channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
-{
-    int res = -1;
-    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
-
-    if (pchan != NULL) {
-        *pchan = NULL;
-    }
-
-    _channelref *prev = NULL;
-    _channelref *ref = _channelref_find(channels->head, id, &prev);
-    if (ref == NULL) {
-        res = ERR_CHANNEL_NOT_FOUND;
-        goto done;
-    }
-
-    _channels_remove_ref(channels, ref, prev, pchan);
-
-    res = 0;
-done:
-    PyThread_release_lock(channels->mutex);
-    return res;
-}
-
-static int
-_channels_add_id_object(_channels *channels, int64_t id)
-{
-    int res = -1;
-    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
-
-    _channelref *ref = _channelref_find(channels->head, id, NULL);
-    if (ref == NULL) {
-        res = ERR_CHANNEL_NOT_FOUND;
-        goto done;
-    }
-    ref->objcount += 1;
-
-    res = 0;
-done:
-    PyThread_release_lock(channels->mutex);
-    return res;
-}
-
-static void
-_channels_drop_id_object(_channels *channels, int64_t id)
-{
-    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
-
-    _channelref *prev = NULL;
-    _channelref *ref = _channelref_find(channels->head, id, &prev);
-    if (ref == NULL) {
-        // Already destroyed.
-        goto done;
-    }
-    ref->objcount -= 1;
-
-    // Destroy if no longer used.
-    if (ref->objcount == 0) {
-        _PyChannelState *chan = NULL;
-        _channels_remove_ref(channels, ref, prev, &chan);
-        if (chan != NULL) {
-            _channel_free(chan);
-        }
-    }
-
-done:
-    PyThread_release_lock(channels->mutex);
-}
-
-static int64_t *
-_channels_list_all(_channels *channels, int64_t *count)
-{
-    int64_t *cids = NULL;
-    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
-    int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
-    if (ids == NULL) {
-        goto done;
-    }
-    _channelref *ref = channels->head;
-    for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
-        ids[i] = ref->id;
-    }
-    *count = channels->numopen;
-
-    cids = ids;
-done:
-    PyThread_release_lock(channels->mutex);
-    return cids;
-}
-
-/* support for closing non-empty channels */
-
-struct _channel_closing {
-    struct _channelref *ref;
-};
-
-static int
-_channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
-    struct _channel *chan = ref->chan;
-    if (chan == NULL) {
-        // already closed
-        return 0;
-    }
-    int res = -1;
-    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
-    if (chan->closing != NULL) {
-        res = ERR_CHANNEL_CLOSED;
-        goto done;
-    }
-    chan->closing = PyMem_NEW(struct _channel_closing, 1);
-    if (chan->closing == NULL) {
-        goto done;
-    }
-    chan->closing->ref = ref;
-
-    res = 0;
-done:
-    PyThread_release_lock(chan->mutex);
-    return res;
-}
-
-static void
-_channel_clear_closing(struct _channel *chan) {
-    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
-    if (chan->closing != NULL) {
-        PyMem_Free(chan->closing);
-        chan->closing = NULL;
-    }
-    PyThread_release_lock(chan->mutex);
-}
-
-static void
-_channel_finish_closing(struct _channel *chan) {
-    struct _channel_closing *closing = chan->closing;
-    if (closing == NULL) {
-        return;
-    }
-    _channelref *ref = closing->ref;
-    _channel_clear_closing(chan);
-    // Do the things that would have been done in _channels_close().
-    ref->chan = NULL;
-    _channel_free(chan);
-}
-
-/* "high"-level channel-related functions */
-
-static int64_t
-_channel_create(_channels *channels)
-{
-    PyThread_type_lock mutex = PyThread_allocate_lock();
-    if (mutex == NULL) {
-        return ERR_CHANNEL_MUTEX_INIT;
-    }
-    _PyChannelState *chan = _channel_new(mutex);
-    if (chan == NULL) {
-        PyThread_free_lock(mutex);
-        return -1;
-    }
-    int64_t id = _channels_add(channels, chan);
-    if (id < 0) {
-        _channel_free(chan);
-    }
-    return id;
-}
-
-static int
-_channel_destroy(_channels *channels, int64_t id)
-{
-    _PyChannelState *chan = NULL;
-    int err = _channels_remove(channels, id, &chan);
-    if (err != 0) {
-        return err;
-    }
-    if (chan != NULL) {
-        _channel_free(chan);
-    }
-    return 0;
-}
-
-static int
-_channel_send(_channels *channels, int64_t id, PyObject *obj)
-{
-    PyInterpreterState *interp = _get_current_interp();
-    if (interp == NULL) {
-        return -1;
-    }
-
-    // Look up the channel.
-    PyThread_type_lock mutex = NULL;
-    _PyChannelState *chan = NULL;
-    int err = _channels_lookup(channels, id, &mutex, &chan);
-    if (err != 0) {
-        return err;
-    }
-    assert(chan != NULL);
-    // Past this point we are responsible for releasing the mutex.
-
-    if (chan->closing != NULL) {
-        PyThread_release_lock(mutex);
-        return ERR_CHANNEL_CLOSED;
-    }
-
-    // Convert the object to cross-interpreter data.
-    _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
-    if (data == NULL) {
-        PyThread_release_lock(mutex);
-        return -1;
-    }
-    if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
-        PyThread_release_lock(mutex);
-        PyMem_Free(data);
-        return -1;
-    }
-
-    // Add the data to the channel.
-    int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
-    PyThread_release_lock(mutex);
-    if (res != 0) {
-        // We may chain an exception here:
-        (void)_release_xid_data(data, 0);
-        PyMem_Free(data);
-        return res;
-    }
-
-    return 0;
-}
-
-static int
-_channel_recv(_channels *channels, int64_t id, PyObject **res)
-{
-    int err;
-    *res = NULL;
-
-    PyInterpreterState *interp = _get_current_interp();
-    if (interp == NULL) {
-        // XXX Is this always an error?
-        if (PyErr_Occurred()) {
-            return -1;
-        }
-        return 0;
-    }
-
-    // Look up the channel.
-    PyThread_type_lock mutex = NULL;
-    _PyChannelState *chan = NULL;
-    err = _channels_lookup(channels, id, &mutex, &chan);
-    if (err != 0) {
-        return err;
-    }
-    assert(chan != NULL);
-    // Past this point we are responsible for releasing the mutex.
-
-    // Pop off the next item from the channel.
-    _PyCrossInterpreterData *data = NULL;
-    err = _channel_next(chan, PyInterpreterState_GetID(interp), &data);
-    PyThread_release_lock(mutex);
-    if (err != 0) {
-        return err;
-    }
-    else if (data == NULL) {
-        assert(!PyErr_Occurred());
-        return 0;
-    }
-
-    // Convert the data back to an object.
-    PyObject *obj = _PyCrossInterpreterData_NewObject(data);
-    if (obj == NULL) {
-        assert(PyErr_Occurred());
-        (void)_release_xid_data(data, 1);
-        PyMem_Free(data);
-        return -1;
-    }
-    int release_res = _release_xid_data(data, 0);
-    PyMem_Free(data);
-    if (release_res < 0) {
-        // The source interpreter has been destroyed already.
-        assert(PyErr_Occurred());
-        Py_DECREF(obj);
-        return -1;
-    }
-
-    *res = obj;
-    return 0;
-}
-
-static int
-_channel_drop(_channels *channels, int64_t id, int send, int recv)
-{
-    PyInterpreterState *interp = _get_current_interp();
-    if (interp == NULL) {
-        return -1;
-    }
-
-    // Look up the channel.
-    PyThread_type_lock mutex = NULL;
-    _PyChannelState *chan = NULL;
-    int err = _channels_lookup(channels, id, &mutex, &chan);
-    if (err != 0) {
-        return err;
-    }
-    // Past this point we are responsible for releasing the mutex.
-
-    // Close one or both of the two ends.
-    int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
-    PyThread_release_lock(mutex);
-    return res;
-}
-
-static int
-_channel_close(_channels *channels, int64_t id, int end, int force)
-{
-    return _channels_close(channels, id, NULL, end, force);
-}
-
-static int
-_channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
-                       int send)
-{
-    _PyChannelState *chan = NULL;
-    int err = _channels_lookup(channels, cid, NULL, &chan);
-    if (err != 0) {
-        return err;
-    }
-    else if (send && chan->closing != NULL) {
-        return ERR_CHANNEL_CLOSED;
-    }
-
-    _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
-                                        interp, NULL);
-
-    return (end != NULL && end->open);
-}
-
-/* ChannelID class */
-
-typedef struct channelid {
-    PyObject_HEAD
-    int64_t id;
-    int end;
-    int resolve;
-    _channels *channels;
-} channelid;
-
-struct channel_id_converter_data {
-    PyObject *module;
-    int64_t cid;
-};
-
-static int
-channel_id_converter(PyObject *arg, void *ptr)
-{
-    int64_t cid;
-    struct channel_id_converter_data *data = ptr;
-    module_state *state = get_module_state(data->module);
-    assert(state != NULL);
-    if (PyObject_TypeCheck(arg, state->ChannelIDType)) {
-        cid = ((channelid *)arg)->id;
-    }
-    else if (PyIndex_Check(arg)) {
-        cid = PyLong_AsLongLong(arg);
-        if (cid == -1 && PyErr_Occurred()) {
-            return 0;
-        }
-        if (cid < 0) {
-            PyErr_Format(PyExc_ValueError,
-                        "channel ID must be a non-negative int, got %R", arg);
-            return 0;
-        }
-    }
-    else {
-        PyErr_Format(PyExc_TypeError,
-                     "channel ID must be an int, got %.100s",
-                     Py_TYPE(arg)->tp_name);
-        return 0;
-    }
-    data->cid = cid;
-    return 1;
-}
-
-static int
-newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
-             int force, int resolve, channelid **res)
-{
-    *res = NULL;
-
-    channelid *self = PyObject_New(channelid, cls);
-    if (self == NULL) {
-        return -1;
-    }
-    self->id = cid;
-    self->end = end;
-    self->resolve = resolve;
-    self->channels = channels;
-
-    int err = _channels_add_id_object(channels, cid);
-    if (err != 0) {
-        if (force && err == ERR_CHANNEL_NOT_FOUND) {
-            assert(!PyErr_Occurred());
-        }
-        else {
-            Py_DECREF((PyObject *)self);
-            return err;
-        }
-    }
-
-    *res = self;
-    return 0;
-}
-
-static _channels * _global_channels(void);
-
-static PyObject *
-_channelid_new(PyObject *mod, PyTypeObject *cls,
-               PyObject *args, PyObject *kwds)
-{
-    static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
-    int64_t cid;
-    struct channel_id_converter_data cid_data = {
-        .module = mod,
-    };
-    int send = -1;
-    int recv = -1;
-    int force = 0;
-    int resolve = 0;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds,
-                                     "O&|$pppp:ChannelID.__new__", kwlist,
-                                     channel_id_converter, &cid_data,
-                                     &send, &recv, &force, &resolve)) {
-        return NULL;
-    }
-    cid = cid_data.cid;
-
-    // Handle "send" and "recv".
-    if (send == 0 && recv == 0) {
-        PyErr_SetString(PyExc_ValueError,
-                        "'send' and 'recv' cannot both be False");
-        return NULL;
-    }
-
-    int end = 0;
-    if (send == 1) {
-        if (recv == 0 || recv == -1) {
-            end = CHANNEL_SEND;
-        }
-    }
-    else if (recv == 1) {
-        end = CHANNEL_RECV;
-    }
-
-    PyObject *id = NULL;
-    int err = newchannelid(cls, cid, end, _global_channels(),
-                           force, resolve,
-                           (channelid **)&id);
-    if (handle_channel_error(err, mod, cid)) {
-        assert(id == NULL);
-        return NULL;
-    }
-    assert(id != NULL);
-    return id;
-}
-
-static void
-channelid_dealloc(PyObject *self)
-{
-    int64_t cid = ((channelid *)self)->id;
-    _channels *channels = ((channelid *)self)->channels;
-
-    PyTypeObject *tp = Py_TYPE(self);
-    tp->tp_free(self);
-    /* "Instances of heap-allocated types hold a reference to their type."
-     * See: https://docs.python.org/3.11/howto/isolating-extensions.html#garbage-collection-protocol
-     * See: https://docs.python.org/3.11/c-api/typeobj.html#c.PyTypeObject.tp_traverse
-    */
-    // XXX Why don't we implement Py_TPFLAGS_HAVE_GC, e.g. Py_tp_traverse,
-    // like we do for _abc._abc_data?
-    Py_DECREF(tp);
-
-    _channels_drop_id_object(channels, cid);
-}
-
-static PyObject *
-channelid_repr(PyObject *self)
-{
-    PyTypeObject *type = Py_TYPE(self);
-    const char *name = _PyType_Name(type);
-
-    channelid *cid = (channelid *)self;
-    const char *fmt;
-    if (cid->end == CHANNEL_SEND) {
-        fmt = "%s(%" PRId64 ", send=True)";
-    }
-    else if (cid->end == CHANNEL_RECV) {
-        fmt = "%s(%" PRId64 ", recv=True)";
-    }
-    else {
-        fmt = "%s(%" PRId64 ")";
-    }
-    return PyUnicode_FromFormat(fmt, name, cid->id);
-}
-
-static PyObject *
-channelid_str(PyObject *self)
-{
-    channelid *cid = (channelid *)self;
-    return PyUnicode_FromFormat("%" PRId64 "", cid->id);
-}
-
-static PyObject *
-channelid_int(PyObject *self)
-{
-    channelid *cid = (channelid *)self;
-    return PyLong_FromLongLong(cid->id);
-}
-
-static Py_hash_t
-channelid_hash(PyObject *self)
-{
-    channelid *cid = (channelid *)self;
-    PyObject *id = PyLong_FromLongLong(cid->id);
-    if (id == NULL) {
-        return -1;
-    }
-    Py_hash_t hash = PyObject_Hash(id);
-    Py_DECREF(id);
-    return hash;
-}
-
-static PyObject *
-channelid_richcompare(PyObject *self, PyObject *other, int op)
-{
-    PyObject *res = NULL;
-    if (op != Py_EQ && op != Py_NE) {
-        Py_RETURN_NOTIMPLEMENTED;
-    }
-
-    PyObject *mod = get_module_from_type(Py_TYPE(self));
-    if (mod == NULL) {
-        return NULL;
-    }
-    module_state *state = get_module_state(mod);
-    if (state == NULL) {
-        goto done;
-    }
-
-    if (!PyObject_TypeCheck(self, state->ChannelIDType)) {
-        res = Py_NewRef(Py_NotImplemented);
-        goto done;
-    }
-
-    channelid *cid = (channelid *)self;
-    int equal;
-    if (PyObject_TypeCheck(other, state->ChannelIDType)) {
-        channelid *othercid = (channelid *)other;
-        equal = (cid->end == othercid->end) && (cid->id == othercid->id);
-    }
-    else if (PyLong_Check(other)) {
-        /* Fast path */
-        int overflow;
-        long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
-        if (othercid == -1 && PyErr_Occurred()) {
-            goto done;
-        }
-        equal = !overflow && (othercid >= 0) && (cid->id == othercid);
-    }
-    else if (PyNumber_Check(other)) {
-        PyObject *pyid = PyLong_FromLongLong(cid->id);
-        if (pyid == NULL) {
-            goto done;
-        }
-        res = PyObject_RichCompare(pyid, other, op);
-        Py_DECREF(pyid);
-        goto done;
-    }
-    else {
-        res = Py_NewRef(Py_NotImplemented);
-        goto done;
-    }
-
-    if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
-        res = Py_NewRef(Py_True);
-    }
-    else {
-        res = Py_NewRef(Py_False);
-    }
-
-done:
-    Py_DECREF(mod);
-    return res;
-}
-
-static PyObject *
-_channel_from_cid(PyObject *cid, int end)
-{
-    PyObject *highlevel = PyImport_ImportModule("interpreters");
-    if (highlevel == NULL) {
-        PyErr_Clear();
-        highlevel = PyImport_ImportModule("test.support.interpreters");
-        if (highlevel == NULL) {
-            return NULL;
-        }
-    }
-    const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
-                                                  "SendChannel";
-    PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
-    Py_DECREF(highlevel);
-    if (cls == NULL) {
-        return NULL;
-    }
-    PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
-    Py_DECREF(cls);
-    if (chan == NULL) {
-        return NULL;
-    }
-    return chan;
-}
-
-struct _channelid_xid {
-    int64_t id;
-    int end;
-    int resolve;
-};
-
-static PyObject *
-_channelid_from_xid(_PyCrossInterpreterData *data)
-{
-    struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
-
-    PyObject *mod = _get_current_module();
-    if (mod == NULL) {
-        return NULL;
-    }
-    module_state *state = get_module_state(mod);
-    if (state == NULL) {
-        return NULL;
+        goto finally;
     }
 
-    // Note that we do not preserve the "resolve" flag.
-    PyObject *cid = NULL;
-    int err = newchannelid(state->ChannelIDType, xid->id, xid->end,
-                           _global_channels(), 0, 0,
-                           (channelid **)&cid);
-    if (err != 0) {
-        assert(cid == NULL);
-        (void)handle_channel_error(err, mod, xid->id);
-        goto done;
-    }
-    assert(cid != NULL);
-    if (xid->end == 0) {
-        goto done;
+    PyObject *name = PyUnicode_FromFormat("%S", exctype);
+    if (name == NULL) {
+        failure = "unable to format exception type name";
+        goto finally;
     }
-    if (!xid->resolve) {
-        goto done;
+    err->name = _copy_raw_string(name);
+    Py_DECREF(name);
+    if (err->name == NULL) {
+        if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
+            failure = "out of memory copying exception type name";
+        } else {
+            failure = "unable to encode and copy exception type name";
+        }
+        goto finally;
     }
 
-    /* Try returning a high-level channel end but fall back to the ID. */
-    PyObject *chan = _channel_from_cid(cid, xid->end);
-    if (chan == NULL) {
-        PyErr_Clear();
-        goto done;
+    if (exc != NULL) {
+        PyObject *msg = PyUnicode_FromFormat("%S", exc);
+        if (msg == NULL) {
+            failure = "unable to format exception message";
+            goto finally;
+        }
+        err->msg = _copy_raw_string(msg);
+        Py_DECREF(msg);
+        if (err->msg == NULL) {
+            if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
+                failure = "out of memory copying exception message";
+            } else {
+                failure = "unable to encode and copy exception message";
+            }
+            goto finally;
+        }
     }
-    Py_DECREF(cid);
-    cid = chan;
 
-done:
-    Py_DECREF(mod);
-    return cid;
-}
-
-static int
-_channelid_shared(PyThreadState *tstate, PyObject *obj,
-                  _PyCrossInterpreterData *data)
-{
-    if (_PyCrossInterpreterData_InitWithSize(
-            data, tstate->interp, sizeof(struct _channelid_xid), obj,
-            _channelid_from_xid
-            ) < 0)
-    {
-        return -1;
+finally:
+    if (failure != NULL) {
+        PyErr_Clear();
+        if (err->name != NULL) {
+            PyMem_Free(err->name);
+            err->name = NULL;
+        }
+        err->msg = failure;
     }
-    struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
-    xid->id = ((channelid *)obj)->id;
-    xid->end = ((channelid *)obj)->end;
-    xid->resolve = ((channelid *)obj)->resolve;
-    return 0;
+    return err;
 }
 
-static PyObject *
-channelid_end(PyObject *self, void *end)
+static void
+_sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass)
 {
-    int force = 1;
-    channelid *cid = (channelid *)self;
-    if (end != NULL) {
-        PyObject *id = NULL;
-        int err = newchannelid(Py_TYPE(self), cid->id, *(int *)end,
-                               cid->channels, force, cid->resolve,
-                               (channelid **)&id);
-        if (err != 0) {
-            assert(id == NULL);
-            PyObject *mod = get_module_from_type(Py_TYPE(self));
-            if (mod == NULL) {
-                return NULL;
-            }
-            (void)handle_channel_error(err, mod, cid->id);
-            Py_DECREF(mod);
-            return NULL;
+    if (exc->name != NULL) {
+        if (exc->msg != NULL) {
+            PyErr_Format(wrapperclass, "%s: %s",  exc->name, exc->msg);
+        }
+        else {
+            PyErr_SetString(wrapperclass, exc->name);
         }
-        assert(id != NULL);
-        return id;
     }
-
-    if (cid->end == CHANNEL_SEND) {
-        return PyUnicode_InternFromString("send");
+    else if (exc->msg != NULL) {
+        PyErr_SetString(wrapperclass, exc->msg);
     }
-    if (cid->end == CHANNEL_RECV) {
-        return PyUnicode_InternFromString("recv");
+    else {
+        PyErr_SetNone(wrapperclass);
     }
-    return PyUnicode_InternFromString("both");
 }
 
-static int _channelid_end_send = CHANNEL_SEND;
-static int _channelid_end_recv = CHANNEL_RECV;
-
-static PyGetSetDef channelid_getsets[] = {
-    {"end", (getter)channelid_end, NULL,
-     PyDoc_STR("'send', 'recv', or 'both'")},
-    {"send", (getter)channelid_end, NULL,
-     PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
-    {"recv", (getter)channelid_end, NULL,
-     PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
-    {NULL}
-};
-
-PyDoc_STRVAR(channelid_doc,
-"A channel ID identifies a channel and may be used as an int.");
-
-static PyType_Slot ChannelIDType_slots[] = {
-    {Py_tp_dealloc, (destructor)channelid_dealloc},
-    {Py_tp_doc, (void *)channelid_doc},
-    {Py_tp_repr, (reprfunc)channelid_repr},
-    {Py_tp_str, (reprfunc)channelid_str},
-    {Py_tp_hash, channelid_hash},
-    {Py_tp_richcompare, channelid_richcompare},
-    {Py_tp_getset, channelid_getsets},
-    // number slots
-    {Py_nb_int, (unaryfunc)channelid_int},
-    {Py_nb_index,  (unaryfunc)channelid_int},
-    {0, NULL},
-};
-
-static PyType_Spec ChannelIDType_spec = {
-    .name = "_xxsubinterpreters.ChannelID",
-    .basicsize = sizeof(channelid),
-    .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
-              Py_TPFLAGS_DISALLOW_INSTANTIATION | Py_TPFLAGS_IMMUTABLETYPE),
-    .slots = ChannelIDType_slots,
-};
-
 
 /* interpreter-specific code ************************************************/
 
 static int
-interp_exceptions_init(PyObject *mod)
+exceptions_init(PyObject *mod)
 {
     module_state *state = get_module_state(mod);
     if (state == NULL) {
@@ -2145,24 +430,12 @@ _ensure_not_running(PyInterpreterState *interp)
 
 static int
 _run_script(PyInterpreterState *interp, const char *codestr,
-            _sharedns *shared, int needs_import,
-            _sharedexception **exc)
+            _sharedns *shared, _sharedexception **exc)
 {
     PyObject *exctype = NULL;
     PyObject *excval = NULL;
     PyObject *tb = NULL;
 
-    if (needs_import) {
-        // It might not have been imported yet in the current interpreter.
-        // However, it will (almost) always have been imported already
-        // in the main interpreter.
-        PyObject *mod = PyImport_ImportModule(MODULE_NAME);
-        if (mod == NULL) {
-            goto error;
-        }
-        Py_DECREF(mod);
-    }
-
     PyObject *main_mod = _PyInterpreterState_GetMainModule(interp);
     if (main_mod == NULL) {
         goto error;
@@ -2223,9 +496,7 @@ _run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,
     }
     module_state *state = get_module_state(mod);
 
-    int needs_import = 0;
-    _sharedns *shared = _get_shared_ns(shareables, state->ChannelIDType,
-                                       &needs_import);
+    _sharedns *shared = _get_shared_ns(shareables);
     if (shared == NULL && PyErr_Occurred()) {
         return -1;
     }
@@ -2241,7 +512,7 @@ _run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,
 
     // Run the script.
     _sharedexception *exc = NULL;
-    int result = _run_script(interp, codestr, shared, needs_import, &exc);
+    int result = _run_script(interp, codestr, shared, &exc);
 
     // Switch back.
     if (save_tstate != NULL) {
@@ -2269,50 +540,6 @@ _run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,
 
 /* module level code ********************************************************/
 
-/* globals is the process-global state for the module.  It holds all
-   the data that we need to share between interpreters, so it cannot
-   hold PyObject values. */
-static struct globals {
-    int module_count;
-    _channels channels;
-} _globals = {0};
-
-static int
-_globals_init(void)
-{
-    // XXX This isn't thread-safe.
-    _globals.module_count++;
-    if (_globals.module_count > 1) {
-        // Already initialized.
-        return 0;
-    }
-
-    assert(_globals.channels.mutex == NULL);
-    PyThread_type_lock mutex = PyThread_allocate_lock();
-    if (mutex == NULL) {
-        return ERR_CHANNELS_MUTEX_INIT;
-    }
-    _channels_init(&_globals.channels, mutex);
-    return 0;
-}
-
-static void
-_globals_fini(void)
-{
-    // XXX This isn't thread-safe.
-    _globals.module_count--;
-    if (_globals.module_count > 0) {
-        return;
-    }
-
-    _channels_fini(&_globals.channels);
-}
-
-static _channels *
-_global_channels(void) {
-    return &_globals.channels;
-}
-
 static PyObject *
 interp_create(PyObject *self, PyObject *args, PyObject *kwds)
 {
@@ -2578,358 +805,6 @@ PyDoc_STRVAR(is_running_doc,
 \n\
 Return whether or not the identified interpreter is running.");
 
-static PyObject *
-channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
-{
-    int64_t cid = _channel_create(&_globals.channels);
-    if (cid < 0) {
-        (void)handle_channel_error(-1, self, cid);
-        return NULL;
-    }
-    module_state *state = get_module_state(self);
-    if (state == NULL) {
-        return NULL;
-    }
-    PyObject *id = NULL;
-    int err = newchannelid(state->ChannelIDType, cid, 0,
-                           &_globals.channels, 0, 0,
-                           (channelid **)&id);
-    if (handle_channel_error(err, self, cid)) {
-        assert(id == NULL);
-        err = _channel_destroy(&_globals.channels, cid);
-        if (handle_channel_error(err, self, cid)) {
-            // XXX issue a warning?
-        }
-        return NULL;
-    }
-    assert(id != NULL);
-    assert(((channelid *)id)->channels != NULL);
-    return id;
-}
-
-PyDoc_STRVAR(channel_create_doc,
-"channel_create() -> cid\n\
-\n\
-Create a new cross-interpreter channel and return a unique generated ID.");
-
-static PyObject *
-channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
-{
-    static char *kwlist[] = {"cid", NULL};
-    int64_t cid;
-    struct channel_id_converter_data cid_data = {
-        .module = self,
-    };
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
-                                     channel_id_converter, &cid_data)) {
-        return NULL;
-    }
-    cid = cid_data.cid;
-
-    int err = _channel_destroy(&_globals.channels, cid);
-    if (handle_channel_error(err, self, cid)) {
-        return NULL;
-    }
-    Py_RETURN_NONE;
-}
-
-PyDoc_STRVAR(channel_destroy_doc,
-"channel_destroy(cid)\n\
-\n\
-Close and finalize the channel.  Afterward attempts to use the channel\n\
-will behave as though it never existed.");
-
-static PyObject *
-channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
-{
-    int64_t count = 0;
-    int64_t *cids = _channels_list_all(&_globals.channels, &count);
-    if (cids == NULL) {
-        if (count == 0) {
-            return PyList_New(0);
-        }
-        return NULL;
-    }
-    PyObject *ids = PyList_New((Py_ssize_t)count);
-    if (ids == NULL) {
-        goto finally;
-    }
-    module_state *state = get_module_state(self);
-    if (state == NULL) {
-        Py_DECREF(ids);
-        ids = NULL;
-        goto finally;
-    }
-    int64_t *cur = cids;
-    for (int64_t i=0; i < count; cur++, i++) {
-        PyObject *id = NULL;
-        int err = newchannelid(state->ChannelIDType, *cur, 0,
-                               &_globals.channels, 0, 0,
-                               (channelid **)&id);
-        if (handle_channel_error(err, self, *cur)) {
-            assert(id == NULL);
-            Py_SETREF(ids, NULL);
-            break;
-        }
-        assert(id != NULL);
-        PyList_SET_ITEM(ids, (Py_ssize_t)i, id);
-    }
-
-finally:
-    PyMem_Free(cids);
-    return ids;
-}
-
-PyDoc_STRVAR(channel_list_all_doc,
-"channel_list_all() -> [cid]\n\
-\n\
-Return the list of all IDs for active channels.");
-
-static PyObject *
-channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
-{
-    static char *kwlist[] = {"cid", "send", NULL};
-    int64_t cid;            /* Channel ID */
-    struct channel_id_converter_data cid_data = {
-        .module = self,
-    };
-    int send = 0;           /* Send or receive end? */
-    int64_t id;
-    PyObject *ids, *id_obj;
-    PyInterpreterState *interp;
-
-    if (!PyArg_ParseTupleAndKeywords(
-            args, kwds, "O&$p:channel_list_interpreters",
-            kwlist, channel_id_converter, &cid_data, &send)) {
-        return NULL;
-    }
-    cid = cid_data.cid;
-
-    ids = PyList_New(0);
-    if (ids == NULL) {
-        goto except;
-    }
-
-    interp = PyInterpreterState_Head();
-    while (interp != NULL) {
-        id = PyInterpreterState_GetID(interp);
-        assert(id >= 0);
-        int res = _channel_is_associated(&_globals.channels, cid, id, send);
-        if (res < 0) {
-            (void)handle_channel_error(res, self, cid);
-            goto except;
-        }
-        if (res) {
-            id_obj = _PyInterpreterState_GetIDObject(interp);
-            if (id_obj == NULL) {
-                goto except;
-            }
-            res = PyList_Insert(ids, 0, id_obj);
-            Py_DECREF(id_obj);
-            if (res < 0) {
-                goto except;
-            }
-        }
-        interp = PyInterpreterState_Next(interp);
-    }
-
-    goto finally;
-
-except:
-    Py_CLEAR(ids);
-
-finally:
-    return ids;
-}
-
-PyDoc_STRVAR(channel_list_interpreters_doc,
-"channel_list_interpreters(cid, *, send) -> [id]\n\
-\n\
-Return the list of all interpreter IDs associated with an end of the channel.\n\
-\n\
-The 'send' argument should be a boolean indicating whether to use the send or\n\
-receive end.");
-
-
-static PyObject *
-channel_send(PyObject *self, PyObject *args, PyObject *kwds)
-{
-    static char *kwlist[] = {"cid", "obj", NULL};
-    int64_t cid;
-    struct channel_id_converter_data cid_data = {
-        .module = self,
-    };
-    PyObject *obj;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
-                                     channel_id_converter, &cid_data, &obj)) {
-        return NULL;
-    }
-    cid = cid_data.cid;
-
-    int err = _channel_send(&_globals.channels, cid, obj);
-    if (handle_channel_error(err, self, cid)) {
-        return NULL;
-    }
-    Py_RETURN_NONE;
-}
-
-PyDoc_STRVAR(channel_send_doc,
-"channel_send(cid, obj)\n\
-\n\
-Add the object's data to the channel's queue.");
-
-static PyObject *
-channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
-{
-    static char *kwlist[] = {"cid", "default", NULL};
-    int64_t cid;
-    struct channel_id_converter_data cid_data = {
-        .module = self,
-    };
-    PyObject *dflt = NULL;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist,
-                                     channel_id_converter, &cid_data, &dflt)) {
-        return NULL;
-    }
-    cid = cid_data.cid;
-
-    PyObject *obj = NULL;
-    int err = _channel_recv(&_globals.channels, cid, &obj);
-    if (handle_channel_error(err, self, cid)) {
-        return NULL;
-    }
-    Py_XINCREF(dflt);
-    if (obj == NULL) {
-        // Use the default.
-        if (dflt == NULL) {
-            (void)handle_channel_error(ERR_CHANNEL_EMPTY, self, cid);
-            return NULL;
-        }
-        obj = Py_NewRef(dflt);
-    }
-    Py_XDECREF(dflt);
-    return obj;
-}
-
-PyDoc_STRVAR(channel_recv_doc,
-"channel_recv(cid, [default]) -> obj\n\
-\n\
-Return a new object from the data at the front of the channel's queue.\n\
-\n\
-If there is nothing to receive then raise ChannelEmptyError, unless\n\
-a default value is provided.  In that case return it.");
-
-static PyObject *
-channel_close(PyObject *self, PyObject *args, PyObject *kwds)
-{
-    static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
-    int64_t cid;
-    struct channel_id_converter_data cid_data = {
-        .module = self,
-    };
-    int send = 0;
-    int recv = 0;
-    int force = 0;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds,
-                                     "O&|$ppp:channel_close", kwlist,
-                                     channel_id_converter, &cid_data,
-                                     &send, &recv, &force)) {
-        return NULL;
-    }
-    cid = cid_data.cid;
-
-    int err = _channel_close(&_globals.channels, cid, send-recv, force);
-    if (handle_channel_error(err, self, cid)) {
-        return NULL;
-    }
-    Py_RETURN_NONE;
-}
-
-PyDoc_STRVAR(channel_close_doc,
-"channel_close(cid, *, send=None, recv=None, force=False)\n\
-\n\
-Close the channel for all interpreters.\n\
-\n\
-If the channel is empty then the keyword args are ignored and both\n\
-ends are immediately closed.  Otherwise, if 'force' is True then\n\
-all queued items are released and both ends are immediately\n\
-closed.\n\
-\n\
-If the channel is not empty *and* 'force' is False then following\n\
-happens:\n\
-\n\
- * recv is True (regardless of send):\n\
-   - raise ChannelNotEmptyError\n\
- * recv is None and send is None:\n\
-   - raise ChannelNotEmptyError\n\
- * send is True and recv is not True:\n\
-   - fully close the 'send' end\n\
-   - close the 'recv' end to interpreters not already receiving\n\
-   - fully close it once empty\n\
-\n\
-Closing an already closed channel results in a ChannelClosedError.\n\
-\n\
-Once the channel's ID has no more ref counts in any interpreter\n\
-the channel will be destroyed.");
-
-static PyObject *
-channel_release(PyObject *self, PyObject *args, PyObject *kwds)
-{
-    // Note that only the current interpreter is affected.
-    static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
-    int64_t cid;
-    struct channel_id_converter_data cid_data = {
-        .module = self,
-    };
-    int send = 0;
-    int recv = 0;
-    int force = 0;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds,
-                                     "O&|$ppp:channel_release", kwlist,
-                                     channel_id_converter, &cid_data,
-                                     &send, &recv, &force)) {
-        return NULL;
-    }
-    cid = cid_data.cid;
-    if (send == 0 && recv == 0) {
-        send = 1;
-        recv = 1;
-    }
-
-    // XXX Handle force is True.
-    // XXX Fix implicit release.
-
-    int err = _channel_drop(&_globals.channels, cid, send, recv);
-    if (handle_channel_error(err, self, cid)) {
-        return NULL;
-    }
-    Py_RETURN_NONE;
-}
-
-PyDoc_STRVAR(channel_release_doc,
-"channel_release(cid, *, send=None, recv=None, force=True)\n\
-\n\
-Close the channel for the current interpreter.  'send' and 'recv'\n\
-(bool) may be used to indicate the ends to close.  By default both\n\
-ends are closed.  Closing an already closed end is a noop.");
-
-static PyObject *
-channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
-{
-    module_state *state = get_module_state(self);
-    if (state == NULL) {
-        return NULL;
-    }
-    PyTypeObject *cls = state->ChannelIDType;
-    PyObject *mod = get_module_from_owned_type(cls);
-    if (mod == NULL) {
-        return NULL;
-    }
-    PyObject *cid = _channelid_new(mod, cls, args, kwds);
-    Py_DECREF(mod);
-    return cid;
-}
-
 static PyMethodDef module_functions[] = {
     {"create",                    _PyCFunction_CAST(interp_create),
      METH_VARARGS | METH_KEYWORDS, create_doc},
@@ -2941,6 +816,7 @@ static PyMethodDef module_functions[] = {
      METH_NOARGS, get_current_doc},
     {"get_main",                  interp_get_main,
      METH_NOARGS, get_main_doc},
+
     {"is_running",                _PyCFunction_CAST(interp_is_running),
      METH_VARARGS | METH_KEYWORDS, is_running_doc},
     {"run_string",                _PyCFunction_CAST(interp_run_string),
@@ -2949,25 +825,6 @@ static PyMethodDef module_functions[] = {
     {"is_shareable",              _PyCFunction_CAST(object_is_shareable),
      METH_VARARGS | METH_KEYWORDS, is_shareable_doc},
 
-    {"channel_create",            channel_create,
-     METH_NOARGS, channel_create_doc},
-    {"channel_destroy",           _PyCFunction_CAST(channel_destroy),
-     METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
-    {"channel_list_all",          channel_list_all,
-     METH_NOARGS, channel_list_all_doc},
-    {"channel_list_interpreters", _PyCFunction_CAST(channel_list_interpreters),
-     METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
-    {"channel_send",              _PyCFunction_CAST(channel_send),
-     METH_VARARGS | METH_KEYWORDS, channel_send_doc},
-    {"channel_recv",              _PyCFunction_CAST(channel_recv),
-     METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
-    {"channel_close",             _PyCFunction_CAST(channel_close),
-     METH_VARARGS | METH_KEYWORDS, channel_close_doc},
-    {"channel_release",           _PyCFunction_CAST(channel_release),
-     METH_VARARGS | METH_KEYWORDS, channel_release_doc},
-    {"_channel_id",               _PyCFunction_CAST(channel__channel_id),
-     METH_VARARGS | METH_KEYWORDS, NULL},
-
     {NULL,                        NULL}           /* sentinel */
 };
 
@@ -2981,29 +838,8 @@ The 'interpreters' module provides a more convenient interface.");
 static int
 module_exec(PyObject *mod)
 {
-    if (_globals_init() != 0) {
-        return -1;
-    }
-
-    module_state *state = get_module_state(mod);
-    if (state == NULL) {
-        goto error;
-    }
-
     /* Add exception types */
-    if (interp_exceptions_init(mod) != 0) {
-        goto error;
-    }
-    if (channel_exceptions_init(mod) != 0) {
-        goto error;
-    }
-
-    /* Add other types */
-
-    // ChannelID
-    state->ChannelIDType = add_new_type(
-            mod, &ChannelIDType_spec, _channelid_shared);
-    if (state->ChannelIDType == NULL) {
+    if (exceptions_init(mod) != 0) {
         goto error;
     }
 
@@ -3015,8 +851,6 @@ module_exec(PyObject *mod)
     return 0;
 
 error:
-    (void)_PyCrossInterpreterData_UnregisterClass(state->ChannelIDType);
-    _globals_fini();
     return -1;
 }
 
@@ -3049,7 +883,6 @@ module_free(void *mod)
     module_state *state = get_module_state(mod);
     assert(state != NULL);
     clear_module_state(state);
-    _globals_fini();
 }
 
 static struct PyModuleDef moduledef = {
diff --git a/PC/config.c b/PC/config.c
index 9d900c78e40d..cdb5db23c4ae 100644
--- a/PC/config.c
+++ b/PC/config.c
@@ -37,6 +37,7 @@ extern PyObject* PyInit__weakref(void);
 /* XXX: These two should really be extracted to standalone extensions. */
 extern PyObject* PyInit_xxsubtype(void);
 extern PyObject* PyInit__xxsubinterpreters(void);
+extern PyObject* PyInit__xxinterpchannels(void);
 extern PyObject* PyInit__random(void);
 extern PyObject* PyInit_itertools(void);
 extern PyObject* PyInit__collections(void);
@@ -134,6 +135,7 @@ struct _inittab _PyImport_Inittab[] = {
 
     {"xxsubtype", PyInit_xxsubtype},
     {"_xxsubinterpreters", PyInit__xxsubinterpreters},
+    {"_xxinterpchannels", PyInit__xxinterpchannels},
 #ifdef _Py_HAVE_ZLIB
     {"zlib", PyInit_zlib},
 #endif
diff --git a/PCbuild/pythoncore.vcxproj b/PCbuild/pythoncore.vcxproj
index d3bd5b378e0d..397d22abe235 100644
--- a/PCbuild/pythoncore.vcxproj
+++ b/PCbuild/pythoncore.vcxproj
@@ -418,6 +418,7 @@
     <ClCompile Include="..\Modules\timemodule.c" />
     <ClCompile Include="..\Modules\xxsubtype.c" />
     <ClCompile Include="..\Modules\_xxsubinterpretersmodule.c" />
+    <ClCompile Include="..\Modules\_xxinterpchannelsmodule.c" />
     <ClCompile Include="..\Modules\_io\fileio.c" />
     <ClCompile Include="..\Modules\_io\bytesio.c" />
     <ClCompile Include="..\Modules\_io\stringio.c" />
diff --git a/PCbuild/pythoncore.vcxproj.filters b/PCbuild/pythoncore.vcxproj.filters
index c1b531fd818a..bcbedcc3235b 100644
--- a/PCbuild/pythoncore.vcxproj.filters
+++ b/PCbuild/pythoncore.vcxproj.filters
@@ -1322,6 +1322,9 @@
     <ClCompile Include="..\Modules\_xxsubinterpretersmodule.c">
       <Filter>Modules</Filter>
     </ClCompile>
+    <ClCompile Include="..\Modules\_xxinterpchannelsmodule.c">
+      <Filter>Modules</Filter>
+    </ClCompile>
     <ClCompile Include="..\Parser\string_parser.c">
       <Filter>Parser</Filter>
     </ClCompile>
diff --git a/Tools/build/generate_stdlib_module_names.py b/Tools/build/generate_stdlib_module_names.py
index c8a23f41fdd4..d15e5e2d5450 100644
--- a/Tools/build/generate_stdlib_module_names.py
+++ b/Tools/build/generate_stdlib_module_names.py
@@ -36,6 +36,7 @@
     '_testmultiphase',
     '_testsinglephase',
     '_xxsubinterpreters',
+    '_xxinterpchannels',
     '_xxtestfuzz',
     'idlelib.idle_test',
     'test',
diff --git a/configure b/configure
index 8b707cda6212..aef8103085bc 100755
--- a/configure
+++ b/configure
@@ -752,6 +752,8 @@ MODULE__MULTIPROCESSING_FALSE
 MODULE__MULTIPROCESSING_TRUE
 MODULE__ZONEINFO_FALSE
 MODULE__ZONEINFO_TRUE
+MODULE__XXINTERPCHANNELS_FALSE
+MODULE__XXINTERPCHANNELS_TRUE
 MODULE__XXSUBINTERPRETERS_FALSE
 MODULE__XXSUBINTERPRETERS_TRUE
 MODULE__TYPING_FALSE
@@ -25615,6 +25617,7 @@ case $ac_sys_system in #(
     py_cv_module__scproxy=n/a
     py_cv_module__tkinter=n/a
     py_cv_module__xxsubinterpreters=n/a
+    py_cv_module__xxinterpchannels=n/a
     py_cv_module_grp=n/a
     py_cv_module_nis=n/a
     py_cv_module_ossaudiodev=n/a
@@ -26057,6 +26060,26 @@ fi
 
 
 
+fi
+
+
+        if test "$py_cv_module__xxinterpchannels" != "n/a"; then :
+  py_cv_module__xxinterpchannels=yes
+fi
+   if test "$py_cv_module__xxinterpchannels" = yes; then
+  MODULE__XXINTERPCHANNELS_TRUE=
+  MODULE__XXINTERPCHANNELS_FALSE='#'
+else
+  MODULE__XXINTERPCHANNELS_TRUE='#'
+  MODULE__XXINTERPCHANNELS_FALSE=
+fi
+
+  as_fn_append MODULE_BLOCK "MODULE__XXINTERPCHANNELS_STATE=$py_cv_module__xxinterpchannels$as_nl"
+  if test "x$py_cv_module__xxinterpchannels" = xyes; then :
+
+
+
+
 fi
 
 
@@ -28236,6 +28259,10 @@ if test -z "${MODULE__XXSUBINTERPRETERS_TRUE}" && test -z "${MODULE__XXSUBINTERP
   as_fn_error $? "conditional \"MODULE__XXSUBINTERPRETERS\" was never defined.
 Usually this means the macro was only invoked conditionally." "$LINENO" 5
 fi
+if test -z "${MODULE__XXINTERPCHANNELS_TRUE}" && test -z "${MODULE__XXINTERPCHANNELS_FALSE}"; then
+  as_fn_error $? "conditional \"MODULE__XXINTERPCHANNELS\" was never defined.
+Usually this means the macro was only invoked conditionally." "$LINENO" 5
+fi
 if test -z "${MODULE__ZONEINFO_TRUE}" && test -z "${MODULE__ZONEINFO_FALSE}"; then
   as_fn_error $? "conditional \"MODULE__ZONEINFO\" was never defined.
 Usually this means the macro was only invoked conditionally." "$LINENO" 5
diff --git a/configure.ac b/configure.ac
index 5eee4586680d..010bca855f00 100644
--- a/configure.ac
+++ b/configure.ac
@@ -7017,6 +7017,7 @@ AS_CASE([$ac_sys_system],
       [_scproxy],
       [_tkinter],
       [_xxsubinterpreters],
+      [_xxinterpchannels],
       [grp],
       [nis],
       [ossaudiodev],
@@ -7135,6 +7136,7 @@ PY_STDLIB_MOD_SIMPLE([select])
 PY_STDLIB_MOD_SIMPLE([_struct])
 PY_STDLIB_MOD_SIMPLE([_typing])
 PY_STDLIB_MOD_SIMPLE([_xxsubinterpreters])
+PY_STDLIB_MOD_SIMPLE([_xxinterpchannels])
 PY_STDLIB_MOD_SIMPLE([_zoneinfo])
 
 dnl multiprocessing modules



More information about the Python-checkins mailing list