Many functions related to compiling or parsing Python code, such as compile(), ast.parse(), symtable.symtable(), and importlib.abc.InspectLoader.source_to_code(), now allow passing the module name used when filtering syntax warnings.
831 lines
33 KiB
Python
831 lines
33 KiB
Python
"""zipimport provides support for importing Python modules from Zip archives.
|
|
|
|
This module exports two objects:
|
|
- zipimporter: a class; its constructor takes a path to a Zip archive.
|
|
- ZipImportError: exception raised by zipimporter objects. It's a
|
|
subclass of ImportError, so it can be caught as ImportError, too.
|
|
|
|
It is usually not needed to use the zipimport module explicitly; it is
|
|
used by the builtin import mechanism for sys.path items that are paths
|
|
to Zip archives.
|
|
"""
|
|
|
|
import _frozen_importlib_external as _bootstrap_external
|
|
from _frozen_importlib_external import _unpack_uint16, _unpack_uint32, _unpack_uint64
|
|
import _frozen_importlib as _bootstrap # for _verbose_message
|
|
import _imp # for check_hash_based_pycs
|
|
import _io # for open
|
|
import marshal # for loads
|
|
import time # for mktime
|
|
|
|
# Names exported by "from zipimport import *".
__all__ = ['ZipImportError', 'zipimporter']


# Primary path separator, as defined by the import bootstrap module.
path_sep = _bootstrap_external.path_sep
# Everything in path_separators after the first character.  When this is
# non-empty, zipimporter.__init__() and zipimporter.get_data() replace it
# with path_sep in the paths they are given.
alt_path_sep = _bootstrap_external.path_separators[1:]
|
|
|
|
|
|
class ZipImportError(ImportError):
    """Exception raised by zipimporter objects.

    It is a subclass of ImportError, so it can be caught as ImportError too.
    """
|
|
|
|
# Directory listings produced by _read_directory(), keyed by archive path.
_zip_directory_cache = dict()

# Byte sizes of the end-of-central-directory (EOCD) structures.
END_CENTRAL_DIR_SIZE = 22
END_CENTRAL_DIR_SIZE_64 = 56
END_CENTRAL_DIR_LOCATOR_SIZE_64 = 20

# Signatures searched for at the tail of the archive.
STRING_END_ARCHIVE = b'PK\x05\x06'  # standard EOCD signature
STRING_END_LOCATOR_64 = b'PK\x06\x07'  # Zip64 EOCD Locator signature
STRING_END_ZIP_64 = b'PK\x06\x06'  # Zip64 EOCD signature

