1import io
2import struct
3from typing import Optional
4
5from structlog import get_logger
6
7from ...extractors import Command
8from ...file_utils import InvalidInputFormat, iterate_patterns
9from ...models import (
10 File,
11 HandlerDoc,
12 HandlerType,
13 HexString,
14 Reference,
15 StructHandler,
16 ValidChunk,
17)
18
19logger = get_logger()
20
21
class ZIPHandler(StructHandler):
    """Locate ZIP archives by finding the EOCD record that closes them.

    A chunk starts at a local file header (see ``PATTERNS``) and ends right
    after the End Of Central Directory (EOCD) record, including its trailing
    comment.  ZIP64 archives are recognised through the ZIP64 EOCD locator
    that sits immediately in front of the EOCD record.
    """

    NAME = "zip"

    PATTERNS = [HexString("50 4B 03 04 // Local file header only")]
    C_DEFINITIONS = r"""

        typedef struct cd_file_header {
            uint32 magic;
            uint16 version_made_by;
            uint16 version_needed;
            uint16 flags;
            uint16 compression_method;
            uint16 dostime;
            uint16 dosdate;
            uint32 crc32_cs;
            uint32 compress_size;
            uint32 file_size;
            uint16 file_name_length;
            uint16 extra_field_length;
            uint16 file_comment_length;
            uint16 disk_number_start;
            uint16 internal_file_attr;
            uint32 external_file_attr;
            uint32 relative_offset_local_header;
            // char file_name[file_name_length];
            // char extra_field[extra_field_length];
        } partial_cd_file_header_t;

        typedef struct end_of_central_directory
        {
            uint32 end_of_central_signature;
            uint16 disk_number;
            uint16 disk_number_with_cd;
            uint16 disk_entries;
            uint16 total_entries;
            uint32 central_directory_size;
            uint32 offset_of_cd;
            uint16 comment_len;
            char zip_file_comment[comment_len];
        } end_of_central_directory_t;

        typedef struct zip64_end_of_central_directory_locator
        {
            uint32 signature;
            uint32 disk_number;
            uint64 offset_of_cd;
            uint32 total_disk;
        } zip64_end_of_central_directory_locator_t;

        typedef struct zip64_end_of_central_directory
        {
            uint32 signature;
            uint64 size_of_eocd_record;
            uint16 version_made_by;
            uint16 version_needed;
            uint32 disk_number;
            uint32 disk_number_with_cd;
            uint64 total_entries_disk;
            uint64 total_entries;
            uint64 size_of_cd;
            uint64 offset_of_cd;
        } zip64_end_of_central_directory_t;

    """
    HEADER_STRUCT = "end_of_central_directory_t"

    # empty password with -p will make sure the command will not hang
    EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}")

    DOC = HandlerDoc(
        name="ZIP",
        description="ZIP is a widely used archive file format that supports multiple compression methods, file spanning, and optional encryption. It includes metadata such as file names, sizes, and timestamps, and supports both standard and ZIP64 extensions for large files.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="ZIP File Format Specification",
                url="https://pkware.com/documents/casestudies/APPNOTE.TXT",
            ),
            Reference(
                title="ZIP64 Format Specification",
                url="https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.1.TXT",
            ),
        ],
        limitations=["Does not support encrypted ZIP files."],
    )

    # Bit 0 of the general purpose bit flag marks an encrypted entry.
    ENCRYPTED_FLAG = 0b0001
    EOCD_RECORD_HEADER = 0x06054B50
    ZIP64_EOCD_SIGNATURE = 0x06064B50
    ZIP64_EOCD_LOCATOR_HEADER = 0x07064B50
    # Fixed size of zip64_end_of_central_directory_locator_t:
    # signature (4) + disk_number (4) + offset_of_cd (8) + total_disk (4).
    ZIP64_EOCD_LOCATOR_SIZE = struct.calcsize("<IIQI")

    def has_encrypted_files(
        self,
        file: File,
        start_offset: int,
        end_of_central_directory,
    ) -> bool:
        """Return True if any central directory entry has the encrypted flag set.

        Walks ``total_entries`` central directory headers starting at
        ``start_offset + offset_of_cd``.  ``end_of_central_directory`` may be
        either the classic or the ZIP64 EOCD record; only ``offset_of_cd`` and
        ``total_entries`` are read from it.
        """
        file.seek(start_offset + end_of_central_directory.offset_of_cd, io.SEEK_SET)
        for _ in range(end_of_central_directory.total_entries):
            file_header = self.cparser_le.partial_cd_file_header_t(file)
            # Each fixed-size header is followed by the file name, the extra
            # field AND the file comment; all three must be skipped to land
            # on the next header.
            file.seek(
                file_header.file_name_length
                + file_header.extra_field_length
                + file_header.file_comment_length,
                io.SEEK_CUR,
            )
            if file_header.flags & self.ENCRYPTED_FLAG:
                return True
        return False

    @staticmethod
    def is_zip64_eocd(end_of_central_directory) -> bool:
        """Return True if any EOCD field holds its all-ones ZIP64 marker value."""
        # see https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.1.TXT section J
        return (
            end_of_central_directory.disk_number == 0xFFFF
            or end_of_central_directory.disk_number_with_cd == 0xFFFF
            or end_of_central_directory.disk_entries == 0xFFFF
            or end_of_central_directory.total_entries == 0xFFFF
            or end_of_central_directory.central_directory_size == 0xFFFFFFFF
            or end_of_central_directory.offset_of_cd == 0xFFFFFFFF
        )

    def has_zip64_tag(self, file: File) -> bool:
        """Parse one central directory header at the current position.

        Returns True if its compressed or uncompressed size is 0xFFFFFFFF.
        """
        # see https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT section 4.3.9.2
        file_header = self.cparser_le.partial_cd_file_header_t(file)
        return (
            file_header.file_size == 0xFFFFFFFF
            or file_header.compress_size == 0xFFFFFFFF
        )

    def _parse_zip64(self, file: File, start_offset: int, offset: int):
        """Return the ZIP64 EOCD record referenced by the locator before ``offset``.

        ``offset`` is the absolute position of the classic EOCD record.
        Returns None when no ZIP64 EOCD locator ends exactly at ``offset``.

        Raises:
            InvalidInputFormat: the locator points at something that does not
                carry the ZIP64 EOCD signature.
        """
        # ZIP64 EOCD locator is right before the EOCD record, so it can be
        # read directly instead of scanning the whole file for its magic.
        locator_offset = offset - self.ZIP64_EOCD_LOCATOR_SIZE
        if locator_offset < start_offset:
            return None

        file.seek(locator_offset, io.SEEK_SET)
        eocd_locator = self.cparser_le.zip64_end_of_central_directory_locator_t(file)
        if eocd_locator.signature != self.ZIP64_EOCD_LOCATOR_HEADER:
            return None
        logger.debug("eocd_locator", eocd_locator=eocd_locator, _verbosity=3)

        file.seek(start_offset + eocd_locator.offset_of_cd, io.SEEK_SET)
        zip64_eocd = self.cparser_le.zip64_end_of_central_directory_t(file)
        logger.debug("zip64_eocd", zip64_eocd=zip64_eocd, _verbosity=3)

        if zip64_eocd.signature != self.ZIP64_EOCD_SIGNATURE:
            raise InvalidInputFormat(
                "Missing ZIP64 EOCD header record header in ZIP chunk."
            )
        return zip64_eocd

    def get_zip64_eocd(
        self, file: File, start_offset: int, offset: int, end_of_central_directory
    ):
        """Return the ZIP64 EOCD record for the EOCD at ``offset``, or None.

        ZIP64 is assumed when the EOCD itself carries a marker value, or when
        the first central directory header has a 0xFFFFFFFF size field.
        """
        # Marker values in the EOCD record itself are a definite indication.
        if self.is_zip64_eocd(end_of_central_directory):
            return self._parse_zip64(file, start_offset, offset)

        absolute_offset_of_cd = start_offset + end_of_central_directory.offset_of_cd

        # Otherwise peek at the first central directory header, but only when
        # the claimed CD position lies before the EOCD record.
        if 0 < absolute_offset_of_cd < offset:
            file.seek(absolute_offset_of_cd, io.SEEK_SET)
            if self.has_zip64_tag(file):
                return self._parse_zip64(file, start_offset, offset)

        return None

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Find the EOCD record that closes the archive starting at ``start_offset``.

        Every EOCD signature after ``start_offset`` is a candidate.  It is
        accepted when it resolves to a ZIP64 EOCD record, or when its position
        equals ``start_offset + offset_of_cd + central_directory_size``.

        Raises:
            InvalidInputFormat: no candidate EOCD record was accepted.
        """
        file.seek(start_offset, io.SEEK_SET)

        offset = None
        for offset in iterate_patterns(
            file, struct.pack("<I", self.EOCD_RECORD_HEADER)
        ):
            file.seek(offset, io.SEEK_SET)
            end_of_central_directory = self.parse_header(file)

            zip64_eocd = self.get_zip64_eocd(
                file, start_offset, offset, end_of_central_directory
            )
            if zip64_eocd is not None:
                # The ZIP64 record carries the real CD offset and entry count.
                end_of_central_directory = zip64_eocd
                break

            # the EOCD offset is equal to the offset of CD + size of CD
            end_of_central_directory_offset = (
                start_offset
                + end_of_central_directory.offset_of_cd
                + end_of_central_directory.central_directory_size
            )

            if offset == end_of_central_directory_offset:
                break
        else:
            raise InvalidInputFormat("Missing EOCD record header in ZIP chunk.")

        is_encrypted = self.has_encrypted_files(
            file, start_offset, end_of_central_directory
        )

        # Re-parse the classic EOCD record (including its variable-length
        # comment) so that file.tell() lands on the end of the chunk.
        file.seek(offset, io.SEEK_SET)
        self.cparser_le.end_of_central_directory_t(file)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
            is_encrypted=is_encrypted,
        )