[Python-checkins] r57533 - sandbox/trunk/import_in_py/zipimport_/tests.py sandbox/trunk/import_in_py/zipimport_/zipimport.py
brett.cannon
python-checkins at python.org
Mon Aug 27 01:46:44 CEST 2007
Author: brett.cannon
Date: Mon Aug 27 01:46:43 2007
New Revision: 57533
Modified:
sandbox/trunk/import_in_py/zipimport_/tests.py
sandbox/trunk/import_in_py/zipimport_/zipimport.py
Log:
Implement zipimport.zipimporter.__init__.
Modified: sandbox/trunk/import_in_py/zipimport_/tests.py
==============================================================================
--- sandbox/trunk/import_in_py/zipimport_/tests.py (original)
+++ sandbox/trunk/import_in_py/zipimport_/tests.py Mon Aug 27 01:46:43 2007
@@ -4,43 +4,58 @@
import contextlib
import os
import py_compile
+import shutil
from test import test_support
import unittest
import zipfile
example_code = 'attr = None'
+created_paths = set(['_top_level',
+ os.path.join('_pkg', '__init__'),
+ os.path.join('_pkg', 'submodule'),
+ os.path.join('_pkg', '_subpkg', '__init__'),
+ os.path.join('_pkg', '_subpkg', 'submodule')
+ ])
+
@contextlib.contextmanager
def temp_zipfile(source=True, bytecode=True):
- """Create a temporary zip file for testing."""
+ """Create a temporary zip file for testing.
+
+ Clears zipimport._zip_directory_cache.
+
+ """
+ zipimport._zip_directory_cache = {}
zip_path = test_support.TESTFN + '.zip'
+ bytecode_suffix = 'c' if __debug__ else 'o'
zip_file = zipfile.ZipFile(zip_path, 'w')
try:
- os.mkdir('_pkg')
- os.mkdir(os.path.join('_pkg', '_subpkg'))
- for module_path in ['_top_level.py', os.path.join('_pkg', '__init__.py'),
- os.path.join('_pkg', 'submodule.py'),
- os.path.join('_pkg', '_subpkg', '__init__.py'),
- os.path.join('_pkg', '_subpkg', 'submodule.py')]:
- with open(module_path, 'w') as temp_file:
+ for path in created_paths:
+ if os.sep in path:
+ directory = os.path.split(path)[0]
+ if not os.path.exists(directory):
+ os.makedirs(directory)
+ code_path = path + '.py'
+ with open(code_path, 'w') as temp_file:
temp_file.write(example_code)
- try:
- if source:
- zip_file.write(module_path)
- if bytecode:
- py_compile.compile(module_path, doraise=True)
- zip_file.write(module_path + 'c' if __debug__ else 'o')
- finally:
- test_support.unlink(module_path)
- test_support.unlink(module_path + 'c' if __debug__ else 'o')
+ if source:
+ zip_file.write(code_path)
+ if bytecode:
+ py_compile.compile(code_path, doraise=True)
+ zip_file.write(code_path + bytecode_suffix)
zip_file.close()
yield zip_path
finally:
- if os.path.exists(os.path.join('_pkg', '_subpkg')):
- os.rmdir(os.path.join('_pkg', '_subpkg'))
- if os.path.exists('_pkg'):
- os.rmdir('_pkg')
- os.unlink(zip_path)
+ zip_file.close()
+ for path in created_paths:
+ if os.sep in path:
+ directory = os.path.split(path)[0]
+ if os.path.exists(directory):
+ shutil.rmtree(directory)
+ else:
+ for suffix in ('.py', '.py' + bytecode_suffix):
+ test_support.unlink(path + suffix)
+ test_support.unlink(zip_path)
class ZipImportErrorTests(unittest.TestCase):
@@ -71,14 +86,31 @@
with temp_zipfile() as zip_path:
zip_importer = zipimport.zipimporter(zip_path)
self.assert_(isinstance(zip_importer, zipimport.zipimporter))
+ self.assertEqual(zip_importer.archive, zip_path)
+ self.assertEqual(zip_importer.prefix, '')
+ self.assert_(zip_path in zipimport._zip_directory_cache)
def test_pkg_path(self):
# Thanks to __path__, need to be able to work off of a path with a zip
# file at the front and a path for the rest.
with temp_zipfile() as zip_path:
- path = os.path.join(zip_path, '_pkg')
+ prefix = os.path.join('_pkg', '')
+ path = os.path.join(zip_path, prefix)
zip_importer = zipimport.zipimporter(path)
self.assert_(isinstance(zip_importer, zipimport.zipimporter))
+ self.assertEqual(zip_importer.archive, zip_path)
+ self.assertEqual(zip_importer.prefix, prefix)
+ self.assert_(zip_path in zipimport._zip_directory_cache)
+
+ def test_zip_directory_cache(self):
+ # Test that _zip_directory_cache is set properly.
+ # Use a package entry so that the test exercises a harder example.
+ with temp_zipfile(bytecode=False) as zip_path:
+ importer = zipimport.zipimporter(os.path.join(zip_path, '_pkg'))
+ self.assert_(zip_path in zipimport._zip_directory_cache)
+ file_set = set(zipimport._zip_directory_cache[zip_path].iterkeys())
+ compare_set = set(path + '.py' for path in created_paths)
+ self.assertEqual(file_set, compare_set)
class FindModule(unittest.TestCase):
Modified: sandbox/trunk/import_in_py/zipimport_/zipimport.py
==============================================================================
--- sandbox/trunk/import_in_py/zipimport_/zipimport.py (original)
+++ sandbox/trunk/import_in_py/zipimport_/zipimport.py Mon Aug 27 01:46:43 2007
@@ -1,7 +1,7 @@
"""A re-implementation of zipimport to use importlib."""
import importlib
-#import contextlib
+import contextlib
#import datetime
#import imp
import os
@@ -11,6 +11,9 @@
# XXX Import lock prevents concurrency issues during importation use, but does
# not make any guarantees for non-import uses.
+# XXX Keyed on archive path; each value is a dict keyed on the internal paths
+# of the files within the archive, with zipfile.ZipInfo instances as values
+# (different from the C version, which uses tuples instead).
_zip_directory_cache = {}
@@ -31,29 +34,25 @@
than a zip file is passed in then ZipImportError is raised.
"""
- while archivepath:
- if zipfile.is_zipfile(archivepath):
- break
- archivepath = os.path.split(archivepath)[0]
- else:
- raise ZipImportError("not a zip file")
-
-
-
- # XXX need to have archive, prefix, _files attributes.
- raise NotImplementedError
- path = os.path.abspath(archivepath)
- # XXX Need to tweak to handle zip archive package info like
- # _pkg.zip/pkg
+ path = archivepath
while path:
if zipfile.is_zipfile(path):
break
- path = path.rsplit(os.sep, 1)[0]
+ path = os.path.split(path)[0]
else:
- raise ZipImportError("the specified path must be a zip file")
- self._zip_path = path
- self._path_entry = archivepath
- self._path_cache = {}
+ raise ZipImportError("%s is not a zip file" % archivepath)
+ self.archive = path # Path to zip file.
+ self.prefix = archivepath[len(path)+1:] # Package directory.
+ if self.prefix and not self.prefix.endswith(os.sep):
+ self.prefix += os.sep
+ if not path in _zip_directory_cache:
+ with contextlib.closing(zipfile.ZipFile(path)) as zip_file:
+ zip_info_list = zip_file.infolist()
+ # XXX Need to duplicate tuple from original zipimport?
+ zip_info_dict = dict((info.filename, info)
+ for info in zip_info_list)
+ _zip_directory_cache[path] = zip_info_dict
+ self._files = _zip_directory_cache[path]
def __repr__(self):
raise NotImplementedError
More information about the Python-checkins
mailing list