# Largest value of a 16-bit length field, used as the maximum comment size.
MAX_COMMENT_LEN = 0xFFFF
# A 32-bit header field holding this value is looked up in the zip64 extra.
MAX_UINT32 = (1 << 32) - 1
# Tag of the zip64 record inside a central directory entry's extra data.
ZIP64_EXTRA_TAG = 0x0001
|
|
|
|
class zipimporter(_bootstrap_external._LoaderBasics):
    """zipimporter(archivepath) -> zipimporter object

    Create a new zipimporter instance. 'archivepath' names either a Zip
    file or a location inside one: '/tmp/myimport.zip' and
    '/tmp/myimport.zip/mydirectory' are both accepted, the latter when
    mydirectory is a valid directory inside the archive.

    ZipImportError is raised if 'archivepath' doesn't lead to a valid Zip
    archive.

    The 'archive' attribute of a zipimporter object holds the name of the
    targeted zipfile; 'prefix' holds the path inside the archive, ending
    with a path separator when it is not empty.
    """

    # Peel the in-archive "subdirectory" off the given path, then reuse the
    # archive's directory listing from _zip_directory_cache when it is
    # already there, or read it from the archive otherwise.
    def __init__(self, path):
        if not isinstance(path, str):
            raise TypeError(f"expected str, not {type(path)!r}")
        if not path:
            raise ZipImportError('archive path is empty', path=path)
        if alt_path_sep:
            path = path.replace(alt_path_sep, path_sep)

        # Components removed from the end of 'path', innermost first.
        trailing = []
        while True:
            try:
                st = _bootstrap_external._path_stat(path)
            except (OSError, ValueError):
                # On Windows a ValueError is raised for too long paths.
                # Nothing can be stat'ed here: drop the last path element
                # and try again with what is left.
                parent, leaf = _bootstrap_external._path_split(path)
                if parent == path:
                    raise ZipImportError('not a Zip file', path=path)
                path = parent
                trailing.append(leaf)
                continue
            # The path exists; only a regular file (stat.S_ISREG) will do.
            if (st.st_mode & 0o170000) != 0o100000:
                raise ZipImportError('not a Zip file', path=path)
            break

        if path not in _zip_directory_cache:
            _zip_directory_cache[path] = _read_directory(path)
        self.archive = path
        # The directory that followed the Zip file path, if any.
        inner = _bootstrap_external._path_join(*reversed(trailing))
        self.prefix = inner + path_sep if inner else inner

    def find_spec(self, fullname, target=None):
        """Create a ModuleSpec for the specified module.

        Returns None if the module cannot be found.
        """
        is_pkg = _get_module_info(self, fullname)
        if is_pkg is not None:
            return _bootstrap.spec_from_loader(fullname, self, is_package=is_pkg)

        # Neither a module nor a regular package.  A directory of that name
        # may still be a portion of a namespace package.  Only the last
        # component of fullname matters here; the earlier ones are already
        # recorded in self.prefix.
        modpath = _get_module_path(self, fullname)
        if not _is_dir(self, modpath):
            return None
        # Report the directory's path, without a trailing separator.
        location = f'{self.archive}{path_sep}{modpath}'
        spec = _bootstrap.ModuleSpec(name=fullname, loader=None,
                                     is_package=True)
        spec.submodule_search_locations.append(location)
        return spec

    def get_code(self, fullname):
        """get_code(fullname) -> code object.

        Return the code object for the specified module. Raise ZipImportError
        if the module couldn't be imported.
        """
        return _get_module_code(self, fullname)[0]

    def get_data(self, pathname):
        """get_data(pathname) -> bytes with file data.

        Return the data associated with 'pathname'; a leading
        '<archive><sep>' is stripped before the lookup. Entries recorded
        without a toc entry yield b''. Raise OSError if the file wasn't
        found.
        """
        if alt_path_sep:
            pathname = pathname.replace(alt_path_sep, path_sep)

        archive_root = self.archive + path_sep
        if pathname.startswith(archive_root):
            key = pathname[len(archive_root):]
        else:
            key = pathname

        try:
            toc_entry = self._get_files()[key]
        except KeyError:
            raise OSError(0, '', key)
        return b'' if toc_entry is None else _get_data(self.archive, toc_entry)

    # Return a string matching __file__ for the named module
    def get_filename(self, fullname):
        """get_filename(fullname) -> filename string.

        Return the filename for the specified module or raise ZipImportError
        if it couldn't be imported.
        """
        # The filename depends on which archive entry the code would be
        # loaded from, so go through the same lookup as a real load.
        return _get_module_code(self, fullname)[2]

    def get_source(self, fullname):
        """get_source(fullname) -> source string.

        Return the source code for the specified module. Raise ZipImportError
        if the module couldn't be found, return None if the archive does
        contain the module, but has no source for it.
        """
        is_pkg = _get_module_info(self, fullname)
        if is_pkg is None:
            raise ZipImportError(f"can't find module {fullname!r}", name=fullname)

        base = _get_module_path(self, fullname)
        source_path = (_bootstrap_external._path_join(base, '__init__.py')
                       if is_pkg else f'{base}.py')
        try:
            toc_entry = self._get_files()[source_path]
        except KeyError:
            # The module is there, but its source is not.
            return None
        return _get_data(self.archive, toc_entry).decode()

    # Return a bool signifying whether the module is a package or not.
    def is_package(self, fullname):
        """is_package(fullname) -> bool.

        Return True if the module specified by fullname is a package.
        Raise ZipImportError if the module couldn't be found.
        """
        is_pkg = _get_module_info(self, fullname)
        if is_pkg is not None:
            return is_pkg
        raise ZipImportError(f"can't find module {fullname!r}", name=fullname)

    def get_resource_reader(self, fullname):
        """Return the ResourceReader for a module in a zip file."""
        # Imported here rather than at module level.
        from importlib.readers import ZipReader

        return ZipReader(self, fullname)

    def _get_files(self):
        """Return the files within the archive path."""
        try:
            return _zip_directory_cache[self.archive]
        except KeyError:
            # Not cached (any more): read the archive again.  An unreadable
            # archive gives an empty listing that is not cached.
            try:
                listing = _read_directory(self.archive)
            except ZipImportError:
                return {}
            _zip_directory_cache[self.archive] = listing
            return listing

    def invalidate_caches(self):
        """Invalidates the cache of file data of the archive path."""
        _zip_directory_cache.pop(self.archive, None)

    def __repr__(self):
        location = f'{self.archive}{path_sep}{self.prefix}'
        return f'<zipimporter object "{location}">'
