[Python-checkins] gh-98040: Move the Single-Phase Init Tests Out of test_imp (gh-102561)
ericsnowcurrently
webhook-mailer at python.org
Wed Apr 19 18:09:45 EDT 2023
https://github.com/python/cpython/commit/6be7aee18c5b8e639103df951d0d277f4b46f902
commit: 6be7aee18c5b8e639103df951d0d277f4b46f902
branch: main
author: Eric Snow <ericsnowcurrently at gmail.com>
committer: ericsnowcurrently <ericsnowcurrently at gmail.com>
date: 2023-04-19T16:09:35-06:00
summary:
gh-98040: Move the Single-Phase Init Tests Out of test_imp (gh-102561)
I recently added some tests to test_imp, but @warsaw is removing that file in gh-98573. The tests are worth keeping so here I'm moving them to test_import.
files:
M Lib/test/test_imp.py
M Lib/test/test_import/__init__.py
diff --git a/Lib/test/test_imp.py b/Lib/test/test_imp.py
index 03e3adba221e..80abc720c325 100644
--- a/Lib/test/test_imp.py
+++ b/Lib/test/test_imp.py
@@ -1,5 +1,4 @@
import gc
-import json
import importlib
import importlib.util
import os
@@ -11,28 +10,15 @@
from test.support import os_helper
from test.support import script_helper
from test.support import warnings_helper
-import textwrap
-import types
import unittest
import warnings
imp = warnings_helper.import_deprecated('imp')
import _imp
-import _testinternalcapi
-try:
- import _xxsubinterpreters as _interpreters
-except ModuleNotFoundError:
- _interpreters = None
OS_PATH_NAME = os.path.__name__
-def requires_subinterpreters(meth):
- """Decorator to skip a test if subinterpreters are not supported."""
- return unittest.skipIf(_interpreters is None,
- 'subinterpreters required')(meth)
-
-
def requires_load_dynamic(meth):
"""Decorator to skip a test if not running under CPython or lacking
imp.load_dynamic()."""
@@ -41,169 +27,6 @@ def requires_load_dynamic(meth):
'imp.load_dynamic() required')(meth)
-class ModuleSnapshot(types.SimpleNamespace):
- """A representation of a module for testing.
-
- Fields:
-
- * id - the module's object ID
- * module - the actual module or an adequate substitute
- * __file__
- * __spec__
- * name
- * origin
- * ns - a copy (dict) of the module's __dict__ (or None)
- * ns_id - the object ID of the module's __dict__
- * cached - the sys.modules[mod.__spec__.name] entry (or None)
- * cached_id - the object ID of the sys.modules entry (or None)
-
- In cases where the value is not available (e.g. due to serialization),
- the value will be None.
- """
- _fields = tuple('id module ns ns_id cached cached_id'.split())
-
- @classmethod
- def from_module(cls, mod):
- name = mod.__spec__.name
- cached = sys.modules.get(name)
- return cls(
- id=id(mod),
- module=mod,
- ns=types.SimpleNamespace(**mod.__dict__),
- ns_id=id(mod.__dict__),
- cached=cached,
- cached_id=id(cached),
- )
-
- SCRIPT = textwrap.dedent('''
- {imports}
-
- name = {name!r}
-
- {prescript}
-
- mod = {name}
-
- {body}
-
- {postscript}
- ''')
- IMPORTS = textwrap.dedent('''
- import sys
- ''').strip()
- SCRIPT_BODY = textwrap.dedent('''
- # Capture the snapshot data.
- cached = sys.modules.get(name)
- snapshot = dict(
- id=id(mod),
- module=dict(
- __file__=mod.__file__,
- __spec__=dict(
- name=mod.__spec__.name,
- origin=mod.__spec__.origin,
- ),
- ),
- ns=None,
- ns_id=id(mod.__dict__),
- cached=None,
- cached_id=id(cached) if cached else None,
- )
- ''').strip()
- CLEANUP_SCRIPT = textwrap.dedent('''
- # Clean up the module.
- sys.modules.pop(name, None)
- ''').strip()
-
- @classmethod
- def build_script(cls, name, *,
- prescript=None,
- import_first=False,
- postscript=None,
- postcleanup=False,
- ):
- if postcleanup is True:
- postcleanup = cls.CLEANUP_SCRIPT
- elif isinstance(postcleanup, str):
- postcleanup = textwrap.dedent(postcleanup).strip()
- postcleanup = cls.CLEANUP_SCRIPT + os.linesep + postcleanup
- else:
- postcleanup = ''
- prescript = textwrap.dedent(prescript).strip() if prescript else ''
- postscript = textwrap.dedent(postscript).strip() if postscript else ''
-
- if postcleanup:
- if postscript:
- postscript = postscript + os.linesep * 2 + postcleanup
- else:
- postscript = postcleanup
-
- if import_first:
- prescript += textwrap.dedent(f'''
-
- # Now import the module.
- assert name not in sys.modules
- import {name}''')
-
- return cls.SCRIPT.format(
- imports=cls.IMPORTS.strip(),
- name=name,
- prescript=prescript.strip(),
- body=cls.SCRIPT_BODY.strip(),
- postscript=postscript,
- )
-
- @classmethod
- def parse(cls, text):
- raw = json.loads(text)
- mod = raw['module']
- mod['__spec__'] = types.SimpleNamespace(**mod['__spec__'])
- raw['module'] = types.SimpleNamespace(**mod)
- return cls(**raw)
-
- @classmethod
- def from_subinterp(cls, name, interpid=None, *, pipe=None, **script_kwds):
- if pipe is not None:
- return cls._from_subinterp(name, interpid, pipe, script_kwds)
- pipe = os.pipe()
- try:
- return cls._from_subinterp(name, interpid, pipe, script_kwds)
- finally:
- r, w = pipe
- os.close(r)
- os.close(w)
-
- @classmethod
- def _from_subinterp(cls, name, interpid, pipe, script_kwargs):
- r, w = pipe
-
- # Build the script.
- postscript = textwrap.dedent(f'''
- # Send the result over the pipe.
- import json
- import os
- os.write({w}, json.dumps(snapshot).encode())
-
- ''')
- _postscript = script_kwargs.get('postscript')
- if _postscript:
- _postscript = textwrap.dedent(_postscript).lstrip()
- postscript += _postscript
- script_kwargs['postscript'] = postscript.strip()
- script = cls.build_script(name, **script_kwargs)
-
- # Run the script.
- if interpid is None:
- ret = support.run_in_subinterp(script)
- if ret != 0:
- raise AssertionError(f'{ret} != 0')
- else:
- _interpreters.run_string(interpid, script)
-
- # Parse the results.
- text = os.read(r, 1000)
- return cls.parse(text.decode())
-
-
class LockTests(unittest.TestCase):
"""Very basic test of import lock functions."""
@@ -620,669 +443,6 @@ def check_get_builtins():
check_get_builtins()
-class TestSinglePhaseSnapshot(ModuleSnapshot):
-
- @classmethod
- def from_module(cls, mod):
- self = super().from_module(mod)
- self.summed = mod.sum(1, 2)
- self.lookedup = mod.look_up_self()
- self.lookedup_id = id(self.lookedup)
- self.state_initialized = mod.state_initialized()
- if hasattr(mod, 'initialized_count'):
- self.init_count = mod.initialized_count()
- return self
-
- SCRIPT_BODY = ModuleSnapshot.SCRIPT_BODY + textwrap.dedent(f'''
- snapshot['module'].update(dict(
- int_const=mod.int_const,
- str_const=mod.str_const,
- _module_initialized=mod._module_initialized,
- ))
- snapshot.update(dict(
- summed=mod.sum(1, 2),
- lookedup_id=id(mod.look_up_self()),
- state_initialized=mod.state_initialized(),
- init_count=mod.initialized_count(),
- has_spam=hasattr(mod, 'spam'),
- spam=getattr(mod, 'spam', None),
- ))
- ''').rstrip()
-
- @classmethod
- def parse(cls, text):
- self = super().parse(text)
- if not self.has_spam:
- del self.spam
- del self.has_spam
- return self
-
-
- at requires_load_dynamic
-class SinglephaseInitTests(unittest.TestCase):
-
- NAME = '_testsinglephase'
-
- @classmethod
- def setUpClass(cls):
- if '-R' in sys.argv or '--huntrleaks' in sys.argv:
- # https://github.com/python/cpython/issues/102251
- raise unittest.SkipTest('unresolved refleaks (see gh-102251)')
- fileobj, filename, _ = imp.find_module(cls.NAME)
- fileobj.close()
- cls.FILE = filename
-
- # Start fresh.
- cls.clean_up()
-
- def tearDown(self):
- # Clean up the module.
- self.clean_up()
-
- @classmethod
- def clean_up(cls):
- name = cls.NAME
- filename = cls.FILE
- if name in sys.modules:
- if hasattr(sys.modules[name], '_clear_globals'):
- assert sys.modules[name].__file__ == filename
- sys.modules[name]._clear_globals()
- del sys.modules[name]
- # Clear all internally cached data for the extension.
- _testinternalcapi.clear_extension(name, filename)
-
- #########################
- # helpers
-
- def add_module_cleanup(self, name):
- def clean_up():
- # Clear all internally cached data for the extension.
- _testinternalcapi.clear_extension(name, self.FILE)
- self.addCleanup(clean_up)
-
- def load(self, name):
- try:
- already_loaded = self.already_loaded
- except AttributeError:
- already_loaded = self.already_loaded = {}
- assert name not in already_loaded
- mod = imp.load_dynamic(name, self.FILE)
- self.assertNotIn(mod, already_loaded.values())
- already_loaded[name] = mod
- return types.SimpleNamespace(
- name=name,
- module=mod,
- snapshot=TestSinglePhaseSnapshot.from_module(mod),
- )
-
- def re_load(self, name, mod):
- assert sys.modules[name] is mod
- assert mod.__dict__ == mod.__dict__
- reloaded = imp.load_dynamic(name, self.FILE)
- return types.SimpleNamespace(
- name=name,
- module=reloaded,
- snapshot=TestSinglePhaseSnapshot.from_module(reloaded),
- )
-
- # subinterpreters
-
- def add_subinterpreter(self):
- interpid = _interpreters.create(isolated=False)
- _interpreters.run_string(interpid, textwrap.dedent('''
- import sys
- import _testinternalcapi
- '''))
- def clean_up():
- _interpreters.run_string(interpid, textwrap.dedent(f'''
- name = {self.NAME!r}
- if name in sys.modules:
- sys.modules[name]._clear_globals()
- _testinternalcapi.clear_extension(name, {self.FILE!r})
- '''))
- _interpreters.destroy(interpid)
- self.addCleanup(clean_up)
- return interpid
-
- def import_in_subinterp(self, interpid=None, *,
- postscript=None,
- postcleanup=False,
- ):
- name = self.NAME
-
- if postcleanup:
- import_ = 'import _testinternalcapi' if interpid is None else ''
- postcleanup = f'''
- {import_}
- mod._clear_globals()
- _testinternalcapi.clear_extension(name, {self.FILE!r})
- '''
-
- try:
- pipe = self._pipe
- except AttributeError:
- r, w = pipe = self._pipe = os.pipe()
- self.addCleanup(os.close, r)
- self.addCleanup(os.close, w)
-
- snapshot = TestSinglePhaseSnapshot.from_subinterp(
- name,
- interpid,
- pipe=pipe,
- import_first=True,
- postscript=postscript,
- postcleanup=postcleanup,
- )
-
- return types.SimpleNamespace(
- name=name,
- module=None,
- snapshot=snapshot,
- )
-
- # checks
-
- def check_common(self, loaded):
- isolated = False
-
- mod = loaded.module
- if not mod:
- # It came from a subinterpreter.
- isolated = True
- mod = loaded.snapshot.module
- # mod.__name__ might not match, but the spec will.
- self.assertEqual(mod.__spec__.name, loaded.name)
- self.assertEqual(mod.__file__, self.FILE)
- self.assertEqual(mod.__spec__.origin, self.FILE)
- if not isolated:
- self.assertTrue(issubclass(mod.error, Exception))
- self.assertEqual(mod.int_const, 1969)
- self.assertEqual(mod.str_const, 'something different')
- self.assertIsInstance(mod._module_initialized, float)
- self.assertGreater(mod._module_initialized, 0)
-
- snap = loaded.snapshot
- self.assertEqual(snap.summed, 3)
- if snap.state_initialized is not None:
- self.assertIsInstance(snap.state_initialized, float)
- self.assertGreater(snap.state_initialized, 0)
- if isolated:
- # The "looked up" module is interpreter-specific
- # (interp->imports.modules_by_index was set for the module).
- self.assertEqual(snap.lookedup_id, snap.id)
- self.assertEqual(snap.cached_id, snap.id)
- with self.assertRaises(AttributeError):
- snap.spam
- else:
- self.assertIs(snap.lookedup, mod)
- self.assertIs(snap.cached, mod)
-
- def check_direct(self, loaded):
- # The module has its own PyModuleDef, with a matching name.
- self.assertEqual(loaded.module.__name__, loaded.name)
- self.assertIs(loaded.snapshot.lookedup, loaded.module)
-
- def check_indirect(self, loaded, orig):
- # The module re-uses another's PyModuleDef, with a different name.
- assert orig is not loaded.module
- assert orig.__name__ != loaded.name
- self.assertNotEqual(loaded.module.__name__, loaded.name)
- self.assertIs(loaded.snapshot.lookedup, loaded.module)
-
- def check_basic(self, loaded, expected_init_count):
- # m_size == -1
- # The module loads fresh the first time and copies m_copy after.
- snap = loaded.snapshot
- self.assertIsNot(snap.state_initialized, None)
- self.assertIsInstance(snap.init_count, int)
- self.assertGreater(snap.init_count, 0)
- self.assertEqual(snap.init_count, expected_init_count)
-
- def check_with_reinit(self, loaded):
- # m_size >= 0
- # The module loads fresh every time.
- pass
-
- def check_fresh(self, loaded):
- """
- The module had not been loaded before (at least since fully reset).
- """
- snap = loaded.snapshot
- # The module's init func was run.
- # A copy of the module's __dict__ was stored in def->m_base.m_copy.
- # The previous m_copy was deleted first.
- # _PyRuntime.imports.extensions was set.
- self.assertEqual(snap.init_count, 1)
- # The global state was initialized.
- # The module attrs were initialized from that state.
- self.assertEqual(snap.module._module_initialized,
- snap.state_initialized)
-
- def check_semi_fresh(self, loaded, base, prev):
- """
- The module had been loaded before and then reset
- (but the module global state wasn't).
- """
- snap = loaded.snapshot
- # The module's init func was run again.
- # A copy of the module's __dict__ was stored in def->m_base.m_copy.
- # The previous m_copy was deleted first.
- # The module globals did not get reset.
- self.assertNotEqual(snap.id, base.snapshot.id)
- self.assertNotEqual(snap.id, prev.snapshot.id)
- self.assertEqual(snap.init_count, prev.snapshot.init_count + 1)
- # The global state was updated.
- # The module attrs were initialized from that state.
- self.assertEqual(snap.module._module_initialized,
- snap.state_initialized)
- self.assertNotEqual(snap.state_initialized,
- base.snapshot.state_initialized)
- self.assertNotEqual(snap.state_initialized,
- prev.snapshot.state_initialized)
-
- def check_copied(self, loaded, base):
- """
- The module had been loaded before and never reset.
- """
- snap = loaded.snapshot
- # The module's init func was not run again.
- # The interpreter copied m_copy, as set by the other interpreter,
- # with objects owned by the other interpreter.
- # The module globals did not get reset.
- self.assertNotEqual(snap.id, base.snapshot.id)
- self.assertEqual(snap.init_count, base.snapshot.init_count)
- # The global state was not updated since the init func did not run.
- # The module attrs were not directly initialized from that state.
- # The state and module attrs still match the previous loading.
- self.assertEqual(snap.module._module_initialized,
- snap.state_initialized)
- self.assertEqual(snap.state_initialized,
- base.snapshot.state_initialized)
-
- #########################
- # the tests
-
- def test_cleared_globals(self):
- loaded = self.load(self.NAME)
- _testsinglephase = loaded.module
- init_before = _testsinglephase.state_initialized()
-
- _testsinglephase._clear_globals()
- init_after = _testsinglephase.state_initialized()
- init_count = _testsinglephase.initialized_count()
-
- self.assertGreater(init_before, 0)
- self.assertEqual(init_after, 0)
- self.assertEqual(init_count, -1)
-
- def test_variants(self):
- # Exercise the most meaningful variants described in Python/import.c.
- self.maxDiff = None
-
- # Check the "basic" module.
-
- name = self.NAME
- expected_init_count = 1
- with self.subTest(name):
- loaded = self.load(name)
-
- self.check_common(loaded)
- self.check_direct(loaded)
- self.check_basic(loaded, expected_init_count)
- basic = loaded.module
-
- # Check its indirect variants.
-
- name = f'{self.NAME}_basic_wrapper'
- self.add_module_cleanup(name)
- expected_init_count += 1
- with self.subTest(name):
- loaded = self.load(name)
-
- self.check_common(loaded)
- self.check_indirect(loaded, basic)
- self.check_basic(loaded, expected_init_count)
-
- # Currently PyState_AddModule() always replaces the cached module.
- self.assertIs(basic.look_up_self(), loaded.module)
- self.assertEqual(basic.initialized_count(), expected_init_count)
-
- # The cached module shouldn't change after this point.
- basic_lookedup = loaded.module
-
- # Check its direct variant.
-
- name = f'{self.NAME}_basic_copy'
- self.add_module_cleanup(name)
- expected_init_count += 1
- with self.subTest(name):
- loaded = self.load(name)
-
- self.check_common(loaded)
- self.check_direct(loaded)
- self.check_basic(loaded, expected_init_count)
-
- # This should change the cached module for _testsinglephase.
- self.assertIs(basic.look_up_self(), basic_lookedup)
- self.assertEqual(basic.initialized_count(), expected_init_count)
-
- # Check the non-basic variant that has no state.
-
- name = f'{self.NAME}_with_reinit'
- self.add_module_cleanup(name)
- with self.subTest(name):
- loaded = self.load(name)
-
- self.check_common(loaded)
- self.assertIs(loaded.snapshot.state_initialized, None)
- self.check_direct(loaded)
- self.check_with_reinit(loaded)
-
- # This should change the cached module for _testsinglephase.
- self.assertIs(basic.look_up_self(), basic_lookedup)
- self.assertEqual(basic.initialized_count(), expected_init_count)
-
- # Check the basic variant that has state.
-
- name = f'{self.NAME}_with_state'
- self.add_module_cleanup(name)
- with self.subTest(name):
- loaded = self.load(name)
-
- self.check_common(loaded)
- self.assertIsNot(loaded.snapshot.state_initialized, None)
- self.check_direct(loaded)
- self.check_with_reinit(loaded)
-
- # This should change the cached module for _testsinglephase.
- self.assertIs(basic.look_up_self(), basic_lookedup)
- self.assertEqual(basic.initialized_count(), expected_init_count)
-
- def test_basic_reloaded(self):
- # m_copy is copied into the existing module object.
- # Global state is not changed.
- self.maxDiff = None
-
- for name in [
- self.NAME, # the "basic" module
- f'{self.NAME}_basic_wrapper', # the indirect variant
- f'{self.NAME}_basic_copy', # the direct variant
- ]:
- self.add_module_cleanup(name)
- with self.subTest(name):
- loaded = self.load(name)
- reloaded = self.re_load(name, loaded.module)
-
- self.check_common(loaded)
- self.check_common(reloaded)
-
- # Make sure the original __dict__ did not get replaced.
- self.assertEqual(id(loaded.module.__dict__),
- loaded.snapshot.ns_id)
- self.assertEqual(loaded.snapshot.ns.__dict__,
- loaded.module.__dict__)
-
- self.assertEqual(reloaded.module.__spec__.name, reloaded.name)
- self.assertEqual(reloaded.module.__name__,
- reloaded.snapshot.ns.__name__)
-
- self.assertIs(reloaded.module, loaded.module)
- self.assertIs(reloaded.module.__dict__, loaded.module.__dict__)
- # It only happens to be the same but that's good enough here.
- # We really just want to verify that the re-loaded attrs
- # didn't change.
- self.assertIs(reloaded.snapshot.lookedup,
- loaded.snapshot.lookedup)
- self.assertEqual(reloaded.snapshot.state_initialized,
- loaded.snapshot.state_initialized)
- self.assertEqual(reloaded.snapshot.init_count,
- loaded.snapshot.init_count)
-
- self.assertIs(reloaded.snapshot.cached, reloaded.module)
-
- def test_with_reinit_reloaded(self):
- # The module's m_init func is run again.
- self.maxDiff = None
-
- # Keep a reference around.
- basic = self.load(self.NAME)
-
- for name in [
- f'{self.NAME}_with_reinit', # m_size == 0
- f'{self.NAME}_with_state', # m_size > 0
- ]:
- self.add_module_cleanup(name)
- with self.subTest(name):
- loaded = self.load(name)
- reloaded = self.re_load(name, loaded.module)
-
- self.check_common(loaded)
- self.check_common(reloaded)
-
- # Make sure the original __dict__ did not get replaced.
- self.assertEqual(id(loaded.module.__dict__),
- loaded.snapshot.ns_id)
- self.assertEqual(loaded.snapshot.ns.__dict__,
- loaded.module.__dict__)
-
- self.assertEqual(reloaded.module.__spec__.name, reloaded.name)
- self.assertEqual(reloaded.module.__name__,
- reloaded.snapshot.ns.__name__)
-
- self.assertIsNot(reloaded.module, loaded.module)
- self.assertNotEqual(reloaded.module.__dict__,
- loaded.module.__dict__)
- self.assertIs(reloaded.snapshot.lookedup, reloaded.module)
- if loaded.snapshot.state_initialized is None:
- self.assertIs(reloaded.snapshot.state_initialized, None)
- else:
- self.assertGreater(reloaded.snapshot.state_initialized,
- loaded.snapshot.state_initialized)
-
- self.assertIs(reloaded.snapshot.cached, reloaded.module)
-
- # Currently, for every single-phrase init module loaded
- # in multiple interpreters, those interpreters share a
- # PyModuleDef for that object, which can be a problem.
- # Also, we test with a single-phase module that has global state,
- # which is shared by all interpreters.
-
- @requires_subinterpreters
- def test_basic_multiple_interpreters_main_no_reset(self):
- # without resetting; already loaded in main interpreter
-
- # At this point:
- # * alive in 0 interpreters
- # * module def may or may not be loaded already
- # * module def not in _PyRuntime.imports.extensions
- # * mod init func has not run yet (since reset, at least)
- # * m_copy not set (hasn't been loaded yet or already cleared)
- # * module's global state has not been initialized yet
- # (or already cleared)
-
- main_loaded = self.load(self.NAME)
- _testsinglephase = main_loaded.module
- # Attrs set after loading are not in m_copy.
- _testsinglephase.spam = 'spam, spam, spam, spam, eggs, and spam'
-
- self.check_common(main_loaded)
- self.check_fresh(main_loaded)
-
- interpid1 = self.add_subinterpreter()
- interpid2 = self.add_subinterpreter()
-
- # At this point:
- # * alive in 1 interpreter (main)
- # * module def in _PyRuntime.imports.extensions
- # * mod init func ran for the first time (since reset, at least)
- # * m_copy was copied from the main interpreter (was NULL)
- # * module's global state was initialized
-
- # Use an interpreter that gets destroyed right away.
- loaded = self.import_in_subinterp()
- self.check_common(loaded)
- self.check_copied(loaded, main_loaded)
-
- # At this point:
- # * alive in 1 interpreter (main)
- # * module def still in _PyRuntime.imports.extensions
- # * mod init func ran again
- # * m_copy is NULL (claered when the interpreter was destroyed)
- # (was from main interpreter)
- # * module's global state was updated, not reset
-
- # Use a subinterpreter that sticks around.
- loaded = self.import_in_subinterp(interpid1)
- self.check_common(loaded)
- self.check_copied(loaded, main_loaded)
-
- # At this point:
- # * alive in 2 interpreters (main, interp1)
- # * module def still in _PyRuntime.imports.extensions
- # * mod init func ran again
- # * m_copy was copied from interp1
- # * module's global state was updated, not reset
-
- # Use a subinterpreter while the previous one is still alive.
- loaded = self.import_in_subinterp(interpid2)
- self.check_common(loaded)
- self.check_copied(loaded, main_loaded)
-
- # At this point:
- # * alive in 3 interpreters (main, interp1, interp2)
- # * module def still in _PyRuntime.imports.extensions
- # * mod init func ran again
- # * m_copy was copied from interp2 (was from interp1)
- # * module's global state was updated, not reset
-
- @requires_subinterpreters
- def test_basic_multiple_interpreters_deleted_no_reset(self):
- # without resetting; already loaded in a deleted interpreter
-
- # At this point:
- # * alive in 0 interpreters
- # * module def may or may not be loaded already
- # * module def not in _PyRuntime.imports.extensions
- # * mod init func has not run yet (since reset, at least)
- # * m_copy not set (hasn't been loaded yet or already cleared)
- # * module's global state has not been initialized yet
- # (or already cleared)
-
- interpid1 = self.add_subinterpreter()
- interpid2 = self.add_subinterpreter()
-
- # First, load in the main interpreter but then completely clear it.
- loaded_main = self.load(self.NAME)
- loaded_main.module._clear_globals()
- _testinternalcapi.clear_extension(self.NAME, self.FILE)
-
- # At this point:
- # * alive in 0 interpreters
- # * module def loaded already
- # * module def was in _PyRuntime.imports.extensions, but cleared
- # * mod init func ran for the first time (since reset, at least)
- # * m_copy was set, but cleared (was NULL)
- # * module's global state was initialized but cleared
-
- # Start with an interpreter that gets destroyed right away.
- base = self.import_in_subinterp(postscript='''
- # Attrs set after loading are not in m_copy.
- mod.spam = 'spam, spam, mash, spam, eggs, and spam'
- ''')
- self.check_common(base)
- self.check_fresh(base)
-
- # At this point:
- # * alive in 0 interpreters
- # * module def in _PyRuntime.imports.extensions
- # * mod init func ran again
- # * m_copy is NULL (claered when the interpreter was destroyed)
- # * module's global state was initialized, not reset
-
- # Use a subinterpreter that sticks around.
- loaded_interp1 = self.import_in_subinterp(interpid1)
- self.check_common(loaded_interp1)
- self.check_semi_fresh(loaded_interp1, loaded_main, base)
-
- # At this point:
- # * alive in 1 interpreter (interp1)
- # * module def still in _PyRuntime.imports.extensions
- # * mod init func ran again
- # * m_copy was copied from interp1 (was NULL)
- # * module's global state was updated, not reset
-
- # Use a subinterpreter while the previous one is still alive.
- loaded_interp2 = self.import_in_subinterp(interpid2)
- self.check_common(loaded_interp2)
- self.check_copied(loaded_interp2, loaded_interp1)
-
- # At this point:
- # * alive in 2 interpreters (interp1, interp2)
- # * module def still in _PyRuntime.imports.extensions
- # * mod init func ran again
- # * m_copy was copied from interp2 (was from interp1)
- # * module's global state was updated, not reset
-
- @requires_subinterpreters
- @requires_load_dynamic
- def test_basic_multiple_interpreters_reset_each(self):
- # resetting between each interpreter
-
- # At this point:
- # * alive in 0 interpreters
- # * module def may or may not be loaded already
- # * module def not in _PyRuntime.imports.extensions
- # * mod init func has not run yet (since reset, at least)
- # * m_copy not set (hasn't been loaded yet or already cleared)
- # * module's global state has not been initialized yet
- # (or already cleared)
-
- interpid1 = self.add_subinterpreter()
- interpid2 = self.add_subinterpreter()
-
- # Use an interpreter that gets destroyed right away.
- loaded = self.import_in_subinterp(
- postscript='''
- # Attrs set after loading are not in m_copy.
- mod.spam = 'spam, spam, mash, spam, eggs, and spam'
- ''',
- postcleanup=True,
- )
- self.check_common(loaded)
- self.check_fresh(loaded)
-
- # At this point:
- # * alive in 0 interpreters
- # * module def in _PyRuntime.imports.extensions
- # * mod init func ran for the first time (since reset, at least)
- # * m_copy is NULL (claered when the interpreter was destroyed)
- # * module's global state was initialized, not reset
-
- # Use a subinterpreter that sticks around.
- loaded = self.import_in_subinterp(interpid1, postcleanup=True)
- self.check_common(loaded)
- self.check_fresh(loaded)
-
- # At this point:
- # * alive in 1 interpreter (interp1)
- # * module def still in _PyRuntime.imports.extensions
- # * mod init func ran again
- # * m_copy was copied from interp1 (was NULL)
- # * module's global state was initialized, not reset
-
- # Use a subinterpreter while the previous one is still alive.
- loaded = self.import_in_subinterp(interpid2, postcleanup=True)
- self.check_common(loaded)
- self.check_fresh(loaded)
-
- # At this point:
- # * alive in 2 interpreters (interp2, interp2)
- # * module def still in _PyRuntime.imports.extensions
- # * mod init func ran again
- # * m_copy was copied from interp2 (was from interp1)
- # * module's global state was initialized, not reset
-
-
class ReloadTests(unittest.TestCase):
"""Very basic tests to make sure that imp.reload() operates just like
diff --git a/Lib/test/test_import/__init__.py b/Lib/test/test_import/__init__.py
index 3ef07203c46c..66ae554f984f 100644
--- a/Lib/test/test_import/__init__.py
+++ b/Lib/test/test_import/__init__.py
@@ -2,6 +2,7 @@
import contextlib
import errno
import glob
+import json
import importlib.util
from importlib._bootstrap_external import _get_sourcefile
from importlib.machinery import (
@@ -18,13 +19,15 @@
import textwrap
import threading
import time
+import types
import unittest
from unittest import mock
+import _testinternalcapi
from test.support import os_helper
from test.support import (
STDLIB_DIR, swap_attr, swap_item, cpython_only, is_emscripten,
- is_wasi, run_in_subinterp_with_config)
+ is_wasi, run_in_subinterp, run_in_subinterp_with_config)
from test.support.import_helper import (
forget, make_legacy_pyc, unlink, unload, DirsOnSysPath, CleanImport)
from test.support.os_helper import (
@@ -41,6 +44,10 @@
import _testmultiphase
except ImportError:
_testmultiphase = None
+try:
+ import _xxsubinterpreters as _interpreters
+except ModuleNotFoundError:
+ _interpreters = None
skip_if_dont_write_bytecode = unittest.skipIf(
@@ -120,6 +127,182 @@ def _ready_to_import(name=None, source=""):
del sys.modules[name]
+def requires_subinterpreters(meth):
+ """Decorator to skip a test if subinterpreters are not supported."""
+ return unittest.skipIf(_interpreters is None,
+ 'subinterpreters required')(meth)
+
+
+def requires_singlephase_init(meth):
+ """Decorator to skip if single-phase init modules are not supported."""
+ meth = cpython_only(meth)
+ return unittest.skipIf(_testsinglephase is None,
+ 'test requires _testsinglephase module')(meth)
+
+
+class ModuleSnapshot(types.SimpleNamespace):
+ """A representation of a module for testing.
+
+ Fields:
+
+ * id - the module's object ID
+ * module - the actual module or an adequate substitute
+ * __file__
+ * __spec__
+ * name
+ * origin
+ * ns - a copy (dict) of the module's __dict__ (or None)
+ * ns_id - the object ID of the module's __dict__
+ * cached - the sys.modules[mod.__spec__.name] entry (or None)
+ * cached_id - the object ID of the sys.modules entry (or None)
+
+ In cases where the value is not available (e.g. due to serialization),
+ the value will be None.
+ """
+ _fields = tuple('id module ns ns_id cached cached_id'.split())
+
+ @classmethod
+ def from_module(cls, mod):
+ name = mod.__spec__.name
+ cached = sys.modules.get(name)
+ return cls(
+ id=id(mod),
+ module=mod,
+ ns=types.SimpleNamespace(**mod.__dict__),
+ ns_id=id(mod.__dict__),
+ cached=cached,
+ cached_id=id(cached),
+ )
+
+ SCRIPT = textwrap.dedent('''
+ {imports}
+
+ name = {name!r}
+
+ {prescript}
+
+ mod = {name}
+
+ {body}
+
+ {postscript}
+ ''')
+ IMPORTS = textwrap.dedent('''
+ import sys
+ ''').strip()
+ SCRIPT_BODY = textwrap.dedent('''
+ # Capture the snapshot data.
+ cached = sys.modules.get(name)
+ snapshot = dict(
+ id=id(mod),
+ module=dict(
+ __file__=mod.__file__,
+ __spec__=dict(
+ name=mod.__spec__.name,
+ origin=mod.__spec__.origin,
+ ),
+ ),
+ ns=None,
+ ns_id=id(mod.__dict__),
+ cached=None,
+ cached_id=id(cached) if cached else None,
+ )
+ ''').strip()
+ CLEANUP_SCRIPT = textwrap.dedent('''
+ # Clean up the module.
+ sys.modules.pop(name, None)
+ ''').strip()
+
+ @classmethod
+ def build_script(cls, name, *,
+ prescript=None,
+ import_first=False,
+ postscript=None,
+ postcleanup=False,
+ ):
+ if postcleanup is True:
+ postcleanup = cls.CLEANUP_SCRIPT
+ elif isinstance(postcleanup, str):
+ postcleanup = textwrap.dedent(postcleanup).strip()
+ postcleanup = cls.CLEANUP_SCRIPT + os.linesep + postcleanup
+ else:
+ postcleanup = ''
+ prescript = textwrap.dedent(prescript).strip() if prescript else ''
+ postscript = textwrap.dedent(postscript).strip() if postscript else ''
+
+ if postcleanup:
+ if postscript:
+ postscript = postscript + os.linesep * 2 + postcleanup
+ else:
+ postscript = postcleanup
+
+ if import_first:
+ prescript += textwrap.dedent(f'''
+
+ # Now import the module.
+ assert name not in sys.modules
+ import {name}''')
+
+ return cls.SCRIPT.format(
+ imports=cls.IMPORTS.strip(),
+ name=name,
+ prescript=prescript.strip(),
+ body=cls.SCRIPT_BODY.strip(),
+ postscript=postscript,
+ )
+
+ @classmethod
+ def parse(cls, text):
+ raw = json.loads(text)
+ mod = raw['module']
+ mod['__spec__'] = types.SimpleNamespace(**mod['__spec__'])
+ raw['module'] = types.SimpleNamespace(**mod)
+ return cls(**raw)
+
+ @classmethod
+ def from_subinterp(cls, name, interpid=None, *, pipe=None, **script_kwds):
+ if pipe is not None:
+ return cls._from_subinterp(name, interpid, pipe, script_kwds)
+ pipe = os.pipe()
+ try:
+ return cls._from_subinterp(name, interpid, pipe, script_kwds)
+ finally:
+ r, w = pipe
+ os.close(r)
+ os.close(w)
+
+ @classmethod
+ def _from_subinterp(cls, name, interpid, pipe, script_kwargs):
+ r, w = pipe
+
+ # Build the script.
+ postscript = textwrap.dedent(f'''
+ # Send the result over the pipe.
+ import json
+ import os
+ os.write({w}, json.dumps(snapshot).encode())
+
+ ''')
+ _postscript = script_kwargs.get('postscript')
+ if _postscript:
+ _postscript = textwrap.dedent(_postscript).lstrip()
+ postscript += _postscript
+ script_kwargs['postscript'] = postscript.strip()
+ script = cls.build_script(name, **script_kwargs)
+
+ # Run the script.
+ if interpid is None:
+ ret = run_in_subinterp(script)
+ if ret != 0:
+ raise AssertionError(f'{ret} != 0')
+ else:
+ _interpreters.run_string(interpid, script)
+
+ # Parse the results.
+ text = os.read(r, 1000)
+ return cls.parse(text.decode())
+
+
class ImportTests(unittest.TestCase):
def setUp(self):
@@ -1604,7 +1787,7 @@ def test_frozen_compat(self):
with self.subTest(f'{module}: strict, not fresh'):
self.check_compatible_here(module, strict=True)
- @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
+ @requires_singlephase_init
def test_single_init_extension_compat(self):
module = '_testsinglephase'
require_extension(module)
@@ -1636,7 +1819,7 @@ def test_python_compat(self):
with self.subTest(f'{module}: strict, fresh'):
self.check_compatible_fresh(module, strict=True)
- @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
+ @requires_singlephase_init
def test_singlephase_check_with_setting_and_override(self):
module = '_testsinglephase'
require_extension(module)
@@ -1672,6 +1855,685 @@ def check_incompatible(setting, override):
check_compatible(False, -1)
+class TestSinglePhaseSnapshot(ModuleSnapshot):
+
+ @classmethod
+ def from_module(cls, mod):
+ self = super().from_module(mod)
+ self.summed = mod.sum(1, 2)
+ self.lookedup = mod.look_up_self()
+ self.lookedup_id = id(self.lookedup)
+ self.state_initialized = mod.state_initialized()
+ if hasattr(mod, 'initialized_count'):
+ self.init_count = mod.initialized_count()
+ return self
+
+ SCRIPT_BODY = ModuleSnapshot.SCRIPT_BODY + textwrap.dedent(f'''
+ snapshot['module'].update(dict(
+ int_const=mod.int_const,
+ str_const=mod.str_const,
+ _module_initialized=mod._module_initialized,
+ ))
+ snapshot.update(dict(
+ summed=mod.sum(1, 2),
+ lookedup_id=id(mod.look_up_self()),
+ state_initialized=mod.state_initialized(),
+ init_count=mod.initialized_count(),
+ has_spam=hasattr(mod, 'spam'),
+ spam=getattr(mod, 'spam', None),
+ ))
+ ''').rstrip()
+
+ @classmethod
+ def parse(cls, text):
+ self = super().parse(text)
+ if not self.has_spam:
+ del self.spam
+ del self.has_spam
+ return self
+
+
+ at requires_singlephase_init
+class SinglephaseInitTests(unittest.TestCase):
+
+ NAME = '_testsinglephase'
+
+ @classmethod
+ def setUpClass(cls):
+ if '-R' in sys.argv or '--huntrleaks' in sys.argv:
+ # https://github.com/python/cpython/issues/102251
+ raise unittest.SkipTest('unresolved refleaks (see gh-102251)')
+
+ spec = importlib.util.find_spec(cls.NAME)
+ from importlib.machinery import ExtensionFileLoader
+ cls.FILE = spec.origin
+ cls.LOADER = type(spec.loader)
+ assert cls.LOADER is ExtensionFileLoader
+
+ # Start fresh.
+ cls.clean_up()
+
+ def tearDown(self):
+ # Clean up the module.
+ self.clean_up()
+
+ @classmethod
+ def clean_up(cls):
+ name = cls.NAME
+ filename = cls.FILE
+ if name in sys.modules:
+ if hasattr(sys.modules[name], '_clear_globals'):
+ assert sys.modules[name].__file__ == filename
+ sys.modules[name]._clear_globals()
+ del sys.modules[name]
+ # Clear all internally cached data for the extension.
+ _testinternalcapi.clear_extension(name, filename)
+
+ #########################
+ # helpers
+
+ def add_module_cleanup(self, name):
+ def clean_up():
+ # Clear all internally cached data for the extension.
+ _testinternalcapi.clear_extension(name, self.FILE)
+ self.addCleanup(clean_up)
+
+ def _load_dynamic(self, name, path):
+ """
+ Load an extension module.
+ """
+ # This is essentially copied from the old imp module.
+ from importlib._bootstrap import _load
+ loader = self.LOADER(name, path)
+
+ # Issue bpo-24748: Skip the sys.modules check in _load_module_shim;
+ # always load new extension.
+ spec = importlib.util.spec_from_file_location(name, path,
+ loader=loader)
+ return _load(spec)
+
+ def load(self, name):
+ try:
+ already_loaded = self.already_loaded
+ except AttributeError:
+ already_loaded = self.already_loaded = {}
+ assert name not in already_loaded
+ mod = self._load_dynamic(name, self.FILE)
+ self.assertNotIn(mod, already_loaded.values())
+ already_loaded[name] = mod
+ return types.SimpleNamespace(
+ name=name,
+ module=mod,
+ snapshot=TestSinglePhaseSnapshot.from_module(mod),
+ )
+
+ def re_load(self, name, mod):
+ assert sys.modules[name] is mod
+ assert mod.__dict__ == mod.__dict__
+ reloaded = self._load_dynamic(name, self.FILE)
+ return types.SimpleNamespace(
+ name=name,
+ module=reloaded,
+ snapshot=TestSinglePhaseSnapshot.from_module(reloaded),
+ )
+
+ # subinterpreters
+
+ def add_subinterpreter(self):
+ interpid = _interpreters.create(isolated=False)
+ _interpreters.run_string(interpid, textwrap.dedent('''
+ import sys
+ import _testinternalcapi
+ '''))
+ def clean_up():
+ _interpreters.run_string(interpid, textwrap.dedent(f'''
+ name = {self.NAME!r}
+ if name in sys.modules:
+ sys.modules[name]._clear_globals()
+ _testinternalcapi.clear_extension(name, {self.FILE!r})
+ '''))
+ _interpreters.destroy(interpid)
+ self.addCleanup(clean_up)
+ return interpid
+
+ def import_in_subinterp(self, interpid=None, *,
+ postscript=None,
+ postcleanup=False,
+ ):
+ name = self.NAME
+
+ if postcleanup:
+ import_ = 'import _testinternalcapi' if interpid is None else ''
+ postcleanup = f'''
+ {import_}
+ mod._clear_globals()
+ _testinternalcapi.clear_extension(name, {self.FILE!r})
+ '''
+
+ try:
+ pipe = self._pipe
+ except AttributeError:
+ r, w = pipe = self._pipe = os.pipe()
+ self.addCleanup(os.close, r)
+ self.addCleanup(os.close, w)
+
+ snapshot = TestSinglePhaseSnapshot.from_subinterp(
+ name,
+ interpid,
+ pipe=pipe,
+ import_first=True,
+ postscript=postscript,
+ postcleanup=postcleanup,
+ )
+
+ return types.SimpleNamespace(
+ name=name,
+ module=None,
+ snapshot=snapshot,
+ )
+
+ # checks
+
+ def check_common(self, loaded):
+ isolated = False
+
+ mod = loaded.module
+ if not mod:
+ # It came from a subinterpreter.
+ isolated = True
+ mod = loaded.snapshot.module
+ # mod.__name__ might not match, but the spec will.
+ self.assertEqual(mod.__spec__.name, loaded.name)
+ self.assertEqual(mod.__file__, self.FILE)
+ self.assertEqual(mod.__spec__.origin, self.FILE)
+ if not isolated:
+ self.assertTrue(issubclass(mod.error, Exception))
+ self.assertEqual(mod.int_const, 1969)
+ self.assertEqual(mod.str_const, 'something different')
+ self.assertIsInstance(mod._module_initialized, float)
+ self.assertGreater(mod._module_initialized, 0)
+
+ snap = loaded.snapshot
+ self.assertEqual(snap.summed, 3)
+ if snap.state_initialized is not None:
+ self.assertIsInstance(snap.state_initialized, float)
+ self.assertGreater(snap.state_initialized, 0)
+ if isolated:
+ # The "looked up" module is interpreter-specific
+ # (interp->imports.modules_by_index was set for the module).
+ self.assertEqual(snap.lookedup_id, snap.id)
+ self.assertEqual(snap.cached_id, snap.id)
+ with self.assertRaises(AttributeError):
+ snap.spam
+ else:
+ self.assertIs(snap.lookedup, mod)
+ self.assertIs(snap.cached, mod)
+
+ def check_direct(self, loaded):
+ # The module has its own PyModuleDef, with a matching name.
+ self.assertEqual(loaded.module.__name__, loaded.name)
+ self.assertIs(loaded.snapshot.lookedup, loaded.module)
+
+ def check_indirect(self, loaded, orig):
+ # The module re-uses another's PyModuleDef, with a different name.
+ assert orig is not loaded.module
+ assert orig.__name__ != loaded.name
+ self.assertNotEqual(loaded.module.__name__, loaded.name)
+ self.assertIs(loaded.snapshot.lookedup, loaded.module)
+
+ def check_basic(self, loaded, expected_init_count):
+ # m_size == -1
+ # The module loads fresh the first time and copies m_copy after.
+ snap = loaded.snapshot
+ self.assertIsNot(snap.state_initialized, None)
+ self.assertIsInstance(snap.init_count, int)
+ self.assertGreater(snap.init_count, 0)
+ self.assertEqual(snap.init_count, expected_init_count)
+
+ def check_with_reinit(self, loaded):
+ # m_size >= 0
+ # The module loads fresh every time.
+ pass
+
+ def check_fresh(self, loaded):
+ """
+ The module had not been loaded before (at least since fully reset).
+ """
+ snap = loaded.snapshot
+ # The module's init func was run.
+ # A copy of the module's __dict__ was stored in def->m_base.m_copy.
+ # The previous m_copy was deleted first.
+ # _PyRuntime.imports.extensions was set.
+ self.assertEqual(snap.init_count, 1)
+ # The global state was initialized.
+ # The module attrs were initialized from that state.
+ self.assertEqual(snap.module._module_initialized,
+ snap.state_initialized)
+
+ def check_semi_fresh(self, loaded, base, prev):
+ """
+ The module had been loaded before and then reset
+ (but the module global state wasn't).
+ """
+ snap = loaded.snapshot
+ # The module's init func was run again.
+ # A copy of the module's __dict__ was stored in def->m_base.m_copy.
+ # The previous m_copy was deleted first.
+ # The module globals did not get reset.
+ self.assertNotEqual(snap.id, base.snapshot.id)
+ self.assertNotEqual(snap.id, prev.snapshot.id)
+ self.assertEqual(snap.init_count, prev.snapshot.init_count + 1)
+ # The global state was updated.
+ # The module attrs were initialized from that state.
+ self.assertEqual(snap.module._module_initialized,
+ snap.state_initialized)
+ self.assertNotEqual(snap.state_initialized,
+ base.snapshot.state_initialized)
+ self.assertNotEqual(snap.state_initialized,
+ prev.snapshot.state_initialized)
+
+ def check_copied(self, loaded, base):
+ """
+ The module had been loaded before and never reset.
+ """
+ snap = loaded.snapshot
+ # The module's init func was not run again.
+ # The interpreter copied m_copy, as set by the other interpreter,
+ # with objects owned by the other interpreter.
+ # The module globals did not get reset.
+ self.assertNotEqual(snap.id, base.snapshot.id)
+ self.assertEqual(snap.init_count, base.snapshot.init_count)
+ # The global state was not updated since the init func did not run.
+ # The module attrs were not directly initialized from that state.
+ # The state and module attrs still match the previous loading.
+ self.assertEqual(snap.module._module_initialized,
+ snap.state_initialized)
+ self.assertEqual(snap.state_initialized,
+ base.snapshot.state_initialized)
+
+ #########################
+ # the tests
+
+ def test_cleared_globals(self):
+ loaded = self.load(self.NAME)
+ _testsinglephase = loaded.module
+ init_before = _testsinglephase.state_initialized()
+
+ _testsinglephase._clear_globals()
+ init_after = _testsinglephase.state_initialized()
+ init_count = _testsinglephase.initialized_count()
+
+ self.assertGreater(init_before, 0)
+ self.assertEqual(init_after, 0)
+ self.assertEqual(init_count, -1)
+
+ def test_variants(self):
+ # Exercise the most meaningful variants described in Python/import.c.
+ self.maxDiff = None
+
+ # Check the "basic" module.
+
+ name = self.NAME
+ expected_init_count = 1
+ with self.subTest(name):
+ loaded = self.load(name)
+
+ self.check_common(loaded)
+ self.check_direct(loaded)
+ self.check_basic(loaded, expected_init_count)
+ basic = loaded.module
+
+ # Check its indirect variants.
+
+ name = f'{self.NAME}_basic_wrapper'
+ self.add_module_cleanup(name)
+ expected_init_count += 1
+ with self.subTest(name):
+ loaded = self.load(name)
+
+ self.check_common(loaded)
+ self.check_indirect(loaded, basic)
+ self.check_basic(loaded, expected_init_count)
+
+ # Currently PyState_AddModule() always replaces the cached module.
+ self.assertIs(basic.look_up_self(), loaded.module)
+ self.assertEqual(basic.initialized_count(), expected_init_count)
+
+ # The cached module shouldn't change after this point.
+ basic_lookedup = loaded.module
+
+ # Check its direct variant.
+
+ name = f'{self.NAME}_basic_copy'
+ self.add_module_cleanup(name)
+ expected_init_count += 1
+ with self.subTest(name):
+ loaded = self.load(name)
+
+ self.check_common(loaded)
+ self.check_direct(loaded)
+ self.check_basic(loaded, expected_init_count)
+
+ # This should change the cached module for _testsinglephase.
+ self.assertIs(basic.look_up_self(), basic_lookedup)
+ self.assertEqual(basic.initialized_count(), expected_init_count)
+
+ # Check the non-basic variant that has no state.
+
+ name = f'{self.NAME}_with_reinit'
+ self.add_module_cleanup(name)
+ with self.subTest(name):
+ loaded = self.load(name)
+
+ self.check_common(loaded)
+ self.assertIs(loaded.snapshot.state_initialized, None)
+ self.check_direct(loaded)
+ self.check_with_reinit(loaded)
+
+ # This should change the cached module for _testsinglephase.
+ self.assertIs(basic.look_up_self(), basic_lookedup)
+ self.assertEqual(basic.initialized_count(), expected_init_count)
+
+ # Check the basic variant that has state.
+
+ name = f'{self.NAME}_with_state'
+ self.add_module_cleanup(name)
+ with self.subTest(name):
+ loaded = self.load(name)
+
+ self.check_common(loaded)
+ self.assertIsNot(loaded.snapshot.state_initialized, None)
+ self.check_direct(loaded)
+ self.check_with_reinit(loaded)
+
+ # This should change the cached module for _testsinglephase.
+ self.assertIs(basic.look_up_self(), basic_lookedup)
+ self.assertEqual(basic.initialized_count(), expected_init_count)
+
+ def test_basic_reloaded(self):
+ # m_copy is copied into the existing module object.
+ # Global state is not changed.
+ self.maxDiff = None
+
+ for name in [
+ self.NAME, # the "basic" module
+ f'{self.NAME}_basic_wrapper', # the indirect variant
+ f'{self.NAME}_basic_copy', # the direct variant
+ ]:
+ self.add_module_cleanup(name)
+ with self.subTest(name):
+ loaded = self.load(name)
+ reloaded = self.re_load(name, loaded.module)
+
+ self.check_common(loaded)
+ self.check_common(reloaded)
+
+ # Make sure the original __dict__ did not get replaced.
+ self.assertEqual(id(loaded.module.__dict__),
+ loaded.snapshot.ns_id)
+ self.assertEqual(loaded.snapshot.ns.__dict__,
+ loaded.module.__dict__)
+
+ self.assertEqual(reloaded.module.__spec__.name, reloaded.name)
+ self.assertEqual(reloaded.module.__name__,
+ reloaded.snapshot.ns.__name__)
+
+ self.assertIs(reloaded.module, loaded.module)
+ self.assertIs(reloaded.module.__dict__, loaded.module.__dict__)
+ # It only happens to be the same but that's good enough here.
+ # We really just want to verify that the re-loaded attrs
+ # didn't change.
+ self.assertIs(reloaded.snapshot.lookedup,
+ loaded.snapshot.lookedup)
+ self.assertEqual(reloaded.snapshot.state_initialized,
+ loaded.snapshot.state_initialized)
+ self.assertEqual(reloaded.snapshot.init_count,
+ loaded.snapshot.init_count)
+
+ self.assertIs(reloaded.snapshot.cached, reloaded.module)
+
+ def test_with_reinit_reloaded(self):
+ # The module's m_init func is run again.
+ self.maxDiff = None
+
+ # Keep a reference around.
+ basic = self.load(self.NAME)
+
+ for name in [
+ f'{self.NAME}_with_reinit', # m_size == 0
+ f'{self.NAME}_with_state', # m_size > 0
+ ]:
+ self.add_module_cleanup(name)
+ with self.subTest(name):
+ loaded = self.load(name)
+ reloaded = self.re_load(name, loaded.module)
+
+ self.check_common(loaded)
+ self.check_common(reloaded)
+
+ # Make sure the original __dict__ did not get replaced.
+ self.assertEqual(id(loaded.module.__dict__),
+ loaded.snapshot.ns_id)
+ self.assertEqual(loaded.snapshot.ns.__dict__,
+ loaded.module.__dict__)
+
+ self.assertEqual(reloaded.module.__spec__.name, reloaded.name)
+ self.assertEqual(reloaded.module.__name__,
+ reloaded.snapshot.ns.__name__)
+
+ self.assertIsNot(reloaded.module, loaded.module)
+ self.assertNotEqual(reloaded.module.__dict__,
+ loaded.module.__dict__)
+ self.assertIs(reloaded.snapshot.lookedup, reloaded.module)
+ if loaded.snapshot.state_initialized is None:
+ self.assertIs(reloaded.snapshot.state_initialized, None)
+ else:
+ self.assertGreater(reloaded.snapshot.state_initialized,
+ loaded.snapshot.state_initialized)
+
+ self.assertIs(reloaded.snapshot.cached, reloaded.module)
+
+ # Currently, for every single-phrase init module loaded
+ # in multiple interpreters, those interpreters share a
+ # PyModuleDef for that object, which can be a problem.
+ # Also, we test with a single-phase module that has global state,
+ # which is shared by all interpreters.
+
+ @requires_subinterpreters
+ def test_basic_multiple_interpreters_main_no_reset(self):
+ # without resetting; already loaded in main interpreter
+
+ # At this point:
+ # * alive in 0 interpreters
+ # * module def may or may not be loaded already
+ # * module def not in _PyRuntime.imports.extensions
+ # * mod init func has not run yet (since reset, at least)
+ # * m_copy not set (hasn't been loaded yet or already cleared)
+ # * module's global state has not been initialized yet
+ # (or already cleared)
+
+ main_loaded = self.load(self.NAME)
+ _testsinglephase = main_loaded.module
+ # Attrs set after loading are not in m_copy.
+ _testsinglephase.spam = 'spam, spam, spam, spam, eggs, and spam'
+
+ self.check_common(main_loaded)
+ self.check_fresh(main_loaded)
+
+ interpid1 = self.add_subinterpreter()
+ interpid2 = self.add_subinterpreter()
+
+ # At this point:
+ # * alive in 1 interpreter (main)
+ # * module def in _PyRuntime.imports.extensions
+ # * mod init func ran for the first time (since reset, at least)
+ # * m_copy was copied from the main interpreter (was NULL)
+ # * module's global state was initialized
+
+ # Use an interpreter that gets destroyed right away.
+ loaded = self.import_in_subinterp()
+ self.check_common(loaded)
+ self.check_copied(loaded, main_loaded)
+
+ # At this point:
+ # * alive in 1 interpreter (main)
+ # * module def still in _PyRuntime.imports.extensions
+ # * mod init func ran again
+ # * m_copy is NULL (claered when the interpreter was destroyed)
+ # (was from main interpreter)
+ # * module's global state was updated, not reset
+
+ # Use a subinterpreter that sticks around.
+ loaded = self.import_in_subinterp(interpid1)
+ self.check_common(loaded)
+ self.check_copied(loaded, main_loaded)
+
+ # At this point:
+ # * alive in 2 interpreters (main, interp1)
+ # * module def still in _PyRuntime.imports.extensions
+ # * mod init func ran again
+ # * m_copy was copied from interp1
+ # * module's global state was updated, not reset
+
+ # Use a subinterpreter while the previous one is still alive.
+ loaded = self.import_in_subinterp(interpid2)
+ self.check_common(loaded)
+ self.check_copied(loaded, main_loaded)
+
+ # At this point:
+ # * alive in 3 interpreters (main, interp1, interp2)
+ # * module def still in _PyRuntime.imports.extensions
+ # * mod init func ran again
+ # * m_copy was copied from interp2 (was from interp1)
+ # * module's global state was updated, not reset
+
+ @requires_subinterpreters
+ def test_basic_multiple_interpreters_deleted_no_reset(self):
+ # without resetting; already loaded in a deleted interpreter
+
+ # At this point:
+ # * alive in 0 interpreters
+ # * module def may or may not be loaded already
+ # * module def not in _PyRuntime.imports.extensions
+ # * mod init func has not run yet (since reset, at least)
+ # * m_copy not set (hasn't been loaded yet or already cleared)
+ # * module's global state has not been initialized yet
+ # (or already cleared)
+
+ interpid1 = self.add_subinterpreter()
+ interpid2 = self.add_subinterpreter()
+
+ # First, load in the main interpreter but then completely clear it.
+ loaded_main = self.load(self.NAME)
+ loaded_main.module._clear_globals()
+ _testinternalcapi.clear_extension(self.NAME, self.FILE)
+
+ # At this point:
+ # * alive in 0 interpreters
+ # * module def loaded already
+ # * module def was in _PyRuntime.imports.extensions, but cleared
+ # * mod init func ran for the first time (since reset, at least)
+ # * m_copy was set, but cleared (was NULL)
+ # * module's global state was initialized but cleared
+
+ # Start with an interpreter that gets destroyed right away.
+ base = self.import_in_subinterp(postscript='''
+ # Attrs set after loading are not in m_copy.
+ mod.spam = 'spam, spam, mash, spam, eggs, and spam'
+ ''')
+ self.check_common(base)
+ self.check_fresh(base)
+
+ # At this point:
+ # * alive in 0 interpreters
+ # * module def in _PyRuntime.imports.extensions
+ # * mod init func ran again
+ # * m_copy is NULL (claered when the interpreter was destroyed)
+ # * module's global state was initialized, not reset
+
+ # Use a subinterpreter that sticks around.
+ loaded_interp1 = self.import_in_subinterp(interpid1)
+ self.check_common(loaded_interp1)
+ self.check_semi_fresh(loaded_interp1, loaded_main, base)
+
+ # At this point:
+ # * alive in 1 interpreter (interp1)
+ # * module def still in _PyRuntime.imports.extensions
+ # * mod init func ran again
+ # * m_copy was copied from interp1 (was NULL)
+ # * module's global state was updated, not reset
+
+ # Use a subinterpreter while the previous one is still alive.
+ loaded_interp2 = self.import_in_subinterp(interpid2)
+ self.check_common(loaded_interp2)
+ self.check_copied(loaded_interp2, loaded_interp1)
+
+ # At this point:
+ # * alive in 2 interpreters (interp1, interp2)
+ # * module def still in _PyRuntime.imports.extensions
+ # * mod init func ran again
+ # * m_copy was copied from interp2 (was from interp1)
+ # * module's global state was updated, not reset
+
+ @requires_subinterpreters
+ def test_basic_multiple_interpreters_reset_each(self):
+ # resetting between each interpreter
+
+ # At this point:
+ # * alive in 0 interpreters
+ # * module def may or may not be loaded already
+ # * module def not in _PyRuntime.imports.extensions
+ # * mod init func has not run yet (since reset, at least)
+ # * m_copy not set (hasn't been loaded yet or already cleared)
+ # * module's global state has not been initialized yet
+ # (or already cleared)
+
+ interpid1 = self.add_subinterpreter()
+ interpid2 = self.add_subinterpreter()
+
+ # Use an interpreter that gets destroyed right away.
+ loaded = self.import_in_subinterp(
+ postscript='''
+ # Attrs set after loading are not in m_copy.
+ mod.spam = 'spam, spam, mash, spam, eggs, and spam'
+ ''',
+ postcleanup=True,
+ )
+ self.check_common(loaded)
+ self.check_fresh(loaded)
+
+ # At this point:
+ # * alive in 0 interpreters
+ # * module def in _PyRuntime.imports.extensions
+ # * mod init func ran for the first time (since reset, at least)
+ # * m_copy is NULL (claered when the interpreter was destroyed)
+ # * module's global state was initialized, not reset
+
+ # Use a subinterpreter that sticks around.
+ loaded = self.import_in_subinterp(interpid1, postcleanup=True)
+ self.check_common(loaded)
+ self.check_fresh(loaded)
+
+ # At this point:
+ # * alive in 1 interpreter (interp1)
+ # * module def still in _PyRuntime.imports.extensions
+ # * mod init func ran again
+ # * m_copy was copied from interp1 (was NULL)
+ # * module's global state was initialized, not reset
+
+ # Use a subinterpreter while the previous one is still alive.
+ loaded = self.import_in_subinterp(interpid2, postcleanup=True)
+ self.check_common(loaded)
+ self.check_fresh(loaded)
+
+ # At this point:
+ # * alive in 2 interpreters (interp2, interp2)
+ # * module def still in _PyRuntime.imports.extensions
+ # * mod init func ran again
+ # * m_copy was copied from interp2 (was from interp1)
+ # * module's global state was initialized, not reset
+
+
if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports.
unittest.main()
More information about the Python-checkins
mailing list