|
from typing import * |
|
import io |
|
import os |
|
from zipfile import ( |
|
ZipInfo, BadZipFile, ZipFile, ZipExtFile, |
|
sizeFileHeader, structFileHeader, stringFileHeader, |
|
_FH_SIGNATURE, _FH_FILENAME_LENGTH, _FH_EXTRA_FIELD_LENGTH, _FH_GENERAL_PURPOSE_FLAG_BITS, |
|
_MASK_COMPRESSED_PATCH, _MASK_STRONG_ENCRYPTION, _MASK_UTF_FILENAME, _MASK_ENCRYPTED |
|
) |
|
import struct |
|
from requests import Session |
|
|
|
from .webfile import WebFile |
|
|
|
|
|
class _SharedWebFile(WebFile):
    """WebFile built from another WebFile's url, session and size.

    Used to hand out a separate file object for the same remote resource,
    already seeked to a chosen offset.
    """

    def __init__(self, webfile: WebFile, pos: int):
        """Copy the connection details of *webfile* and seek to *pos*.

        Args:
            webfile: Existing WebFile whose ``url``, ``session`` and ``size``
                attributes are reused for the new object.
            pos: Offset handed to ``seek`` once the new object is set up.
        """
        # NOTE(review): any request headers given to the original WebFile are
        # not forwarded here -- confirm they are carried by the session.
        source_url, source_session = webfile.url, webfile.session
        super().__init__(source_url, source_session, size=webfile.size)
        self.seek(pos)
|
|
|
|
|
class WebZipFile(ZipFile): |
|
"Lock-free version of ZipFile that reads from a WebFile, allowing for concurrent reads." |
|
def __init__(self, url: str, session: Optional[Session] = None, headers: Optional[Dict[str, str]] = None): |
|
"""Open the ZIP file with mode read 'r', write 'w', exclusive create 'x', |
|
or append 'a'.""" |
|
webf = WebFile(url, session=session, headers=headers) |
|
super().__init__(webf, mode='r') |
|
|
|
def open(self, name, mode="r", pwd=None, *, force_zip64=False): |
|
"""Return file-like object for 'name'. |
|
|
|
name is a string for the file name within the ZIP file, or a ZipInfo |
|
object. |
|
|
|
mode should be 'r' to read a file already in the ZIP file, or 'w' to |
|
write to a file newly added to the archive. |
|
|
|
pwd is the password to decrypt files (only used for reading). |
|
|
|
When writing, if the file size is not known in advance but may exceed |
|
2 GiB, pass force_zip64 to use the ZIP64 format, which can handle large |
|
files. If the size is known in advance, it is best to pass a ZipInfo |
|
instance for name, with zinfo.file_size set. |
|
""" |
|
if mode not in {"r", "w"}: |
|
raise ValueError('open() requires mode "r" or "w"') |
|
if pwd and (mode == "w"): |
|
raise ValueError("pwd is only supported for reading files") |
|
if not self.fp: |
|
raise ValueError( |
|
"Attempt to use ZIP archive that was already closed") |
|
|
|
assert mode == "r", "Only read mode is supported for now" |
|
|
|
|
|
if isinstance(name, ZipInfo): |
|
|
|
zinfo = name |
|
elif mode == 'w': |
|
zinfo = ZipInfo(name) |
|
zinfo.compress_type = self.compression |
|
zinfo._compresslevel = self.compresslevel |
|
else: |
|
|
|
zinfo = self.getinfo(name) |
|
|
|
if mode == 'w': |
|
return self._open_to_write(zinfo, force_zip64=force_zip64) |
|
|
|
if self._writing: |
|
raise ValueError("Can't read from the ZIP file while there " |
|
"is an open writing handle on it. " |
|
"Close the writing handle before trying to read.") |
|
|
|
|
|
self._fileRefCnt += 1 |
|
zef_file = _SharedWebFile(self.fp, zinfo.header_offset) |
|
|
|
try: |
|
|
|
fheader = zef_file.read(sizeFileHeader) |
|
if len(fheader) != sizeFileHeader: |
|
raise BadZipFile("Truncated file header") |
|
fheader = struct.unpack(structFileHeader, fheader) |
|
if fheader[_FH_SIGNATURE] != stringFileHeader: |
|
raise BadZipFile("Bad magic number for file header") |
|
|
|
fname = zef_file.read(fheader[_FH_FILENAME_LENGTH]) |
|
if fheader[_FH_EXTRA_FIELD_LENGTH]: |
|
zef_file.seek(fheader[_FH_EXTRA_FIELD_LENGTH], whence=1) |
|
|
|
if zinfo.flag_bits & _MASK_COMPRESSED_PATCH: |
|
|
|
raise NotImplementedError("compressed patched data (flag bit 5)") |
|
|
|
if zinfo.flag_bits & _MASK_STRONG_ENCRYPTION: |
|
|
|
raise NotImplementedError("strong encryption (flag bit 6)") |
|
|
|
if fheader[_FH_GENERAL_PURPOSE_FLAG_BITS] & _MASK_UTF_FILENAME: |
|
|
|
fname_str = fname.decode("utf-8") |
|
else: |
|
fname_str = fname.decode(self.metadata_encoding or "cp437") |
|
|
|
if fname_str != zinfo.orig_filename: |
|
raise BadZipFile( |
|
'File name in directory %r and header %r differ.' |
|
% (zinfo.orig_filename, fname)) |
|
|
|
|
|
is_encrypted = zinfo.flag_bits & _MASK_ENCRYPTED |
|
if is_encrypted: |
|
if not pwd: |
|
pwd = self.pwd |
|
if pwd and not isinstance(pwd, bytes): |
|
raise TypeError("pwd: expected bytes, got %s" % type(pwd).__name__) |
|
if not pwd: |
|
raise RuntimeError("File %r is encrypted, password " |
|
"required for extraction" % name) |
|
else: |
|
pwd = None |
|
|
|
return ZipExtFile(zef_file, mode, zinfo, pwd, True) |
|
except: |
|
zef_file.close() |
|
raise |