|
|
|
|
|
|
# _zip_searchorder lists, in priority order, the archive entries tried for
# a module: the package __init__ first (bytecode, then source), followed by
# the plain module (bytecode, then source).  Each row is
# (suffix appended to the module path, is it bytecode?, is it a package?).
_zip_searchorder = (
    (f'{path_sep}__init__.pyc', True, True),
    (f'{path_sep}__init__.py', False, True),
    ('.pyc', True, False),
    ('.py', False, False),
)
|
|
|
|
# Map a dotted module name to the path its entry would have inside the
# archive, without any extension: self.prefix followed by the last
# component of the name.
def _get_module_path(self, fullname):
    _parent, _dot, leaf = fullname.rpartition('.')
    return self.prefix + leaf
|
|
|
|
# Tell whether 'path' names a "directory" of the archive, which makes it
# eligible to be a portion of a namespace package.
def _is_dir(self, path):
    # A directory is listed under its name with a path separator appended,
    # so look for exactly that key among the archive's files.
    return (path + path_sep) in self._get_files()
|
|
|
|
# Look a module up in the archive.  Returns True for a package, False for
# a plain module, and None when no candidate entry exists.
def _get_module_info(self, fullname):
    base = _get_module_path(self, fullname)
    # The first candidate of _zip_searchorder present in the archive wins.
    return next(
        (ispackage
         for suffix, _isbytecode, ispackage in _zip_searchorder
         if base + suffix in self._get_files()),
        None,
    )
|
|
|
|
|
|
# implementation

