|
| 1 | +import struct |
| 2 | +import zipfile |
| 3 | + |
| 4 | +# More forgiving implementation of zipfile.ZipFile |
| 5 | +_FH_SIGNATURE = 0 |
| 6 | +_FH_FILENAME_LENGTH = 10 |
| 7 | +_FH_EXTRA_FIELD_LENGTH = 11 |
| 8 | + |
| 9 | +structFileHeader = "<4s2B4HL2L2H" |
| 10 | +stringFileHeader = b"PK\003\004" |
| 11 | +sizeFileHeader = struct.calcsize(structFileHeader) |
| 12 | + |
| 13 | + |
| 14 | +class RelaxedZipFile(zipfile.ZipFile): |
| 15 | + def open(self, name, mode="r", pwd=None, *, force_zip64=False): |
| 16 | + # near copy of zipfile.ZipFile.open with |
| 17 | + """Return file-like object for 'name'. |
| 18 | +
|
| 19 | + name is a string for the file name within the ZIP file, or a ZipInfo |
| 20 | + object. |
| 21 | +
|
| 22 | + mode should be 'r' to read a file already in the ZIP file, or 'w' to |
| 23 | + write to a file newly added to the archive. |
| 24 | +
|
| 25 | + pwd is the password to decrypt files (only used for reading). |
| 26 | +
|
| 27 | + When writing, if the file size is not known in advance but may exceed |
| 28 | + 2 GiB, pass force_zip64 to use the ZIP64 format, which can handle large |
| 29 | + files. If the size is known in advance, it is best to pass a ZipInfo |
| 30 | + instance for name, with zinfo.file_size set. |
| 31 | + """ |
| 32 | + if mode not in {"r", "w"}: |
| 33 | + raise ValueError('open() requires mode "r" or "w"') |
| 34 | + if pwd and not isinstance(pwd, bytes): |
| 35 | + raise TypeError("pwd: expected bytes, got %s" % type(pwd).__name__) |
| 36 | + if pwd and (mode == "w"): |
| 37 | + raise ValueError("pwd is only supported for reading files") |
| 38 | + if not self.fp: |
| 39 | + raise ValueError("Attempt to use ZIP archive that was already closed") |
| 40 | + |
| 41 | + # Make sure we have an info object |
| 42 | + if isinstance(name, zipfile.ZipInfo): |
| 43 | + # 'name' is already an info object |
| 44 | + zinfo = name |
| 45 | + elif mode == "w": |
| 46 | + zinfo = zipfile.ZipInfo(name) |
| 47 | + zinfo.compress_type = self.compression |
| 48 | + zinfo._compresslevel = self.compresslevel |
| 49 | + else: |
| 50 | + # Get info object for name |
| 51 | + zinfo = self.getinfo(name) |
| 52 | + |
| 53 | + if mode == "w": |
| 54 | + return self._open_to_write(zinfo, force_zip64=force_zip64) |
| 55 | + |
| 56 | + if self._writing: |
| 57 | + raise ValueError( |
| 58 | + "Can't read from the ZIP file while there " |
| 59 | + "is an open writing handle on it. " |
| 60 | + "Close the writing handle before trying to read." |
| 61 | + ) |
| 62 | + |
| 63 | + # Open for reading: |
| 64 | + self._fileRefCnt += 1 |
| 65 | + zef_file = zipfile._SharedFile( |
| 66 | + self.fp, |
| 67 | + zinfo.header_offset, |
| 68 | + self._fpclose, |
| 69 | + self._lock, |
| 70 | + lambda: self._writing, |
| 71 | + ) |
| 72 | + try: |
| 73 | + # Skip the file header: |
| 74 | + fheader = zef_file.read(sizeFileHeader) |
| 75 | + if len(fheader) != sizeFileHeader: |
| 76 | + raise zipfile.BadZipFile("Truncated file header") |
| 77 | + fheader = struct.unpack(structFileHeader, fheader) |
| 78 | + if fheader[_FH_SIGNATURE] != stringFileHeader: |
| 79 | + raise zipfile.BadZipFile("Bad magic number for file header") |
| 80 | + |
| 81 | + zef_file.read(fheader[_FH_FILENAME_LENGTH]) |
| 82 | + if fheader[_FH_EXTRA_FIELD_LENGTH]: |
| 83 | + zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) |
| 84 | + |
| 85 | + return zipfile.ZipExtFile(zef_file, mode, zinfo, pwd, True) |
| 86 | + except BaseException: |
| 87 | + zef_file.close() |
| 88 | + raise |
0 commit comments