[Python-checkins] gh-98040: Move the Single-Phase Init Tests Out of test_imp (gh-102561)

ericsnowcurrently webhook-mailer at python.org
Wed Apr 19 18:09:45 EDT 2023


https://github.com/python/cpython/commit/6be7aee18c5b8e639103df951d0d277f4b46f902
commit: 6be7aee18c5b8e639103df951d0d277f4b46f902
branch: main
author: Eric Snow <ericsnowcurrently at gmail.com>
committer: ericsnowcurrently <ericsnowcurrently at gmail.com>
date: 2023-04-19T16:09:35-06:00
summary:

gh-98040: Move the Single-Phase Init Tests Out of test_imp (gh-102561)

I recently added some tests to test_imp, but @warsaw is removing that file in gh-98573. The tests are worth keeping so here I'm moving them to test_import.

files:
M Lib/test/test_imp.py
M Lib/test/test_import/__init__.py

diff --git a/Lib/test/test_imp.py b/Lib/test/test_imp.py
index 03e3adba221e..80abc720c325 100644
--- a/Lib/test/test_imp.py
+++ b/Lib/test/test_imp.py
@@ -1,5 +1,4 @@
 import gc
-import json
 import importlib
 import importlib.util
 import os
@@ -11,28 +10,15 @@
 from test.support import os_helper
 from test.support import script_helper
 from test.support import warnings_helper
-import textwrap
-import types
 import unittest
 import warnings
 imp = warnings_helper.import_deprecated('imp')
 import _imp
-import _testinternalcapi
-try:
-    import _xxsubinterpreters as _interpreters
-except ModuleNotFoundError:
-    _interpreters = None
 
 
 OS_PATH_NAME = os.path.__name__
 
 
-def requires_subinterpreters(meth):
-    """Decorator to skip a test if subinterpreters are not supported."""
-    return unittest.skipIf(_interpreters is None,
-                           'subinterpreters required')(meth)
-
-
 def requires_load_dynamic(meth):
     """Decorator to skip a test if not running under CPython or lacking
     imp.load_dynamic()."""
@@ -41,169 +27,6 @@ def requires_load_dynamic(meth):
                            'imp.load_dynamic() required')(meth)
 
 
-class ModuleSnapshot(types.SimpleNamespace):
-    """A representation of a module for testing.
-
-    Fields:
-
-    * id - the module's object ID
-    * module - the actual module or an adequate substitute
-       * __file__
-       * __spec__
-          * name
-          * origin
-    * ns - a copy (dict) of the module's __dict__ (or None)
-    * ns_id - the object ID of the module's __dict__
-    * cached - the sys.modules[mod.__spec__.name] entry (or None)
-    * cached_id - the object ID of the sys.modules entry (or None)
-
-    In cases where the value is not available (e.g. due to serialization),
-    the value will be None.
-    """
-    _fields = tuple('id module ns ns_id cached cached_id'.split())
-
-    @classmethod
-    def from_module(cls, mod):
-        name = mod.__spec__.name
-        cached = sys.modules.get(name)
-        return cls(
-            id=id(mod),
-            module=mod,
-            ns=types.SimpleNamespace(**mod.__dict__),
-            ns_id=id(mod.__dict__),
-            cached=cached,
-            cached_id=id(cached),
-        )
-
-    SCRIPT = textwrap.dedent('''
-        {imports}
-
-        name = {name!r}
-
-        {prescript}
-
-        mod = {name}
-
-        {body}
-
-        {postscript}
-        ''')
-    IMPORTS = textwrap.dedent('''
-        import sys
-        ''').strip()
-    SCRIPT_BODY = textwrap.dedent('''
-        # Capture the snapshot data.
-        cached = sys.modules.get(name)
-        snapshot = dict(
-            id=id(mod),
-            module=dict(
-                __file__=mod.__file__,
-                __spec__=dict(
-                    name=mod.__spec__.name,
-                    origin=mod.__spec__.origin,
-                ),
-            ),
-            ns=None,
-            ns_id=id(mod.__dict__),
-            cached=None,
-            cached_id=id(cached) if cached else None,
-        )
-        ''').strip()
-    CLEANUP_SCRIPT = textwrap.dedent('''
-        # Clean up the module.
-        sys.modules.pop(name, None)
-        ''').strip()
-
-    @classmethod
-    def build_script(cls, name, *,
-                     prescript=None,
-                     import_first=False,
-                     postscript=None,
-                     postcleanup=False,
-                     ):
-        if postcleanup is True:
-            postcleanup = cls.CLEANUP_SCRIPT
-        elif isinstance(postcleanup, str):
-            postcleanup = textwrap.dedent(postcleanup).strip()
-            postcleanup = cls.CLEANUP_SCRIPT + os.linesep + postcleanup
-        else:
-            postcleanup = ''
-        prescript = textwrap.dedent(prescript).strip() if prescript else ''
-        postscript = textwrap.dedent(postscript).strip() if postscript else ''
-
-        if postcleanup:
-            if postscript:
-                postscript = postscript + os.linesep * 2 + postcleanup
-            else:
-                postscript = postcleanup
-
-        if import_first:
-            prescript += textwrap.dedent(f'''
-
-                # Now import the module.
-                assert name not in sys.modules
-                import {name}''')
-
-        return cls.SCRIPT.format(
-            imports=cls.IMPORTS.strip(),
-            name=name,
-            prescript=prescript.strip(),
-            body=cls.SCRIPT_BODY.strip(),
-            postscript=postscript,
-        )
-
-    @classmethod
-    def parse(cls, text):
-        raw = json.loads(text)
-        mod = raw['module']
-        mod['__spec__'] = types.SimpleNamespace(**mod['__spec__'])
-        raw['module'] = types.SimpleNamespace(**mod)
-        return cls(**raw)
-
-    @classmethod
-    def from_subinterp(cls, name, interpid=None, *, pipe=None, **script_kwds):
-        if pipe is not None:
-            return cls._from_subinterp(name, interpid, pipe, script_kwds)
-        pipe = os.pipe()
-        try:
-            return cls._from_subinterp(name, interpid, pipe, script_kwds)
-        finally:
-            r, w = pipe
-            os.close(r)
-            os.close(w)
-
-    @classmethod
-    def _from_subinterp(cls, name, interpid, pipe, script_kwargs):
-        r, w = pipe
-
-        # Build the script.
-        postscript = textwrap.dedent(f'''
-            # Send the result over the pipe.
-            import json
-            import os
-            os.write({w}, json.dumps(snapshot).encode())
-
-            ''')
-        _postscript = script_kwargs.get('postscript')
-        if _postscript:
-            _postscript = textwrap.dedent(_postscript).lstrip()
-            postscript += _postscript
-        script_kwargs['postscript'] = postscript.strip()
-        script = cls.build_script(name, **script_kwargs)
-
-        # Run the script.
-        if interpid is None:
-            ret = support.run_in_subinterp(script)
-            if ret != 0:
-                raise AssertionError(f'{ret} != 0')
-        else:
-            _interpreters.run_string(interpid, script)
-
-        # Parse the results.
-        text = os.read(r, 1000)
-        return cls.parse(text.decode())
-
-
 class LockTests(unittest.TestCase):
 
     """Very basic test of import lock functions."""
@@ -620,669 +443,6 @@ def check_get_builtins():
         check_get_builtins()
 
 
