1import io
2import struct
3
4from structlog import get_logger
5
6from ...extractors import Command
7from ...file_utils import InvalidInputFormat, iterate_patterns
8from ...models import (
9 File,
10 HandlerDoc,
11 HandlerType,
12 HexString,
13 Reference,
14 StructHandler,
15 ValidChunk,
16)
17
18logger = get_logger()
19
20
class ZIPHandler(StructHandler):
    """Handler that carves ZIP archives out of a file.

    A chunk starts at a local file header signature (see ``PATTERNS``) and its
    end is determined by locating the matching End Of Central Directory (EOCD)
    record, with a fallback to the ZIP64 EOCD record when the classic EOCD
    fields are saturated or the first central directory entry is tagged as
    ZIP64.
    """

    NAME = "zip"

    PATTERNS = [HexString("50 4B 03 04 // Local file header only")]
    C_DEFINITIONS = r"""

        typedef struct cd_file_header {
            uint32 magic;
            uint16 version_made_by;
            uint16 version_needed;
            uint16 flags;
            uint16 compression_method;
            uint16 dostime;
            uint16 dosdate;
            uint32 crc32_cs;
            uint32 compress_size;
            uint32 file_size;
            uint16 file_name_length;
            uint16 extra_field_length;
            uint16 file_comment_length;
            uint16 disk_number_start;
            uint16 internal_file_attr;
            uint32 external_file_attr;
            uint32 relative_offset_local_header;
            // char file_name[file_name_length];
            // char extra_field[extra_field_length];
        } partial_cd_file_header_t;

        typedef struct end_of_central_directory
        {
            uint32 end_of_central_signature;
            uint16 disk_number;
            uint16 disk_number_with_cd;
            uint16 disk_entries;
            uint16 total_entries;
            uint32 central_directory_size;
            uint32 offset_of_cd;
            uint16 comment_len;
            char zip_file_comment[comment_len];
        } end_of_central_directory_t;

        typedef struct zip64_end_of_central_directory_locator
        {
            uint32 signature;
            uint32 disk_number;
            uint64 offset_of_cd;
            uint32 total_disk;
        } zip64_end_of_central_directory_locator_t;

        typedef struct zip64_end_of_central_directory
        {
            uint32 signature;
            uint64 size_of_eocd_record;
            uint16 version_made_by;
            uint16 version_needed;
            uint32 disk_number;
            uint32 disk_number_with_cd;
            uint64 total_entries_disk;
            uint64 total_entries;
            uint64 size_of_cd;
            uint64 offset_of_cd;
        } zip64_end_of_central_directory_t;

    """
    HEADER_STRUCT = "end_of_central_directory_t"

    # empty password with -p will make sure the command will not hang
    EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}")

    DOC = HandlerDoc(
        name="ZIP",
        description="ZIP is a widely used archive file format that supports multiple compression methods, file spanning, and optional encryption. It includes metadata such as file names, sizes, and timestamps, and supports both standard and ZIP64 extensions for large files.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="ZIP File Format Specification",
                url="https://pkware.com/documents/casestudies/APPNOTE.TXT",
            ),
            Reference(
                title="ZIP64 Format Specification",
                url="https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.1.TXT",
            ),
        ],
        limitations=["Does not support encrypted ZIP files."],
    )

    # Bit 0 of the general purpose bit flag marks an entry as encrypted.
    ENCRYPTED_FLAG = 0b0001
    # Record signatures, stored little-endian on disk ("PK\x05\x06" etc.).
    EOCD_RECORD_HEADER = 0x06054B50
    ZIP64_EOCD_SIGNATURE = 0x06064B50
    ZIP64_EOCD_LOCATOR_HEADER = 0x07064B50

    def has_encrypted_files(
        self,
        file: File,
        start_offset: int,
        end_of_central_directory,
    ) -> bool:
        """Return True if any central directory entry has the encryption flag set.

        Args:
            file: File being scanned; its position is moved by this call.
            start_offset: Offset of the start of the ZIP chunk inside ``file``.
            end_of_central_directory: Parsed EOCD (or ZIP64 EOCD) record; only
                its ``offset_of_cd`` and ``total_entries`` fields are read.

        Returns:
            True as soon as one entry with ``ENCRYPTED_FLAG`` is found,
            False when all ``total_entries`` entries have been inspected.
        """
        file.seek(start_offset + end_of_central_directory.offset_of_cd, io.SEEK_SET)
        for _ in range(end_of_central_directory.total_entries):
            file_header = self.cparser_le.partial_cd_file_header_t(file)
            # The fixed-size header is followed by three variable-length
            # fields: file name, extra field and file comment. All three have
            # to be skipped, otherwise an entry carrying a comment leaves the
            # cursor in the middle of the record and every following header
            # is parsed from misaligned bytes.
            file.seek(
                file_header.file_name_length
                + file_header.extra_field_length
                + file_header.file_comment_length,
                io.SEEK_CUR,
            )
            if file_header.flags & self.ENCRYPTED_FLAG:
                return True
        return False

    @staticmethod
    def is_zip64_eocd(end_of_central_directory) -> bool:
        """Return True if any EOCD field holds its all-ones ZIP64 marker value.

        Args:
            end_of_central_directory: Parsed ``end_of_central_directory_t``.
        """
        # see https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.1.TXT section J
        return (
            end_of_central_directory.disk_number == 0xFFFF
            or end_of_central_directory.disk_number_with_cd == 0xFFFF
            or end_of_central_directory.disk_entries == 0xFFFF
            or end_of_central_directory.total_entries == 0xFFFF
            or end_of_central_directory.central_directory_size == 0xFFFFFFFF
            or end_of_central_directory.offset_of_cd == 0xFFFFFFFF
        )

    def has_zip64_tag(self, file: File) -> bool:
        """Return True if the central directory header at the cursor is ZIP64.

        Parses one ``partial_cd_file_header_t`` from the current position of
        ``file`` and checks whether either size field is saturated.
        """
        # see https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT section 4.3.9.2
        file_header = self.cparser_le.partial_cd_file_header_t(file)
        return (
            file_header.file_size == 0xFFFFFFFF
            or file_header.compress_size == 0xFFFFFFFF
        )

    def _parse_zip64(self, file: File, start_offset: int, offset: int):
        """Locate and parse the ZIP64 EOCD record belonging to the EOCD at ``offset``.

        Args:
            file: File being scanned; its position is moved by this call.
            start_offset: Offset of the start of the ZIP chunk inside ``file``.
            offset: Offset of the classic EOCD record.

        Returns:
            The parsed ``zip64_end_of_central_directory_t``, or None when no
            ZIP64 EOCD locator ends exactly at ``offset``.

        Raises:
            InvalidInputFormat: The locator points at a record whose signature
                is not ``ZIP64_EOCD_SIGNATURE``.
        """
        file.seek(start_offset, io.SEEK_SET)
        for eocd_locator_offset in iterate_patterns(
            file, struct.pack("<I", self.ZIP64_EOCD_LOCATOR_HEADER)
        ):
            file.seek(eocd_locator_offset, io.SEEK_SET)
            eocd_locator = self.cparser_le.zip64_end_of_central_directory_locator_t(
                file
            )
            logger.debug("eocd_locator", eocd_locator=eocd_locator, _verbosity=3)

            # ZIP64 EOCD locator is right before the EOCD record
            if eocd_locator_offset + len(eocd_locator) == offset:
                # The locator stores the position of the ZIP64 EOCD record
                # relative to the start of the archive.
                file.seek(start_offset + eocd_locator.offset_of_cd)
                zip64_eocd = self.cparser_le.zip64_end_of_central_directory_t(file)
                logger.debug("zip64_eocd", zip64_eocd=zip64_eocd, _verbosity=3)

                if zip64_eocd.signature != self.ZIP64_EOCD_SIGNATURE:
                    raise InvalidInputFormat(
                        "Missing ZIP64 EOCD header record header in ZIP chunk."
                    )
                return zip64_eocd
        return None

    def get_zip64_eocd(
        self, file: File, start_offset: int, offset: int, end_of_central_directory
    ):
        """Return the ZIP64 EOCD record if the archive looks like ZIP64, else None.

        Args:
            file: File being scanned; its position may be moved by this call.
            start_offset: Offset of the start of the ZIP chunk inside ``file``.
            offset: Offset of the classic EOCD record.
            end_of_central_directory: Parsed classic EOCD record found at
                ``offset``.

        Raises:
            InvalidInputFormat: Propagated from ``_parse_zip64``.
        """
        # some values in the EOCD can be saturated (0xFFFF / 0xFFFFFFFF),
        # indicating it's a zip64; if the offset of the CD is 0xFFFFFFFF, it's
        # definitely one. Otherwise we check the first CD header for the
        # zip64 size markers.
        if self.is_zip64_eocd(end_of_central_directory):
            return self._parse_zip64(file, start_offset, offset)

        absolute_offset_of_cd = start_offset + end_of_central_directory.offset_of_cd

        # Only peek at the central directory when its claimed position lies
        # before the EOCD record.
        if 0 < absolute_offset_of_cd < offset:
            file.seek(absolute_offset_of_cd, io.SEEK_SET)
            if self.has_zip64_tag(file):
                return self._parse_zip64(file, start_offset, offset)

        return None

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Compute the extent of the ZIP archive starting at ``start_offset``.

        Every EOCD signature found from ``start_offset`` onwards is tried in
        turn. A candidate is accepted when a ZIP64 EOCD record is found for
        it, or when its own position equals
        ``start_offset + offset_of_cd + central_directory_size``.

        Args:
            file: File being scanned.
            start_offset: Offset of the matched local file header.

        Returns:
            A ``ValidChunk`` ending right after the accepted EOCD record
            (including its comment), with ``is_encrypted`` set from
            ``has_encrypted_files``.

        Raises:
            InvalidInputFormat: No EOCD candidate was accepted, or the ZIP64
                EOCD record was malformed.
        """
        file.seek(start_offset, io.SEEK_SET)

        offset = None
        for offset in iterate_patterns(
            file, struct.pack("<I", self.EOCD_RECORD_HEADER)
        ):
            file.seek(offset, io.SEEK_SET)
            end_of_central_directory = self.parse_header(file)

            zip64_eocd = self.get_zip64_eocd(
                file, start_offset, offset, end_of_central_directory
            )
            if zip64_eocd is not None:
                # The ZIP64 record supersedes the classic one for the
                # offset_of_cd / total_entries fields read further down.
                end_of_central_directory = zip64_eocd
                break

            # the EOCD offset is equal to the offset of CD + size of CD
            end_of_central_directory_offset = (
                start_offset
                + end_of_central_directory.offset_of_cd
                + end_of_central_directory.central_directory_size
            )

            if offset == end_of_central_directory_offset:
                break
        else:
            # Loop ran to completion without accepting any candidate.
            raise InvalidInputFormat("Missing EOCD record header in ZIP chunk.")

        has_encrypted_files = self.has_encrypted_files(
            file, start_offset, end_of_central_directory
        )

        # Re-parse the classic EOCD record so that the cursor ends up right
        # after it (comment included); that position is the end of the chunk.
        file.seek(offset, io.SEEK_SET)
        self.cparser_le.end_of_central_directory_t(file)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
            is_encrypted=has_encrypted_files,
        )