# _read_directory(archive) -> files dict (new reference)
#
# Given a path to a Zip archive, build a dict, mapping file names
# (local to the archive, using SEP as a separator) to toc entries.
#
# A toc_entry is a tuple:
#
# (__file__,      # value to use for __file__, available for all files,
#                 #   encoded to the filesystem encoding
#  compress,      # compression kind; 0 for uncompressed
#  data_size,     # size of compressed data on disk
#  file_size,     # size of decompressed data
#  file_offset,   # offset of file header from start of archive
#  time,          # mod time of file (in dos format)
#  date,          # mod date of file (in dos format)
#  crc,           # crc checksum of the data
# )
#
# Directories can be recognized by the trailing path_sep in the name,
# data_size and file_offset are 0.  Parent directories that have no entry
# of their own in the archive are added with None as their value.
#
# Raises ZipImportError when the archive cannot be opened or read, or when
# its directory is malformed, and EOFError when the central directory ends
# in the middle of an entry header.
def _read_directory(archive):
    try:
        fp = _io.open_code(archive)
    except OSError:
        raise ZipImportError(f"can't open Zip file: {archive!r}", path=archive)

    with fp:
        # GH-87235: On macOS all file descriptors for /dev/fd/N share the same
        # file offset, reset the file offset after scanning the zipfile directory
        # to not cause problems when some runs 'python3 /dev/fd/9 9<some_script'
        start_offset = fp.tell()
        try:
            # Check if there's a comment.
            try:
                fp.seek(0, 2)
                archive_size = fp.tell()
            except OSError:
                raise ZipImportError(f"can't read Zip file: {archive!r}",
                                     path=archive)
            # Read the largest tail that can hold the standard EOCD with its
            # comment plus the zip64 EOCD and its locator.
            max_comment_plus_dirs_size = (
                MAX_COMMENT_LEN + END_CENTRAL_DIR_SIZE +
                END_CENTRAL_DIR_SIZE_64 + END_CENTRAL_DIR_LOCATOR_SIZE_64)
            max_comment_start = max(archive_size - max_comment_plus_dirs_size, 0)
            try:
                fp.seek(max_comment_start)
                data = fp.read(max_comment_plus_dirs_size)
            except OSError:
                raise ZipImportError(f"can't read Zip file: {archive!r}",
                                     path=archive)
            pos = data.rfind(STRING_END_ARCHIVE)
            pos64 = data.rfind(STRING_END_ZIP_64)

            if (pos64 >= 0 and
                    pos64 + END_CENTRAL_DIR_SIZE_64 +
                    END_CENTRAL_DIR_LOCATOR_SIZE_64 == pos):
                # Zip64 at "correct" offset from standard EOCD
                buffer = data[pos64:pos64 + END_CENTRAL_DIR_SIZE_64]
                if len(buffer) != END_CENTRAL_DIR_SIZE_64:
                    raise ZipImportError(
                        f"corrupt Zip64 file: Expected {END_CENTRAL_DIR_SIZE_64} byte "
                        f"zip64 central directory, but read {len(buffer)} bytes.",
                        path=archive)
                header_position = archive_size - len(data) + pos64

                central_directory_size = _unpack_uint64(buffer[40:48])
                central_directory_position = _unpack_uint64(buffer[48:56])
                num_entries = _unpack_uint64(buffer[24:32])
            elif pos >= 0:
                buffer = data[pos:pos + END_CENTRAL_DIR_SIZE]
                if len(buffer) != END_CENTRAL_DIR_SIZE:
                    raise ZipImportError(f"corrupt Zip file: {archive!r}",
                                         path=archive)

                header_position = archive_size - len(data) + pos

                # Buffer now contains a valid EOCD, and header_position gives the
                # starting position of it.
                central_directory_size = _unpack_uint32(buffer[12:16])
                central_directory_position = _unpack_uint32(buffer[16:20])
                num_entries = _unpack_uint16(buffer[8:10])

                # N.b. if someday you want to prefer the standard (non-zip64) EOCD,
                # you need to adjust position by 76 for arc to be 0.
            else:
                raise ZipImportError(f'not a Zip file: {archive!r}',
                                     path=archive)

            # Buffer now contains a valid EOCD, and header_position gives the
            # starting position of it.
            # XXX: These are cursory checks but are not as exact or strict as they
            # could be.  Checking the arc-adjusted value is probably good too.
            if header_position < central_directory_size:
                raise ZipImportError(f'bad central directory size: {archive!r}', path=archive)
            if header_position < central_directory_position:
                raise ZipImportError(f'bad central directory offset: {archive!r}', path=archive)
            header_position -= central_directory_size
            # On just-a-zipfile these values are the same and arc_offset is zero; if
            # the file has some bytes prepended, `arc_offset` is the number of such
            # bytes.  This is used for pex as well as self-extracting .exe.
            arc_offset = header_position - central_directory_position
            if arc_offset < 0:
                raise ZipImportError(f'bad central directory size or offset: {archive!r}', path=archive)

            files = {}
            # Start of Central Directory
            count = 0
            try:
                fp.seek(header_position)
            except OSError:
                raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
            while True:
                buffer = fp.read(46)
                if len(buffer) < 4:
                    raise EOFError('EOF read where not expected')
                # Start of file header
                if buffer[:4] != b'PK\x01\x02':
                    if count != num_entries:
                        raise ZipImportError(
                            f"mismatched num_entries: {count} should be {num_entries} in {archive!r}",
                            path=archive,
                        )
                    break                                # Bad: Central Dir File Header
                if len(buffer) != 46:
                    raise EOFError('EOF read where not expected')
                flags = _unpack_uint16(buffer[8:10])
                compress = _unpack_uint16(buffer[10:12])
                # Named dos_time/dos_date so the module-level 'time' import
                # is not shadowed inside this function.
                dos_time = _unpack_uint16(buffer[12:14])
                dos_date = _unpack_uint16(buffer[14:16])
                crc = _unpack_uint32(buffer[16:20])
                data_size = _unpack_uint32(buffer[20:24])
                file_size = _unpack_uint32(buffer[24:28])
                name_size = _unpack_uint16(buffer[28:30])
                extra_size = _unpack_uint16(buffer[30:32])
                comment_size = _unpack_uint16(buffer[32:34])
                file_offset = _unpack_uint32(buffer[42:46])
                header_size = name_size + extra_size + comment_size

                try:
                    name = fp.read(name_size)
                except OSError:
                    raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
                if len(name) != name_size:
                    raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
                # On Windows, calling fseek to skip over the fields we don't use is
                # slower than reading the data because fseek flushes stdio's
                # internal buffers.  See issue #8745.
                # What is read here is the extra field followed by the
                # entry comment.
                try:
                    extra_data_len = header_size - name_size
                    extra_data = memoryview(fp.read(extra_data_len))

                    if len(extra_data) != extra_data_len:
                        raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
                except OSError:
                    raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)

                if flags & 0x800:
                    # UTF-8 file names extension
                    name = name.decode()
                else:
                    # Historical ZIP filename encoding
                    try:
                        name = name.decode('ascii')
                    except UnicodeDecodeError:
                        name = name.decode('latin1').translate(cp437_table)

                name = name.replace('/', path_sep)
                path = _bootstrap_external._path_join(archive, name)

                # Ordering matches unpacking below.
                if (
                    file_size == MAX_UINT32 or
                    data_size == MAX_UINT32 or
                    file_offset == MAX_UINT32
                ):
                    # need to decode extra_data looking for a zip64 extra (which might not
                    # be present)
                    # Only the first extra_size bytes are extra records; the
                    # rest of the buffer is the entry comment and must not
                    # be parsed as records.
                    extra_data = extra_data[:extra_size]
                    while extra_data:
                        if len(extra_data) < 4:
                            raise ZipImportError(f"can't read header extra: {archive!r}", path=archive)
                        tag = _unpack_uint16(extra_data[:2])
                        size = _unpack_uint16(extra_data[2:4])
                        if len(extra_data) < 4 + size:
                            raise ZipImportError(f"can't read header extra: {archive!r}", path=archive)
                        if tag == ZIP64_EXTRA_TAG:
                            # Measure the record by its own 'size' field, not
                            # by what is left in the buffer: other extra
                            # records may follow the zip64 one.
                            if size % 8 != 0:
                                raise ZipImportError(f"can't read header extra: {archive!r}", path=archive)
                            num_extra_values = size // 8
                            if num_extra_values > 3:
                                raise ZipImportError(f"can't read header extra: {archive!r}", path=archive)
                            import struct
                            values = list(struct.unpack_from(f"<{num_extra_values}Q",
                                                             extra_data, offset=4))

                            # Every header field holding the sentinel takes
                            # one value; a record with fewer values is
                            # malformed.
                            num_needed = sum(
                                field == MAX_UINT32
                                for field in (file_size, data_size, file_offset))
                            if len(values) < num_needed:
                                raise ZipImportError(f"can't read header extra: {archive!r}", path=archive)

                            # N.b. Here be dragons: the ordering of these is different than
                            # the header fields, and it's really easy to get it wrong since
                            # naturally-occurring zips that use all 3 are >4GB
                            if file_size == MAX_UINT32:
                                file_size = values.pop(0)
                            if data_size == MAX_UINT32:
                                data_size = values.pop(0)
                            if file_offset == MAX_UINT32:
                                file_offset = values.pop(0)

                            break

                        # For a typical zip, this bytes-slicing only happens 2-3 times, on
                        # small data like timestamps and filesizes.
                        extra_data = extra_data[4+size:]
                    else:
                        _bootstrap._verbose_message(
                            "zipimport: suspected zip64 but no zip64 extra for {!r}",
                            path,
                        )
                # XXX These two statements seem swapped because `central_directory_position`
                # is a position within the actual file, but `file_offset` (when compared) is
                # as encoded in the entry, not adjusted for this file.
                # N.b. this must be after we've potentially read the zip64 extra which can
                # change `file_offset`.
                if file_offset > central_directory_position:
                    raise ZipImportError(f'bad local header offset: {archive!r}', path=archive)
                file_offset += arc_offset

                t = (path, compress, data_size, file_size, file_offset,
                     dos_time, dos_date, crc)
                files[name] = t
                count += 1
        finally:
            fp.seek(start_offset)
    _bootstrap._verbose_message('zipimport: found {} names in {!r}', count, archive)

    # Add implicit directories.
    count = 0
    for name in list(files):
        # Walk up from each entry, adding every parent directory that is
        # not listed yet.
        while True:
            i = name.rstrip(path_sep).rfind(path_sep)
            if i < 0:
                break
            name = name[:i + 1]
            if name in files:
                break
            files[name] = None
            count += 1
    if count:
        _bootstrap._verbose_message('zipimport: added {} implicit directories in {!r}',
                                    count, archive)
    return files