-class TestSinglePhaseSnapshot(ModuleSnapshot):
-
-    @classmethod
-    def from_module(cls, mod):
-        self = super().from_module(mod)
-        self.summed = mod.sum(1, 2)
-        self.lookedup = mod.look_up_self()
-        self.lookedup_id = id(self.lookedup)
-        self.state_initialized = mod.state_initialized()
-        if hasattr(mod, 'initialized_count'):
-            self.init_count = mod.initialized_count()
-        return self
-
-    SCRIPT_BODY = ModuleSnapshot.SCRIPT_BODY + textwrap.dedent(f'''
-        snapshot['module'].update(dict(
-            int_const=mod.int_const,
-            str_const=mod.str_const,
-            _module_initialized=mod._module_initialized,
-        ))
-        snapshot.update(dict(
-            summed=mod.sum(1, 2),
-            lookedup_id=id(mod.look_up_self()),
-            state_initialized=mod.state_initialized(),
-            init_count=mod.initialized_count(),
-            has_spam=hasattr(mod, 'spam'),
-            spam=getattr(mod, 'spam', None),
-        ))
-        ''').rstrip()
-
-    @classmethod
-    def parse(cls, text):
-        self = super().parse(text)
-        if not self.has_spam:
-            del self.spam
-        del self.has_spam
-        return self
-
-
- at requires_load_dynamic
-class SinglephaseInitTests(unittest.TestCase):
-
-    NAME = '_testsinglephase'
-
-    @classmethod
-    def setUpClass(cls):
-        if '-R' in sys.argv or '--huntrleaks' in sys.argv:
-            # https://github.com/python/cpython/issues/102251
-            raise unittest.SkipTest('unresolved refleaks (see gh-102251)')
-        fileobj, filename, _ = imp.find_module(cls.NAME)
-        fileobj.close()
-        cls.FILE = filename
-
-        # Start fresh.
-        cls.clean_up()
-
-    def tearDown(self):
-        # Clean up the module.
-        self.clean_up()
-
-    @classmethod
-    def clean_up(cls):
-        name = cls.NAME
-        filename = cls.FILE
-        if name in sys.modules:
-            if hasattr(sys.modules[name], '_clear_globals'):
-                assert sys.modules[name].__file__ == filename
-                sys.modules[name]._clear_globals()
-            del sys.modules[name]
-        # Clear all internally cached data for the extension.
-        _testinternalcapi.clear_extension(name, filename)
-
-    #########################
-    # helpers
-
-    def add_module_cleanup(self, name):
-        def clean_up():
-            # Clear all internally cached data for the extension.
-            _testinternalcapi.clear_extension(name, self.FILE)
-        self.addCleanup(clean_up)
-
-    def load(self, name):
-        try:
-            already_loaded = self.already_loaded
-        except AttributeError:
-            already_loaded = self.already_loaded = {}
-        assert name not in already_loaded
-        mod = imp.load_dynamic(name, self.FILE)
-        self.assertNotIn(mod, already_loaded.values())
-        already_loaded[name] = mod
-        return types.SimpleNamespace(
-            name=name,
-            module=mod,
-            snapshot=TestSinglePhaseSnapshot.from_module(mod),
-        )
-
-    def re_load(self, name, mod):
-        assert sys.modules[name] is mod
-        assert mod.__dict__ == mod.__dict__
-        reloaded = imp.load_dynamic(name, self.FILE)
-        return types.SimpleNamespace(
-            name=name,
-            module=reloaded,
-            snapshot=TestSinglePhaseSnapshot.from_module(reloaded),
-        )
-
-    # subinterpreters
-
-    def add_subinterpreter(self):
-        interpid = _interpreters.create(isolated=False)
-        _interpreters.run_string(interpid, textwrap.dedent('''
-            import sys
-            import _testinternalcapi
-            '''))
-        def clean_up():
-            _interpreters.run_string(interpid, textwrap.dedent(f'''
-                name = {self.NAME!r}
-                if name in sys.modules:
-                    sys.modules[name]._clear_globals()
-                _testinternalcapi.clear_extension(name, {self.FILE!r})
-                '''))
-            _interpreters.destroy(interpid)
-        self.addCleanup(clean_up)
-        return interpid
-
-    def import_in_subinterp(self, interpid=None, *,
-                            postscript=None,
-                            postcleanup=False,
-                            ):
-        name = self.NAME
-
-        if postcleanup:
-            import_ = 'import _testinternalcapi' if interpid is None else ''
-            postcleanup = f'''
-                {import_}
-                mod._clear_globals()
-                _testinternalcapi.clear_extension(name, {self.FILE!r})
-                '''
-
-        try:
-            pipe = self._pipe
-        except AttributeError:
-            r, w = pipe = self._pipe = os.pipe()
-            self.addCleanup(os.close, r)
-            self.addCleanup(os.close, w)
-
-        snapshot = TestSinglePhaseSnapshot.from_subinterp(
-            name,
-            interpid,
-            pipe=pipe,
-            import_first=True,
-            postscript=postscript,
-            postcleanup=postcleanup,
-        )
-
-        return types.SimpleNamespace(
-            name=name,
-            module=None,
-            snapshot=snapshot,
-        )
-
-    # checks
-
-    def check_common(self, loaded):
-        isolated = False
-
-        mod = loaded.module
-        if not mod:
-            # It came from a subinterpreter.
-            isolated = True
-            mod = loaded.snapshot.module
-        # mod.__name__  might not match, but the spec will.
-        self.assertEqual(mod.__spec__.name, loaded.name)
-        self.assertEqual(mod.__file__, self.FILE)
-        self.assertEqual(mod.__spec__.origin, self.FILE)
-        if not isolated:
-            self.assertTrue(issubclass(mod.error, Exception))
-        self.assertEqual(mod.int_const, 1969)
-        self.assertEqual(mod.str_const, 'something different')
-        self.assertIsInstance(mod._module_initialized, float)
-        self.assertGreater(mod._module_initialized, 0)
-
-        snap = loaded.snapshot
-        self.assertEqual(snap.summed, 3)
-        if snap.state_initialized is not None:
-            self.assertIsInstance(snap.state_initialized, float)
-            self.assertGreater(snap.state_initialized, 0)
-        if isolated:
-            # The "looked up" module is interpreter-specific
-            # (interp->imports.modules_by_index was set for the module).
-            self.assertEqual(snap.lookedup_id, snap.id)
-            self.assertEqual(snap.cached_id, snap.id)
-            with self.assertRaises(AttributeError):
-                snap.spam
-        else:
-            self.assertIs(snap.lookedup, mod)
-            self.assertIs(snap.cached, mod)
-
-    def check_direct(self, loaded):
-        # The module has its own PyModuleDef, with a matching name.
-        self.assertEqual(loaded.module.__name__, loaded.name)
-        self.assertIs(loaded.snapshot.lookedup, loaded.module)
-
-    def check_indirect(self, loaded, orig):
-        # The module re-uses another's PyModuleDef, with a different name.
-        assert orig is not loaded.module
-        assert orig.__name__ != loaded.name
-        self.assertNotEqual(loaded.module.__name__, loaded.name)
-        self.assertIs(loaded.snapshot.lookedup, loaded.module)
-
-    def check_basic(self, loaded, expected_init_count):
-        # m_size == -1
-        # The module loads fresh the first time and copies m_copy after.
-        snap = loaded.snapshot
-        self.assertIsNot(snap.state_initialized, None)
-        self.assertIsInstance(snap.init_count, int)
-        self.assertGreater(snap.init_count, 0)
-        self.assertEqual(snap.init_count, expected_init_count)
-
-    def check_with_reinit(self, loaded):
-        # m_size >= 0
-        # The module loads fresh every time.
-        pass
-
-    def check_fresh(self, loaded):
-        """
-        The module had not been loaded before (at least since fully reset).
-        """
-        snap = loaded.snapshot
-        # The module's init func was run.
-        # A copy of the module's __dict__ was stored in def->m_base.m_copy.
-        # The previous m_copy was deleted first.
-        # _PyRuntime.imports.extensions was set.
-        self.assertEqual(snap.init_count, 1)
-        # The global state was initialized.
-        # The module attrs were initialized from that state.
-        self.assertEqual(snap.module._module_initialized,
-                         snap.state_initialized)
-
-    def check_semi_fresh(self, loaded, base, prev):
-        """
-        The module had been loaded before and then reset
-        (but the module global state wasn't).
-        """
-        snap = loaded.snapshot
-        # The module's init func was run again.
-        # A copy of the module's __dict__ was stored in def->m_base.m_copy.
-        # The previous m_copy was deleted first.
-        # The module globals did not get reset.
-        self.assertNotEqual(snap.id, base.snapshot.id)
-        self.assertNotEqual(snap.id, prev.snapshot.id)
-        self.assertEqual(snap.init_count, prev.snapshot.init_count + 1)
-        # The global state was updated.
-        # The module attrs were initialized from that state.
-        self.assertEqual(snap.module._module_initialized,
-                         snap.state_initialized)
-        self.assertNotEqual(snap.state_initialized,
-                            base.snapshot.state_initialized)
-        self.assertNotEqual(snap.state_initialized,
-                            prev.snapshot.state_initialized)
-
-    def check_copied(self, loaded, base):
-        """
-        The module had been loaded before and never reset.
-        """
-        snap = loaded.snapshot
-        # The module's init func was not run again.
-        # The interpreter copied m_copy, as set by the other interpreter,
-        # with objects owned by the other interpreter.
-        # The module globals did not get reset.
-        self.assertNotEqual(snap.id, base.snapshot.id)
-        self.assertEqual(snap.init_count, base.snapshot.init_count)
-        # The global state was not updated since the init func did not run.
-        # The module attrs were not directly initialized from that state.
-        # The state and module attrs still match the previous loading.
-        self.assertEqual(snap.module._module_initialized,
-                         snap.state_initialized)
-        self.assertEqual(snap.state_initialized,
-                         base.snapshot.state_initialized)
-
-    #########################
-    # the tests
-
-    def test_cleared_globals(self):
-        loaded = self.load(self.NAME)
-        _testsinglephase = loaded.module
-        init_before = _testsinglephase.state_initialized()
-
-        _testsinglephase._clear_globals()
-        init_after = _testsinglephase.state_initialized()
-        init_count = _testsinglephase.initialized_count()
-
-        self.assertGreater(init_before, 0)
-        self.assertEqual(init_after, 0)
-        self.assertEqual(init_count, -1)
-
-    def test_variants(self):
-        # Exercise the most meaningful variants described in Python/import.c.
-        self.maxDiff = None
-
-        # Check the "basic" module.
-
-        name = self.NAME
-        expected_init_count = 1
-        with self.subTest(name):
-            loaded = self.load(name)
-
-            self.check_common(loaded)
-            self.check_direct(loaded)
-            self.check_basic(loaded, expected_init_count)
-        basic = loaded.module
-
-        # Check its indirect variants.
-
-        name = f'{self.NAME}_basic_wrapper'
-        self.add_module_cleanup(name)
-        expected_init_count += 1
-        with self.subTest(name):
-            loaded = self.load(name)
-
-            self.check_common(loaded)
-            self.check_indirect(loaded, basic)
-            self.check_basic(loaded, expected_init_count)
-
-            # Currently PyState_AddModule() always replaces the cached module.
-            self.assertIs(basic.look_up_self(), loaded.module)
-            self.assertEqual(basic.initialized_count(), expected_init_count)
-
-        # The cached module shouldn't change after this point.
-        basic_lookedup = loaded.module
-
-        # Check its direct variant.
-
-        name = f'{self.NAME}_basic_copy'
-        self.add_module_cleanup(name)
-        expected_init_count += 1
-        with self.subTest(name):
-            loaded = self.load(name)
-
-            self.check_common(loaded)
-            self.check_direct(loaded)
-            self.check_basic(loaded, expected_init_count)
-
-            # This should change the cached module for _testsinglephase.
-            self.assertIs(basic.look_up_self(), basic_lookedup)
-            self.assertEqual(basic.initialized_count(), expected_init_count)
-
-        # Check the non-basic variant that has no state.
-
-        name = f'{self.NAME}_with_reinit'
-        self.add_module_cleanup(name)
-        with self.subTest(name):
-            loaded = self.load(name)
-
-            self.check_common(loaded)
-            self.assertIs(loaded.snapshot.state_initialized, None)
-            self.check_direct(loaded)
-            self.check_with_reinit(loaded)
-
-            # This should change the cached module for _testsinglephase.
-            self.assertIs(basic.look_up_self(), basic_lookedup)
-            self.assertEqual(basic.initialized_count(), expected_init_count)
-
-        # Check the basic variant that has state.
-
-        name = f'{self.NAME}_with_state'
-        self.add_module_cleanup(name)
-        with self.subTest(name):
-            loaded = self.load(name)
-
-            self.check_common(loaded)
-            self.assertIsNot(loaded.snapshot.state_initialized, None)
-            self.check_direct(loaded)
-            self.check_with_reinit(loaded)
-
-            # This should change the cached module for _testsinglephase.
-            self.assertIs(basic.look_up_self(), basic_lookedup)
-            self.assertEqual(basic.initialized_count(), expected_init_count)
-
-    def test_basic_reloaded(self):
-        # m_copy is copied into the existing module object.
-        # Global state is not changed.
-        self.maxDiff = None
-
-        for name in [
-            self.NAME,  # the "basic" module
-            f'{self.NAME}_basic_wrapper',  # the indirect variant
-            f'{self.NAME}_basic_copy',  # the direct variant
-        ]:
-            self.add_module_cleanup(name)
-            with self.subTest(name):
-                loaded = self.load(name)
-                reloaded = self.re_load(name, loaded.module)
-
-                self.check_common(loaded)
-                self.check_common(reloaded)
-
-                # Make sure the original __dict__ did not get replaced.
-                self.assertEqual(id(loaded.module.__dict__),
-                                 loaded.snapshot.ns_id)
-                self.assertEqual(loaded.snapshot.ns.__dict__,
-                                 loaded.module.__dict__)
-
-                self.assertEqual(reloaded.module.__spec__.name, reloaded.name)
-                self.assertEqual(reloaded.module.__name__,
-                                 reloaded.snapshot.ns.__name__)
-
-                self.assertIs(reloaded.module, loaded.module)
-                self.assertIs(reloaded.module.__dict__, loaded.module.__dict__)
-                # It only happens to be the same but that's good enough here.
-                # We really just want to verify that the re-loaded attrs
-                # didn't change.
-                self.assertIs(reloaded.snapshot.lookedup,
-                              loaded.snapshot.lookedup)
-                self.assertEqual(reloaded.snapshot.state_initialized,
-                                 loaded.snapshot.state_initialized)
-                self.assertEqual(reloaded.snapshot.init_count,
-                                 loaded.snapshot.init_count)
-
-                self.assertIs(reloaded.snapshot.cached, reloaded.module)
-
-    def test_with_reinit_reloaded(self):
-        # The module's m_init func is run again.
-        self.maxDiff = None
-
-        # Keep a reference around.
-        basic = self.load(self.NAME)
-
-        for name in [
-            f'{self.NAME}_with_reinit',  # m_size == 0
-            f'{self.NAME}_with_state',  # m_size > 0
-        ]:
-            self.add_module_cleanup(name)
-            with self.subTest(name):
-                loaded = self.load(name)
-                reloaded = self.re_load(name, loaded.module)
-
-                self.check_common(loaded)
-                self.check_common(reloaded)
-
-                # Make sure the original __dict__ did not get replaced.
-                self.assertEqual(id(loaded.module.__dict__),
-                                 loaded.snapshot.ns_id)
-                self.assertEqual(loaded.snapshot.ns.__dict__,
-                                 loaded.module.__dict__)
-
-                self.assertEqual(reloaded.module.__spec__.name, reloaded.name)
-                self.assertEqual(reloaded.module.__name__,
-                                 reloaded.snapshot.ns.__name__)
-
-                self.assertIsNot(reloaded.module, loaded.module)
-                self.assertNotEqual(reloaded.module.__dict__,
-                                    loaded.module.__dict__)
-                self.assertIs(reloaded.snapshot.lookedup, reloaded.module)
-                if loaded.snapshot.state_initialized is None:
-                    self.assertIs(reloaded.snapshot.state_initialized, None)
-                else:
-                    self.assertGreater(reloaded.snapshot.state_initialized,
-                                       loaded.snapshot.state_initialized)
-
-                self.assertIs(reloaded.snapshot.cached, reloaded.module)
-
-    # Currently, for every single-phrase init module loaded
-    # in multiple interpreters, those interpreters share a
-    # PyModuleDef for that object, which can be a problem.
-    # Also, we test with a single-phase module that has global state,
-    # which is shared by all interpreters.
-
-    @requires_subinterpreters
-    def test_basic_multiple_interpreters_main_no_reset(self):
-        # without resetting; already loaded in main interpreter
-
-        # At this point:
-        #  * alive in 0 interpreters
-        #  * module def may or may not be loaded already
-        #  * module def not in _PyRuntime.imports.extensions
-        #  * mod init func has not run yet (since reset, at least)
-        #  * m_copy not set (hasn't been loaded yet or already cleared)
-        #  * module's global state has not been initialized yet
-        #    (or already cleared)
-
-        main_loaded = self.load(self.NAME)
-        _testsinglephase = main_loaded.module
-        # Attrs set after loading are not in m_copy.
-        _testsinglephase.spam = 'spam, spam, spam, spam, eggs, and spam'
-
-        self.check_common(main_loaded)
-        self.check_fresh(main_loaded)
-
-        interpid1 = self.add_subinterpreter()
-        interpid2 = self.add_subinterpreter()
-
-        # At this point:
-        #  * alive in 1 interpreter (main)
-        #  * module def in _PyRuntime.imports.extensions
-        #  * mod init func ran for the first time (since reset, at least)
-        #  * m_copy was copied from the main interpreter (was NULL)
-        #  * module's global state was initialized
-
-        # Use an interpreter that gets destroyed right away.
-        loaded = self.import_in_subinterp()
-        self.check_common(loaded)
-        self.check_copied(loaded, main_loaded)
-
-        # At this point:
-        #  * alive in 1 interpreter (main)
-        #  * module def still in _PyRuntime.imports.extensions
-        #  * mod init func ran again
-        #  * m_copy is NULL (claered when the interpreter was destroyed)
-        #    (was from main interpreter)
-        #  * module's global state was updated, not reset
-
-        # Use a subinterpreter that sticks around.
-        loaded = self.import_in_subinterp(interpid1)
-        self.check_common(loaded)
-        self.check_copied(loaded, main_loaded)
-
-        # At this point:
-        #  * alive in 2 interpreters (main, interp1)
-        #  * module def still in _PyRuntime.imports.extensions
-        #  * mod init func ran again
-        #  * m_copy was copied from interp1
-        #  * module's global state was updated, not reset
-
-        # Use a subinterpreter while the previous one is still alive.
-        loaded = self.import_in_subinterp(interpid2)
-        self.check_common(loaded)
-        self.check_copied(loaded, main_loaded)
-
-        # At this point:
-        #  * alive in 3 interpreters (main, interp1, interp2)
-        #  * module def still in _PyRuntime.imports.extensions
-        #  * mod init func ran again
-        #  * m_copy was copied from interp2 (was from interp1)
-        #  * module's global state was updated, not reset
-
-    @requires_subinterpreters
-    def test_basic_multiple_interpreters_deleted_no_reset(self):
-        # without resetting; already loaded in a deleted interpreter
-
-        # At this point:
-        #  * alive in 0 interpreters
-        #  * module def may or may not be loaded already
-        #  * module def not in _PyRuntime.imports.extensions
-        #  * mod init func has not run yet (since reset, at least)
-        #  * m_copy not set (hasn't been loaded yet or already cleared)
-        #  * module's global state has not been initialized yet
-        #    (or already cleared)
-
-        interpid1 = self.add_subinterpreter()
-        interpid2 = self.add_subinterpreter()
-
-        # First, load in the main interpreter but then completely clear it.
-        loaded_main = self.load(self.NAME)
-        loaded_main.module._clear_globals()
-        _testinternalcapi.clear_extension(self.NAME, self.FILE)
-
-        # At this point:
-        #  * alive in 0 interpreters
-        #  * module def loaded already
-        #  * module def was in _PyRuntime.imports.extensions, but cleared
-        #  * mod init func ran for the first time (since reset, at least)
-        #  * m_copy was set, but cleared (was NULL)
-        #  * module's global state was initialized but cleared
-
-        # Start with an interpreter that gets destroyed right away.
-        base = self.import_in_subinterp(postscript='''
-            # Attrs set after loading are not in m_copy.
-            mod.spam = 'spam, spam, mash, spam, eggs, and spam'
-        ''')
-        self.check_common(base)
-        self.check_fresh(base)
-
-        # At this point:
-        #  * alive in 0 interpreters
-        #  * module def in _PyRuntime.imports.extensions
-        #  * mod init func ran again
-        #  * m_copy is NULL (claered when the interpreter was destroyed)
-        #  * module's global state was initialized, not reset
-
-        # Use a subinterpreter that sticks around.
-        loaded_interp1 = self.import_in_subinterp(interpid1)
-        self.check_common(loaded_interp1)
-        self.check_semi_fresh(loaded_interp1, loaded_main, base)
-
-        # At this point:
-        #  * alive in 1 interpreter (interp1)
-        #  * module def still in _PyRuntime.imports.extensions
-        #  * mod init func ran again
-        #  * m_copy was copied from interp1 (was NULL)
-        #  * module's global state was updated, not reset
-
-        # Use a subinterpreter while the previous one is still alive.
-        loaded_interp2 = self.import_in_subinterp(interpid2)
-        self.check_common(loaded_interp2)
-        self.check_copied(loaded_interp2, loaded_interp1)
-
-        # At this point:
-        #  * alive in 2 interpreters (interp1, interp2)
-        #  * module def still in _PyRuntime.imports.extensions
-        #  * mod init func ran again
-        #  * m_copy was copied from interp2 (was from interp1)
-        #  * module's global state was updated, not reset
-
-    @requires_subinterpreters
-    @requires_load_dynamic
-    def test_basic_multiple_interpreters_reset_each(self):
-        # resetting between each interpreter
-
-        # At this point:
-        #  * alive in 0 interpreters
-        #  * module def may or may not be loaded already
-        #  * module def not in _PyRuntime.imports.extensions
-        #  * mod init func has not run yet (since reset, at least)
-        #  * m_copy not set (hasn't been loaded yet or already cleared)
-        #  * module's global state has not been initialized yet
-        #    (or already cleared)
-
-        interpid1 = self.add_subinterpreter()
-        interpid2 = self.add_subinterpreter()
-
-        # Use an interpreter that gets destroyed right away.
-        loaded = self.import_in_subinterp(
-            postscript='''
-            # Attrs set after loading are not in m_copy.
-            mod.spam = 'spam, spam, mash, spam, eggs, and spam'
-            ''',
-            postcleanup=True,
-        )
-        self.check_common(loaded)
-        self.check_fresh(loaded)
-
-        # At this point:
-        #  * alive in 0 interpreters
-        #  * module def in _PyRuntime.imports.extensions
-        #  * mod init func ran for the first time (since reset, at least)
-        #  * m_copy is NULL (claered when the interpreter was destroyed)
-        #  * module's global state was initialized, not reset
-
-        # Use a subinterpreter that sticks around.
-        loaded = self.import_in_subinterp(interpid1, postcleanup=True)
-        self.check_common(loaded)
-        self.check_fresh(loaded)
-
-        # At this point:
-        #  * alive in 1 interpreter (interp1)
-        #  * module def still in _PyRuntime.imports.extensions
-        #  * mod init func ran again
-        #  * m_copy was copied from interp1 (was NULL)
-        #  * module's global state was initialized, not reset
-
-        # Use a subinterpreter while the previous one is still alive.
-        loaded = self.import_in_subinterp(interpid2, postcleanup=True)
-        self.check_common(loaded)
-        self.check_fresh(loaded)
-
-        # At this point:
-        #  * alive in 2 interpreters (interp2, interp2)
-        #  * module def still in _PyRuntime.imports.extensions
-        #  * mod init func ran again
-        #  * m_copy was copied from interp2 (was from interp1)
-        #  * module's global state was initialized, not reset
-
-
 class ReloadTests(unittest.TestCase):
 
     """Very basic tests to make sure that imp.reload() operates just like
diff --git a/Lib/test/test_import/__init__.py b/Lib/test/test_import/__init__.py
index 3ef07203c46c..66ae554f984f 100644
--- a/Lib/test/test_import/__init__.py
+++ b/Lib/test/test_import/__init__.py
@@ -2,6 +2,7 @@
 import contextlib
 import errno
 import glob
+import json
 import importlib.util
 from importlib._bootstrap_external import _get_sourcefile
 from importlib.machinery import (
@@ -18,13 +19,15 @@
 import textwrap
 import threading
 import time
+import types
 import unittest
 from unittest import mock
+import _testinternalcapi
 
 from test.support import os_helper
 from test.support import (
     STDLIB_DIR, swap_attr, swap_item, cpython_only, is_emscripten,
-    is_wasi, run_in_subinterp_with_config)
+    is_wasi, run_in_subinterp, run_in_subinterp_with_config)
 from test.support.import_helper import (
     forget, make_legacy_pyc, unlink, unload, DirsOnSysPath, CleanImport)
 from test.support.os_helper import (
@@ -41,6 +44,10 @@
     import _testmultiphase
 except ImportError:
     _testmultiphase = None
+try:
+    import _xxsubinterpreters as _interpreters
+except ModuleNotFoundError:
+    _interpreters = None
 
 
 skip_if_dont_write_bytecode = unittest.skipIf(
@@ -120,6 +127,182 @@ def _ready_to_import(name=None, source=""):
                 del sys.modules[name]
 
 
+def requires_subinterpreters(meth):
+    """Decorator to skip a test if subinterpreters are not supported."""
+    return unittest.skipIf(_interpreters is None,
+                           'subinterpreters required')(meth)
+
+
+def requires_singlephase_init(meth):
+    """Decorator to skip if single-phase init modules are not supported."""
+    meth = cpython_only(meth)
+    return unittest.skipIf(_testsinglephase is None,
+                           'test requires _testsinglephase module')(meth)
+
+
+class ModuleSnapshot(types.SimpleNamespace):
+    """A representation of a module for testing.
+
+    Fields:
+
+    * id - the module's object ID
+    * module - the actual module or an adequate substitute
+       * __file__
+       * __spec__
+          * name
+          * origin
+    * ns - a copy (dict) of the module's __dict__ (or None)
+    * ns_id - the object ID of the module's __dict__
+    * cached - the sys.modules[mod.__spec__.name] entry (or None)
+    * cached_id - the object ID of the sys.modules entry (or None)
+
+    In cases where the value is not available (e.g. due to serialization),
+    the value will be None.
+    """
+    _fields = tuple('id module ns ns_id cached cached_id'.split())
+
+    @classmethod
+    def from_module(cls, mod):
+        name = mod.__spec__.name
+        cached = sys.modules.get(name)
+        return cls(
+            id=id(mod),
+            module=mod,
+            ns=types.SimpleNamespace(**mod.__dict__),
+            ns_id=id(mod.__dict__),
+            cached=cached,
+            cached_id=id(cached),
+        )
+
+    SCRIPT = textwrap.dedent('''
+        {imports}
+
+        name = {name!r}
+
+        {prescript}
+
+        mod = {name}
+
+        {body}
+
+        {postscript}
+        ''')
+    IMPORTS = textwrap.dedent('''
+        import sys
+        ''').strip()
+    SCRIPT_BODY = textwrap.dedent('''
+        # Capture the snapshot data.
+        cached = sys.modules.get(name)
+        snapshot = dict(
+            id=id(mod),
+            module=dict(
+                __file__=mod.__file__,
+                __spec__=dict(
+                    name=mod.__spec__.name,
+                    origin=mod.__spec__.origin,
+                ),
+            ),
+            ns=None,
+            ns_id=id(mod.__dict__),
+            cached=None,
+            cached_id=id(cached) if cached else None,
+        )
+        ''').strip()
+    CLEANUP_SCRIPT = textwrap.dedent('''
+        # Clean up the module.
+        sys.modules.pop(name, None)
+        ''').strip()
+
+    @classmethod
+    def build_script(cls, name, *,
+                     prescript=None,
+                     import_first=False,
+                     postscript=None,
+                     postcleanup=False,
+                     ):
+        if postcleanup is True:
+            postcleanup = cls.CLEANUP_SCRIPT
+        elif isinstance(postcleanup, str):
+            postcleanup = textwrap.dedent(postcleanup).strip()
+            postcleanup = cls.CLEANUP_SCRIPT + os.linesep + postcleanup
+        else:
+            postcleanup = ''
+        prescript = textwrap.dedent(prescript).strip() if prescript else ''
+        postscript = textwrap.dedent(postscript).strip() if postscript else ''
+
+        if postcleanup:
+            if postscript:
+                postscript = postscript + os.linesep * 2 + postcleanup
+            else:
+                postscript = postcleanup
+
+        if import_first:
+            prescript += textwrap.dedent(f'''
+
+                # Now import the module.
+                assert name not in sys.modules
+                import {name}''')
+
+        return cls.SCRIPT.format(
+            imports=cls.IMPORTS.strip(),
+            name=name,
+            prescript=prescript.strip(),
+            body=cls.SCRIPT_BODY.strip(),
+            postscript=postscript,
+        )
+
+    @classmethod
+    def parse(cls, text):
+        raw = json.loads(text)
+        mod = raw['module']
+        mod['__spec__'] = types.SimpleNamespace(**mod['__spec__'])
+        raw['module'] = types.SimpleNamespace(**mod)
+        return cls(**raw)
+
+    @classmethod
+    def from_subinterp(cls, name, interpid=None, *, pipe=None, **script_kwds):
+        if pipe is not None:
+            return cls._from_subinterp(name, interpid, pipe, script_kwds)
+        pipe = os.pipe()
+        try:
+            return cls._from_subinterp(name, interpid, pipe, script_kwds)
+        finally:
+            r, w = pipe
+            os.close(r)
+            os.close(w)
+
+    @classmethod
+    def _from_subinterp(cls, name, interpid, pipe, script_kwargs):
+        r, w = pipe
+
+        # Build the script.
+        postscript = textwrap.dedent(f'''
+            # Send the result over the pipe.
+            import json
+            import os
+            os.write({w}, json.dumps(snapshot).encode())
+
+            ''')
+        _postscript = script_kwargs.get('postscript')
+        if _postscript:
+            _postscript = textwrap.dedent(_postscript).lstrip()
+            postscript += _postscript
+        script_kwargs['postscript'] = postscript.strip()
+        script = cls.build_script(name, **script_kwargs)
+
+        # Run the script.
+        if interpid is None:
+            ret = run_in_subinterp(script)
+            if ret != 0:
+                raise AssertionError(f'{ret} != 0')
+        else:
+            _interpreters.run_string(interpid, script)
+
+        # Parse the results.
+        text = os.read(r, 1000)
+        return cls.parse(text.decode())
+
+
 class ImportTests(unittest.TestCase):
 
     def setUp(self):
@@ -1604,7 +1787,7 @@ def test_frozen_compat(self):
         with self.subTest(f'{module}: strict, not fresh'):
             self.check_compatible_here(module, strict=True)
 
-    @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
+    @requires_singlephase_init
     def test_single_init_extension_compat(self):
         module = '_testsinglephase'
         require_extension(module)
@@ -1636,7 +1819,7 @@ def test_python_compat(self):
         with self.subTest(f'{module}: strict, fresh'):
             self.check_compatible_fresh(module, strict=True)
 
-    @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
+    @requires_singlephase_init
     def test_singlephase_check_with_setting_and_override(self):
         module = '_testsinglephase'
         require_extension(module)
@@ -1672,6 +1855,685 @@ def check_incompatible(setting, override):
             check_compatible(False, -1)
 
 
+class TestSinglePhaseSnapshot(ModuleSnapshot):
+
+    @classmethod
+    def from_module(cls, mod):
+        self = super().from_module(mod)
+        self.summed = mod.sum(1, 2)
+        self.lookedup = mod.look_up_self()
+        self.lookedup_id = id(self.lookedup)
+        self.state_initialized = mod.state_initialized()
+        if hasattr(mod, 'initialized_count'):
+            self.init_count = mod.initialized_count()
+        return self
+
+    SCRIPT_BODY = ModuleSnapshot.SCRIPT_BODY + textwrap.dedent(f'''
+        snapshot['module'].update(dict(
+            int_const=mod.int_const,
+            str_const=mod.str_const,
+            _module_initialized=mod._module_initialized,
+        ))
+        snapshot.update(dict(
+            summed=mod.sum(1, 2),
+            lookedup_id=id(mod.look_up_self()),
+            state_initialized=mod.state_initialized(),
+            init_count=mod.initialized_count(),
+            has_spam=hasattr(mod, 'spam'),
+            spam=getattr(mod, 'spam', None),
+        ))
+        ''').rstrip()
+
+    @classmethod
+    def parse(cls, text):
+        self = super().parse(text)
+        if not self.has_spam:
+            del self.spam
+        del self.has_spam
+        return self
+
+
+ at requires_singlephase_init
+class SinglephaseInitTests(unittest.TestCase):
+
+    NAME = '_testsinglephase'
+
+    @classmethod
+    def setUpClass(cls):
+        if '-R' in sys.argv or '--huntrleaks' in sys.argv:
+            # https://github.com/python/cpython/issues/102251
+            raise unittest.SkipTest('unresolved refleaks (see gh-102251)')
+
+        spec = importlib.util.find_spec(cls.NAME)
+        from importlib.machinery import ExtensionFileLoader
+        cls.FILE = spec.origin
+        cls.LOADER = type(spec.loader)
+        assert cls.LOADER is ExtensionFileLoader
+
+        # Start fresh.
+        cls.clean_up()
+
+    def tearDown(self):
+        # Clean up the module.
+        self.clean_up()
+
+    @classmethod
+    def clean_up(cls):
+        name = cls.NAME
+        filename = cls.FILE
+        if name in sys.modules:
+            if hasattr(sys.modules[name], '_clear_globals'):
+                assert sys.modules[name].__file__ == filename
+                sys.modules[name]._clear_globals()
+            del sys.modules[name]
+        # Clear all internally cached data for the extension.
+        _testinternalcapi.clear_extension(name, filename)
+
+    #########################
+    # helpers
+
+    def add_module_cleanup(self, name):
+        def clean_up():
+            # Clear all internally cached data for the extension.
+            _testinternalcapi.clear_extension(name, self.FILE)
+        self.addCleanup(clean_up)
+
+    def _load_dynamic(self, name, path):
+        """
+        Load an extension module.
+        """
+        # This is essentially copied from the old imp module.
+        from importlib._bootstrap import _load
+        loader = self.LOADER(name, path)
+
+        # Issue bpo-24748: Skip the sys.modules check in _load_module_shim;
+        # always load new extension.
+        spec = importlib.util.spec_from_file_location(name, path,
+                                                      loader=loader)
+        return _load(spec)
+
+    def load(self, name):
+        try:
+            already_loaded = self.already_loaded
+        except AttributeError:
+            already_loaded = self.already_loaded = {}
+        assert name not in already_loaded
+        mod = self._load_dynamic(name, self.FILE)
+        self.assertNotIn(mod, already_loaded.values())
+        already_loaded[name] = mod
+        return types.SimpleNamespace(
+            name=name,
+            module=mod,
+            snapshot=TestSinglePhaseSnapshot.from_module(mod),
+        )
+
+    def re_load(self, name, mod):
+        assert sys.modules[name] is mod
+        assert mod.__dict__ == mod.__dict__
+        reloaded = self._load_dynamic(name, self.FILE)
+        return types.SimpleNamespace(
+            name=name,
+            module=reloaded,
+            snapshot=TestSinglePhaseSnapshot.from_module(reloaded),
+        )
+
+    # subinterpreters
+
+    def add_subinterpreter(self):
+        interpid = _interpreters.create(isolated=False)
+        _interpreters.run_string(interpid, textwrap.dedent('''
+            import sys
+            import _testinternalcapi
+            '''))
+        def clean_up():
+            _interpreters.run_string(interpid, textwrap.dedent(f'''
+                name = {self.NAME!r}
+                if name in sys.modules:
+                    sys.modules[name]._clear_globals()
+                _testinternalcapi.clear_extension(name, {self.FILE!r})
+                '''))
+            _interpreters.destroy(interpid)
+        self.addCleanup(clean_up)
+        return interpid
+
+    def import_in_subinterp(self, interpid=None, *,
+                            postscript=None,
+                            postcleanup=False,
+                            ):
+        name = self.NAME
+
+        if postcleanup:
+            import_ = 'import _testinternalcapi' if interpid is None else ''
+            postcleanup = f'''
+                {import_}
+                mod._clear_globals()
+                _testinternalcapi.clear_extension(name, {self.FILE!r})
+                '''
+
+        try:
+            pipe = self._pipe
+        except AttributeError:
+            r, w = pipe = self._pipe = os.pipe()
+            self.addCleanup(os.close, r)
+            self.addCleanup(os.close, w)
+
+        snapshot = TestSinglePhaseSnapshot.from_subinterp(
+            name,
+            interpid,
+            pipe=pipe,
+            import_first=True,
+            postscript=postscript,
+            postcleanup=postcleanup,
+        )
+
+        return types.SimpleNamespace(
+            name=name,
+            module=None,
+            snapshot=snapshot,
+        )
+
+    # checks
+
+    def check_common(self, loaded):
+        isolated = False
+
+        mod = loaded.module
+        if not mod:
+            # It came from a subinterpreter.
+            isolated = True
+            mod = loaded.snapshot.module
+        # mod.__name__  might not match, but the spec will.
+        self.assertEqual(mod.__spec__.name, loaded.name)
+        self.assertEqual(mod.__file__, self.FILE)
+        self.assertEqual(mod.__spec__.origin, self.FILE)
+        if not isolated:
+            self.assertTrue(issubclass(mod.error, Exception))
+        self.assertEqual(mod.int_const, 1969)
+        self.assertEqual(mod.str_const, 'something different')
+        self.assertIsInstance(mod._module_initialized, float)
+        self.assertGreater(mod._module_initialized, 0)
+
+        snap = loaded.snapshot
+        self.assertEqual(snap.summed, 3)
+        if snap.state_initialized is not None:
+            self.assertIsInstance(snap.state_initialized, float)
+            self.assertGreater(snap.state_initialized, 0)
+        if isolated:
+            # The "looked up" module is interpreter-specific
+            # (interp->imports.modules_by_index was set for the module).
+            self.assertEqual(snap.lookedup_id, snap.id)
+            self.assertEqual(snap.cached_id, snap.id)
+            with self.assertRaises(AttributeError):
+                snap.spam
+        else:
+            self.assertIs(snap.lookedup, mod)
+            self.assertIs(snap.cached, mod)
+
+    def check_direct(self, loaded):
+        # The module has its own PyModuleDef, with a matching name.
+        self.assertEqual(loaded.module.__name__, loaded.name)
+        self.assertIs(loaded.snapshot.lookedup, loaded.module)
+
+    def check_indirect(self, loaded, orig):
+        # The module re-uses another's PyModuleDef, with a different name.
+        assert orig is not loaded.module
+        assert orig.__name__ != loaded.name
+        self.assertNotEqual(loaded.module.__name__, loaded.name)
+        self.assertIs(loaded.snapshot.lookedup, loaded.module)
+
+    def check_basic(self, loaded, expected_init_count):
+        # m_size == -1
+        # The module loads fresh the first time and copies m_copy after.
+        snap = loaded.snapshot
+        self.assertIsNot(snap.state_initialized, None)
+        self.assertIsInstance(snap.init_count, int)
+        self.assertGreater(snap.init_count, 0)
+        self.assertEqual(snap.init_count, expected_init_count)
+
+    def check_with_reinit(self, loaded):
+        # m_size >= 0
+        # The module loads fresh every time.
+        pass
+
+    def check_fresh(self, loaded):
+        """
+        The module had not been loaded before (at least since fully reset).
+        """
+        snap = loaded.snapshot
+        # The module's init func was run.
+        # A copy of the module's __dict__ was stored in def->m_base.m_copy.
+        # The previous m_copy was deleted first.
+        # _PyRuntime.imports.extensions was set.
+        self.assertEqual(snap.init_count, 1)
+        # The global state was initialized.
+        # The module attrs were initialized from that state.
+        self.assertEqual(snap.module._module_initialized,
+                         snap.state_initialized)
+
+    def check_semi_fresh(self, loaded, base, prev):
+        """
+        The module had been loaded before and then reset
+        (but the module global state wasn't).
+        """
+        snap = loaded.snapshot
+        # The module's init func was run again.
+        # A copy of the module's __dict__ was stored in def->m_base.m_copy.
+        # The previous m_copy was deleted first.
+        # The module globals did not get reset.
+        self.assertNotEqual(snap.id, base.snapshot.id)
+        self.assertNotEqual(snap.id, prev.snapshot.id)
+        self.assertEqual(snap.init_count, prev.snapshot.init_count + 1)
+        # The global state was updated.
+        # The module attrs were initialized from that state.
+        self.assertEqual(snap.module._module_initialized,
+                         snap.state_initialized)
+        self.assertNotEqual(snap.state_initialized,
+                            base.snapshot.state_initialized)
+        self.assertNotEqual(snap.state_initialized,
+                            prev.snapshot.state_initialized)
+
+    def check_copied(self, loaded, base):
+        """
+        The module had been loaded before and never reset.
+        """
+        snap = loaded.snapshot
+        # The module's init func was not run again.
+        # The interpreter copied m_copy, as set by the other interpreter,
+        # with objects owned by the other interpreter.
+        # The module globals did not get reset.
+        self.assertNotEqual(snap.id, base.snapshot.id)
+        self.assertEqual(snap.init_count, base.snapshot.init_count)
+        # The global state was not updated since the init func did not run.
+        # The module attrs were not directly initialized from that state.
+        # The state and module attrs still match the previous loading.
+        self.assertEqual(snap.module._module_initialized,
+                         snap.state_initialized)
+        self.assertEqual(snap.state_initialized,
+                         base.snapshot.state_initialized)
+
+    #########################
+    # the tests
+
+    def test_cleared_globals(self):
+        loaded = self.load(self.NAME)
+        _testsinglephase = loaded.module
+        init_before = _testsinglephase.state_initialized()
+
+        _testsinglephase._clear_globals()
+        init_after = _testsinglephase.state_initialized()
+        init_count = _testsinglephase.initialized_count()
+
+        self.assertGreater(init_before, 0)
+        self.assertEqual(init_after, 0)
+        self.assertEqual(init_count, -1)
+
+    def test_variants(self):
+        # Exercise the most meaningful variants described in Python/import.c.
+        self.maxDiff = None
+
+        # Check the "basic" module.
+
+        name = self.NAME
+        expected_init_count = 1
+        with self.subTest(name):
+            loaded = self.load(name)
+
+            self.check_common(loaded)
+            self.check_direct(loaded)
+            self.check_basic(loaded, expected_init_count)
+        basic = loaded.module
+
+        # Check its indirect variants.
+
+        name = f'{self.NAME}_basic_wrapper'
+        self.add_module_cleanup(name)
+        expected_init_count += 1
+        with self.subTest(name):
+            loaded = self.load(name)
+
+            self.check_common(loaded)
+            self.check_indirect(loaded, basic)
+            self.check_basic(loaded, expected_init_count)
+
+            # Currently PyState_AddModule() always replaces the cached module.
+            self.assertIs(basic.look_up_self(), loaded.module)
+            self.assertEqual(basic.initialized_count(), expected_init_count)
+
+        # The cached module shouldn't change after this point.
+        basic_lookedup = loaded.module
+
+        # Check its direct variant.
+
+        name = f'{self.NAME}_basic_copy'
+        self.add_module_cleanup(name)
+        expected_init_count += 1
+        with self.subTest(name):
+            loaded = self.load(name)
+
+            self.check_common(loaded)
+            self.check_direct(loaded)
+            self.check_basic(loaded, expected_init_count)
+
+            # This should change the cached module for _testsinglephase.
+            self.assertIs(basic.look_up_self(), basic_lookedup)
+            self.assertEqual(basic.initialized_count(), expected_init_count)
+
+        # Check the non-basic variant that has no state.
+
+        name = f'{self.NAME}_with_reinit'
+        self.add_module_cleanup(name)
+        with self.subTest(name):
+            loaded = self.load(name)
+
+            self.check_common(loaded)
+            self.assertIs(loaded.snapshot.state_initialized, None)
+            self.check_direct(loaded)
+            self.check_with_reinit(loaded)
+
+            # This should change the cached module for _testsinglephase.
+            self.assertIs(basic.look_up_self(), basic_lookedup)
+            self.assertEqual(basic.initialized_count(), expected_init_count)
+
+        # Check the basic variant that has state.
+
+        name = f'{self.NAME}_with_state'
+        self.add_module_cleanup(name)
+        with self.subTest(name):
+            loaded = self.load(name)
+
+            self.check_common(loaded)
+            self.assertIsNot(loaded.snapshot.state_initialized, None)
+            self.check_direct(loaded)
+            self.check_with_reinit(loaded)
+
+            # This should change the cached module for _testsinglephase.
+            self.assertIs(basic.look_up_self(), basic_lookedup)
+            self.assertEqual(basic.initialized_count(), expected_init_count)
+
+    def test_basic_reloaded(self):
+        # m_copy is copied into the existing module object.
+        # Global state is not changed.
+        self.maxDiff = None
+
+        for name in [
+            self.NAME,  # the "basic" module
+            f'{self.NAME}_basic_wrapper',  # the indirect variant
+            f'{self.NAME}_basic_copy',  # the direct variant
+        ]:
+            self.add_module_cleanup(name)
+            with self.subTest(name):
+                loaded = self.load(name)
+                reloaded = self.re_load(name, loaded.module)
+
+                self.check_common(loaded)
+                self.check_common(reloaded)
+
+                # Make sure the original __dict__ did not get replaced.
+                self.assertEqual(id(loaded.module.__dict__),
+                                 loaded.snapshot.ns_id)
+                self.assertEqual(loaded.snapshot.ns.__dict__,
+                                 loaded.module.__dict__)
+
+                self.assertEqual(reloaded.module.__spec__.name, reloaded.name)
+                self.assertEqual(reloaded.module.__name__,
+                                 reloaded.snapshot.ns.__name__)
+
+                self.assertIs(reloaded.module, loaded.module)
+                self.assertIs(reloaded.module.__dict__, loaded.module.__dict__)
+                # It only happens to be the same but that's good enough here.
+                # We really just want to verify that the re-loaded attrs
+                # didn't change.
+                self.assertIs(reloaded.snapshot.lookedup,
+                              loaded.snapshot.lookedup)
+                self.assertEqual(reloaded.snapshot.state_initialized,
+                                 loaded.snapshot.state_initialized)
+                self.assertEqual(reloaded.snapshot.init_count,
+                                 loaded.snapshot.init_count)
+
+                self.assertIs(reloaded.snapshot.cached, reloaded.module)
+
+    def test_with_reinit_reloaded(self):
+        # The module's m_init func is run again.
+        self.maxDiff = None
+
+        # Keep a reference around.
+        basic = self.load(self.NAME)
+
+        for name in [
+            f'{self.NAME}_with_reinit',  # m_size == 0
+            f'{self.NAME}_with_state',  # m_size > 0
+        ]:
+            self.add_module_cleanup(name)
+            with self.subTest(name):
+                loaded = self.load(name)
+                reloaded = self.re_load(name, loaded.module)
+
+                self.check_common(loaded)
+                self.check_common(reloaded)
+
+                # Make sure the original __dict__ did not get replaced.
+                self.assertEqual(id(loaded.module.__dict__),
+                                 loaded.snapshot.ns_id)
+                self.assertEqual(loaded.snapshot.ns.__dict__,
+                                 loaded.module.__dict__)
+
+                self.assertEqual(reloaded.module.__spec__.name, reloaded.name)
+                self.assertEqual(reloaded.module.__name__,
+                                 reloaded.snapshot.ns.__name__)
+
+                self.assertIsNot(reloaded.module, loaded.module)
+                self.assertNotEqual(reloaded.module.__dict__,
+                                    loaded.module.__dict__)
+                self.assertIs(reloaded.snapshot.lookedup, reloaded.module)
+                if loaded.snapshot.state_initialized is None:
+                    self.assertIs(reloaded.snapshot.state_initialized, None)
+                else:
+                    self.assertGreater(reloaded.snapshot.state_initialized,
+                                       loaded.snapshot.state_initialized)
+
+                self.assertIs(reloaded.snapshot.cached, reloaded.module)
+
+    # Currently, for every single-phrase init module loaded
+    # in multiple interpreters, those interpreters share a
+    # PyModuleDef for that object, which can be a problem.
+    # Also, we test with a single-phase module that has global state,
+    # which is shared by all interpreters.
+
+    @requires_subinterpreters
+    def test_basic_multiple_interpreters_main_no_reset(self):
+        # without resetting; already loaded in main interpreter
+
+        # At this point:
+        #  * alive in 0 interpreters
+        #  * module def may or may not be loaded already
+        #  * module def not in _PyRuntime.imports.extensions
+        #  * mod init func has not run yet (since reset, at least)
+        #  * m_copy not set (hasn't been loaded yet or already cleared)
+        #  * module's global state has not been initialized yet
+        #    (or already cleared)
+
+        main_loaded = self.load(self.NAME)
+        _testsinglephase = main_loaded.module
+        # Attrs set after loading are not in m_copy.
+        _testsinglephase.spam = 'spam, spam, spam, spam, eggs, and spam'
+
+        self.check_common(main_loaded)
+        self.check_fresh(main_loaded)
+
+        interpid1 = self.add_subinterpreter()
+        interpid2 = self.add_subinterpreter()
+
+        # At this point:
+        #  * alive in 1 interpreter (main)
+        #  * module def in _PyRuntime.imports.extensions
+        #  * mod init func ran for the first time (since reset, at least)
+        #  * m_copy was copied from the main interpreter (was NULL)
+        #  * module's global state was initialized
+
+        # Use an interpreter that gets destroyed right away.
+        loaded = self.import_in_subinterp()
+        self.check_common(loaded)
+        self.check_copied(loaded, main_loaded)
+
+        # At this point:
+        #  * alive in 1 interpreter (main)
+        #  * module def still in _PyRuntime.imports.extensions
+        #  * mod init func ran again
+        #  * m_copy is NULL (claered when the interpreter was destroyed)
+        #    (was from main interpreter)
+        #  * module's global state was updated, not reset
+
+        # Use a subinterpreter that sticks around.
+        loaded = self.import_in_subinterp(interpid1)
+        self.check_common(loaded)
+        self.check_copied(loaded, main_loaded)
+
+        # At this point:
+        #  * alive in 2 interpreters (main, interp1)
+        #  * module def still in _PyRuntime.imports.extensions
+        #  * mod init func ran again
+        #  * m_copy was copied from interp1
+        #  * module's global state was updated, not reset
+
+        # Use a subinterpreter while the previous one is still alive.
+        loaded = self.import_in_subinterp(interpid2)
+        self.check_common(loaded)
+        self.check_copied(loaded, main_loaded)
+
+        # At this point:
+        #  * alive in 3 interpreters (main, interp1, interp2)
+        #  * module def still in _PyRuntime.imports.extensions
+        #  * mod init func ran again
+        #  * m_copy was copied from interp2 (was from interp1)
+        #  * module's global state was updated, not reset
+
+    @requires_subinterpreters
+    def test_basic_multiple_interpreters_deleted_no_reset(self):
+        # without resetting; already loaded in a deleted interpreter
+
+        # At this point:
+        #  * alive in 0 interpreters
+        #  * module def may or may not be loaded already
+        #  * module def not in _PyRuntime.imports.extensions
+        #  * mod init func has not run yet (since reset, at least)
+        #  * m_copy not set (hasn't been loaded yet or already cleared)
+        #  * module's global state has not been initialized yet
+        #    (or already cleared)
+
+        interpid1 = self.add_subinterpreter()
+        interpid2 = self.add_subinterpreter()
+
+        # First, load in the main interpreter but then completely clear it.
+        loaded_main = self.load(self.NAME)
+        loaded_main.module._clear_globals()
+        _testinternalcapi.clear_extension(self.NAME, self.FILE)
+
+        # At this point:
+        #  * alive in 0 interpreters
+        #  * module def loaded already
+        #  * module def was in _PyRuntime.imports.extensions, but cleared
+        #  * mod init func ran for the first time (since reset, at least)
+        #  * m_copy was set, but cleared (was NULL)
+        #  * module's global state was initialized but cleared
+
+        # Start with an interpreter that gets destroyed right away.
+        base = self.import_in_subinterp(postscript='''
+            # Attrs set after loading are not in m_copy.
+            mod.spam = 'spam, spam, mash, spam, eggs, and spam'
+        ''')
+        self.check_common(base)
+        self.check_fresh(base)
+
+        # At this point:
+        #  * alive in 0 interpreters
+        #  * module def in _PyRuntime.imports.extensions
+        #  * mod init func ran again
+        #  * m_copy is NULL (claered when the interpreter was destroyed)
+        #  * module's global state was initialized, not reset
+
+        # Use a subinterpreter that sticks around.
+        loaded_interp1 = self.import_in_subinterp(interpid1)
+        self.check_common(loaded_interp1)
+        self.check_semi_fresh(loaded_interp1, loaded_main, base)
+
+        # At this point:
+        #  * alive in 1 interpreter (interp1)
+        #  * module def still in _PyRuntime.imports.extensions
+        #  * mod init func ran again
+        #  * m_copy was copied from interp1 (was NULL)
+        #  * module's global state was updated, not reset
+
+        # Use a subinterpreter while the previous one is still alive.
+        loaded_interp2 = self.import_in_subinterp(interpid2)
+        self.check_common(loaded_interp2)
+        self.check_copied(loaded_interp2, loaded_interp1)
+
+        # At this point:
+        #  * alive in 2 interpreters (interp1, interp2)
+        #  * module def still in _PyRuntime.imports.extensions
+        #  * mod init func ran again
+        #  * m_copy was copied from interp2 (was from interp1)
+        #  * module's global state was updated, not reset
+
+    @requires_subinterpreters
+    def test_basic_multiple_interpreters_reset_each(self):
+        # resetting between each interpreter
+
+        # At this point:
+        #  * alive in 0 interpreters
+        #  * module def may or may not be loaded already
+        #  * module def not in _PyRuntime.imports.extensions
+        #  * mod init func has not run yet (since reset, at least)
+        #  * m_copy not set (hasn't been loaded yet or already cleared)
+        #  * module's global state has not been initialized yet
+        #    (or already cleared)
+
+        interpid1 = self.add_subinterpreter()
+        interpid2 = self.add_subinterpreter()
+
+        # Use an interpreter that gets destroyed right away.
+        loaded = self.import_in_subinterp(
+            postscript='''
+            # Attrs set after loading are not in m_copy.
+            mod.spam = 'spam, spam, mash, spam, eggs, and spam'
+            ''',
+            postcleanup=True,
+        )
+        self.check_common(loaded)
+        self.check_fresh(loaded)
+
+        # At this point:
+        #  * alive in 0 interpreters
+        #  * module def in _PyRuntime.imports.extensions
+        #  * mod init func ran for the first time (since reset, at least)
+        #  * m_copy is NULL (claered when the interpreter was destroyed)
+        #  * module's global state was initialized, not reset
+
+        # Use a subinterpreter that sticks around.
+        loaded = self.import_in_subinterp(interpid1, postcleanup=True)
+        self.check_common(loaded)
+        self.check_fresh(loaded)
+
+        # At this point:
+        #  * alive in 1 interpreter (interp1)
+        #  * module def still in _PyRuntime.imports.extensions
+        #  * mod init func ran again
+        #  * m_copy was copied from interp1 (was NULL)
+        #  * module's global state was initialized, not reset
+
+        # Use a subinterpreter while the previous one is still alive.
+        loaded = self.import_in_subinterp(interpid2, postcleanup=True)
+        self.check_common(loaded)
+        self.check_fresh(loaded)
+
+        # At this point:
+        #  * alive in 2 interpreters (interp2, interp2)
+        #  * module def still in _PyRuntime.imports.extensions
+        #  * mod init func ran again
+        #  * m_copy was copied from interp2 (was from interp1)
+        #  * module's global state was initialized, not reset
+
+
 if __name__ == '__main__':
     # Test needs to be a package, so we can do relative imports.
     unittest.main()



More information about the Python-checkins mailing list