|
|
|
|
# During bootstrap, we may need to load the encodings
# package from a ZIP file. But the cp437 encoding is implemented
# in Python in the encodings package.
#
# Break out of this dependency by using the translation table for
# the cp437 encoding.
#
# _read_directory() passes this string to str.translate() on a file name
# that was decoded as latin1, so the character at index N is the
# replacement for byte value N.
cp437_table = (
    # ASCII part, 8 rows x 16 chars
    '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
    '\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
    ' !"#$%&\'()*+,-./'
    '0123456789:;<=>?'
    '@ABCDEFGHIJKLMNO'
    'PQRSTUVWXYZ[\\]^_'
    '`abcdefghijklmno'
    'pqrstuvwxyz{|}~\x7f'
    # non-ASCII part, 16 rows x 8 chars
    '\xc7\xfc\xe9\xe2\xe4\xe0\xe5\xe7'
    '\xea\xeb\xe8\xef\xee\xec\xc4\xc5'
    '\xc9\xe6\xc6\xf4\xf6\xf2\xfb\xf9'
    '\xff\xd6\xdc\xa2\xa3\xa5\u20a7\u0192'
    '\xe1\xed\xf3\xfa\xf1\xd1\xaa\xba'
    '\xbf\u2310\xac\xbd\xbc\xa1\xab\xbb'
    '\u2591\u2592\u2593\u2502\u2524\u2561\u2562\u2556'
    '\u2555\u2563\u2551\u2557\u255d\u255c\u255b\u2510'
    '\u2514\u2534\u252c\u251c\u2500\u253c\u255e\u255f'
    '\u255a\u2554\u2569\u2566\u2560\u2550\u256c\u2567'
    '\u2568\u2564\u2565\u2559\u2558\u2552\u2553\u256b'
    '\u256a\u2518\u250c\u2588\u2584\u258c\u2590\u2580'
    '\u03b1\xdf\u0393\u03c0\u03a3\u03c3\xb5\u03c4'
    '\u03a6\u0398\u03a9\u03b4\u221e\u03c6\u03b5\u2229'
    '\u2261\xb1\u2265\u2264\u2320\u2321\xf7\u2248'
    '\xb0\u2219\xb7\u221a\u207f\xb2\u25a0\xa0'
)
|
|
|
|
# True only while _get_zlib_decompress_func() is importing zlib.
_importing_zlib = False
# zlib.decompress once it has been imported successfully, None before.
_zlib_decompress = None

# Return the zlib.decompress function object, importing zlib on the first
# call and reusing the stored function afterwards.  Raises ZipImportError
# if zlib cannot be imported, or if this function is re-entered while the
# import is still in progress.
def _get_zlib_decompress_func():
    global _zlib_decompress, _importing_zlib

    if _zlib_decompress:
        return _zlib_decompress

    if _importing_zlib:
        # Someone has a zlib.py[co] in their Zip file
        # let's avoid a stack overflow.
        _bootstrap._verbose_message('zipimport: zlib UNAVAILABLE')
        raise ZipImportError("can't decompress data; zlib not available")

    _importing_zlib = True
    try:
        from zlib import decompress as _zlib_decompress
    except Exception:
        _bootstrap._verbose_message('zipimport: zlib UNAVAILABLE')
        raise ZipImportError("can't decompress data; zlib not available")
    finally:
        # Clear the re-entrancy marker whatever the outcome.
        _importing_zlib = False

    _bootstrap._verbose_message('zipimport: zlib available')
    return _zlib_decompress
|
|
|
|
|
|
# True only while _get_zstd_decompressor_class() is importing _zstd.
_importing_zstd = False
# _zstd.ZstdDecompressor once it has been imported successfully, None before.
_zstd_decompressor_class = None

# Return the _zstd.ZstdDecompressor class, importing _zstd on the first
# call and reusing the stored class afterwards.  Raises ZipImportError if
# _zstd cannot be imported, or if this function is re-entered while the
# import is still in progress.
def _get_zstd_decompressor_class():
    global _zstd_decompressor_class, _importing_zstd

    if _zstd_decompressor_class:
        return _zstd_decompressor_class

    if _importing_zstd:
        # Someone has a _zstd.py[co] in their Zip file
        # let's avoid a stack overflow.
        _bootstrap._verbose_message("zipimport: zstd UNAVAILABLE")
        raise ZipImportError("can't decompress data; zstd not available")

    _importing_zstd = True
    try:
        from _zstd import ZstdDecompressor as _zstd_decompressor_class
    except Exception:
        _bootstrap._verbose_message("zipimport: zstd UNAVAILABLE")
        raise ZipImportError("can't decompress data; zstd not available")
    finally:
        # Clear the re-entrancy marker whatever the outcome.
        _importing_zstd = False

    _bootstrap._verbose_message("zipimport: zstd available")
    return _zstd_decompressor_class
|
|
|
|
|
|
def _zstd_decompress(data):
    # A simple version of compression.zstd.decompress() as we cannot import
    # that here as the stdlib itself could be being zipimported.
    #
    # Each pass decodes one stream with a fresh decompressor and then moves
    # on to whatever input that decompressor left unused.
    chunks = []
    remaining = data
    while True:
        decompressor = _get_zstd_decompressor_class()()
        chunks.append(decompressor.decompress(remaining))
        if not decompressor.eof:
            raise ZipImportError("zipimport: zstd compressed data ended before "
                                 "the end-of-stream marker")
        remaining = decompressor.unused_data
        if not remaining:
            return b"".join(chunks)
|
|
|
|
|
|
# Given a path to a Zip file and a toc_entry, return the (uncompressed) data.
#
# Only three fields of the toc_entry are used: the compression kind, the
# size of the stored data and the offset of the local file header.
# Raises ZipImportError for an unreadable archive, a bad local header, an
# unsupported compression kind or a failed decompression; EOFError for a
# truncated local header; OSError when the stored data is cut short.
def _get_data(archive, toc_entry):
    (_datapath, method, stored_size, _file_size,
     header_offset, _time, _date, _crc) = toc_entry
    if stored_size < 0:
        raise ZipImportError('negative data size')

    with _io.open_code(archive) as fp:
        # Check to make sure the local file header is correct
        try:
            fp.seek(header_offset)
        except OSError:
            raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
        local_header = fp.read(30)
        if len(local_header) != 30:
            raise EOFError('EOF read where not expected')

        if local_header[:4] != b'PK\x03\x04':
            # Bad: Local File Header
            raise ZipImportError(f'bad local file header: {archive!r}', path=archive)

        # The data follows the 30-byte fixed header, the name and the extra
        # field.
        name_len = _unpack_uint16(local_header[26:28])
        extra_len = _unpack_uint16(local_header[28:30])
        data_start = header_offset + 30 + name_len + extra_len
        try:
            fp.seek(data_start)
        except OSError:
            raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
        raw_data = fp.read(stored_size)
        if len(raw_data) != stored_size:
            raise OSError("zipimport: can't read data")

    if method == 0:  # stored
        return raw_data
    if method == 8:  # deflate aka zlib
        try:
            inflate = _get_zlib_decompress_func()
        except Exception:
            raise ZipImportError("can't decompress data; zlib not available")
        return inflate(raw_data, -15)
    if method == 93:  # zstd
        try:
            return _zstd_decompress(raw_data)
        except Exception:
            raise ZipImportError("could not decompress zstd data")
    # bz2 and lzma could be added, but are largely obsolete.
    raise ZipImportError(f"zipimport: unsupported compression {method}")
|
|
|
|
|
|
# Lenient date/time comparison function.  The precision of the mtime
# in the archive is lower than the mtime stored in a .pyc, so two
# timestamps are treated as equal when they differ by at most one second.
def _eq_mtime(t1, t2):
    # dostime only stores even seconds, so be lenient
    return -1 <= t1 - t2 <= 1
|
|
|
|
|
|
# Given the contents of a .py[co] file, unmarshal the data
# and return the code object.  Raises ImportError if the magic word doesn't
# match or if a hash-based .pyc fails validation against its source.
# Returns None when a timestamp-based .pyc does not match the source entry
# found in the archive.
def _unmarshal_code(self, pathname, fullpath, fullname, data):
    exc_details = {'name': fullname, 'path': fullpath}
    flags = _bootstrap_external._classify_pyc(data, fullname, exc_details)

    if flags & 0b1:
        # Hash-based pyc: validate against the source only when the pyc
        # asks for it or the interpreter is told to always check.
        check_source = (flags & 0b10) != 0
        must_validate = (
            _imp.check_hash_based_pycs != 'never' and
            (check_source or _imp.check_hash_based_pycs == 'always'))
        if must_validate:
            source_bytes = _get_pyc_source(self, fullpath)
            if source_bytes is not None:
                source_hash = _imp.source_hash(
                    _imp.pyc_magic_number_token, source_bytes)
                _bootstrap_external._validate_hash_pyc(
                    data, source_hash, fullname, exc_details)
    else:
        source_mtime, source_size = _get_mtime_and_size_of_source(self, fullpath)
        if source_mtime:
            # We don't use _bootstrap_external._validate_timestamp_pyc
            # to allow for a more lenient timestamp check.
            mtime_ok = _eq_mtime(_unpack_uint32(data[8:12]), source_mtime)
            if not (mtime_ok and _unpack_uint32(data[12:16]) == source_size):
                _bootstrap._verbose_message(
                    f'bytecode is stale for {fullname!r}')
                return None

    # The marshalled code follows the 16-byte header.
    code = marshal.loads(data[16:])
    if isinstance(code, _code_type):
        return code
    raise TypeError(f'compiled module {pathname!r} is not a code object')

_code_type = type(_unmarshal_code.__code__)
|
|
|
|
|
|
# Replace any occurrences of '\r\n?' in the input bytes with '\n'.
# This converts DOS and Mac line endings to Unix line endings.
def _normalize_line_endings(source):
    # '\r\n' pairs go first so that each of them becomes a single '\n'.
    return source.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
|
|
|
|
# Given a buffer containing Python source code, normalize its line endings,
# compile it and return the code object.  'pathname' is the file name
# given to compile(); 'module' is passed on as compile()'s module argument.
def _compile_source(pathname, source, module):
    normalized = _normalize_line_endings(source)
    return compile(normalized, pathname, 'exec', dont_inherit=True,
                   module=module)
|
|
|
|
# Convert the date/time values found in the Zip archive to a value
# that's compatible with the time stamp stored in .pyc files.
def _parse_dostime(d, t):
    year = (d >> 9) + 1980      # date bits 9..15: years since 1980
    month = (d >> 5) & 0xF      # date bits 5..8: month
    day = d & 0x1F              # date bits 0..4: day
    hours = t >> 11             # time bits 11..15: hours
    minutes = (t >> 5) & 0x3F   # time bits 5..10: minutes
    seconds = (t & 0x1F) * 2    # time bits 0..4: seconds / 2
    return time.mktime((year, month, day, hours, minutes, seconds, -1, -1, -1))
|
|
|
|
# Given a path to a .pyc file in the archive, return the
# modification time of the matching .py file and its size,
# or (0, 0) if no source is available.
def _get_mtime_and_size_of_source(self, path):
    try:
        # strip 'c' or 'o' from *.py[co]
        assert path[-1:] in ('c', 'o')
        entry = self._get_files()[path[:-1]]
        # Take the DOS time stamp of the .py entry, for comparison with
        # the time stamp embedded in the pyc, and its uncompressed size.
        dos_time, dos_date, source_size = entry[5], entry[6], entry[3]
        return _parse_dostime(dos_date, dos_time), source_size
    except (KeyError, IndexError, TypeError):
        return 0, 0
|
|
|
|
|
|
# Given a path to a .pyc file in the archive, return the
# contents of the matching .py file, or None if no source
# is available.
def _get_pyc_source(self, path):
    # strip 'c' or 'o' from *.py[co]
    assert path[-1:] in ('c', 'o')
    source_path = path[:-1]

    try:
        toc_entry = self._get_files()[source_path]
    except KeyError:
        return None
    return _get_data(self.archive, toc_entry)
|
|
|
|
|
|
# Get the code object associated with the module specified by
# 'fullname'.  Returns (code, ispackage, modpath), where modpath is the
# first field of the archive entry the code came from.  Raises
# ZipImportError when no candidate entry yields a code object.
def _get_module_code(self, fullname):
    base = _get_module_path(self, fullname)
    import_error = None
    for suffix, isbytecode, ispackage in _zip_searchorder:
        fullpath = base + suffix
        _bootstrap._verbose_message('trying {}{}{}', self.archive, path_sep, fullpath, verbosity=2)
        try:
            toc_entry = self._get_files()[fullpath]
        except KeyError:
            continue

        modpath = toc_entry[0]
        data = _get_data(self.archive, toc_entry)
        if isbytecode:
            try:
                code = _unmarshal_code(self, modpath, fullpath, fullname, data)
            except ImportError as exc:
                # Remember the failure and fall through to the next
                # candidate.
                import_error = exc
                code = None
        else:
            code = _compile_source(modpath, data, fullname)
        if code is None:
            # bad magic number or non-matching mtime
            # in byte code, try next
            continue
        return code, ispackage, modpath

    # Every candidate was missing or unusable.
    if import_error:
        msg = f"module load failed: {import_error}"
        raise ZipImportError(msg, name=fullname) from import_error
    raise ZipImportError(f"can't find module {fullname!r}", name=fullname)
|