Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/zip.py: 96%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

75 statements  

1import io 

2import struct 

3from typing import Optional 

4 

5from structlog import get_logger 

6 

7from ...extractors import Command 

8from ...file_utils import InvalidInputFormat, iterate_patterns 

9from ...models import ( 

10 File, 

11 HandlerDoc, 

12 HandlerType, 

13 HexString, 

14 Reference, 

15 StructHandler, 

16 ValidChunk, 

17) 

18 

# Module-level structlog logger; the handler below uses it for debug output
# while parsing ZIP64 structures.
logger = get_logger()

20 

21 

class ZIPHandler(StructHandler):
    """Chunk handler that locates the boundaries of a ZIP archive.

    A candidate starts at a local file header signature (see ``PATTERNS``).
    ``calculate_chunk`` then scans forward for the End Of Central Directory
    (EOCD) record that closes the archive, transparently switching to the
    ZIP64 EOCD record when the archive uses the ZIP64 extensions, and reports
    whether any central directory entry is flagged as encrypted.
    """

    NAME = "zip"

    PATTERNS = [HexString("50 4B 03 04 // Local file header only")]
    C_DEFINITIONS = r"""

        typedef struct cd_file_header {
            uint32 magic;
            uint16 version_made_by;
            uint16 version_needed;
            uint16 flags;
            uint16 compression_method;
            uint16 dostime;
            uint16 dosdate;
            uint32 crc32_cs;
            uint32 compress_size;
            uint32 file_size;
            uint16 file_name_length;
            uint16 extra_field_length;
            uint16 file_comment_length;
            uint16 disk_number_start;
            uint16 internal_file_attr;
            uint32 external_file_attr;
            uint32 relative_offset_local_header;
            // char file_name[file_name_length];
            // char extra_field[extra_field_length];
        } partial_cd_file_header_t;

        typedef struct end_of_central_directory
        {
            uint32 end_of_central_signature;
            uint16 disk_number;
            uint16 disk_number_with_cd;
            uint16 disk_entries;
            uint16 total_entries;
            uint32 central_directory_size;
            uint32 offset_of_cd;
            uint16 comment_len;
            char zip_file_comment[comment_len];
        } end_of_central_directory_t;

        typedef struct zip64_end_of_central_directory_locator
        {
            uint32 signature;
            uint32 disk_number;
            uint64 offset_of_cd;
            uint32 total_disk;
        } zip64_end_of_central_directory_locator_t;

        typedef struct zip64_end_of_central_directory
        {
            uint32 signature;
            uint64 size_of_eocd_record;
            uint16 version_made_by;
            uint16 version_needed;
            uint32 disk_number;
            uint32 disk_number_with_cd;
            uint64 total_entries_disk;
            uint64 total_entries;
            uint64 size_of_cd;
            uint64 offset_of_cd;
        } zip64_end_of_central_directory_t;

    """
    HEADER_STRUCT = "end_of_central_directory_t"

    # empty password with -p will make sure the command will not hang
    EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}")

    DOC = HandlerDoc(
        name="ZIP",
        description="ZIP is a widely used archive file format that supports multiple compression methods, file spanning, and optional encryption. It includes metadata such as file names, sizes, and timestamps, and supports both standard and ZIP64 extensions for large files.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="ZIP File Format Specification",
                url="https://pkware.com/documents/casestudies/APPNOTE.TXT",
            ),
            Reference(
                title="ZIP64 Format Specification",
                url="https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.1.TXT",
            ),
        ],
        limitations=["Does not support encrypted ZIP files."],
    )

    # Mask tested against the `flags` field of a central directory header.
    ENCRYPTED_FLAG = 0b0001
    # Record signatures; they are packed little-endian ("<I") before being
    # searched for in the file.
    EOCD_RECORD_HEADER = 0x06054B50
    ZIP64_EOCD_SIGNATURE = 0x06064B50
    ZIP64_EOCD_LOCATOR_HEADER = 0x07064B50

    def has_encrypted_files(
        self,
        file: File,
        start_offset: int,
        end_of_central_directory,
    ) -> bool:
        """Tell whether any central directory entry has the encryption flag set.

        Args:
            file: File positioned anywhere; it is re-seeked to the central
                directory.
            start_offset: Absolute offset of the archive start inside ``file``.
            end_of_central_directory: Parsed EOCD or ZIP64 EOCD record; only
                its ``offset_of_cd`` and ``total_entries`` fields are read.

        Returns:
            True as soon as one entry has ``ENCRYPTED_FLAG`` set in its
            ``flags`` field, False if none of the entries does.
        """
        file.seek(start_offset + end_of_central_directory.offset_of_cd, io.SEEK_SET)
        for _ in range(end_of_central_directory.total_entries):
            file_header = self.cparser_le.partial_cd_file_header_t(file)
            if file_header.flags & self.ENCRYPTED_FLAG:
                return True
            # Each fixed-size header is followed by three variable-length
            # fields: file name, extra field and file comment. All three must
            # be skipped, otherwise an entry carrying a comment misaligns the
            # parsing of every header that follows it.
            file.seek(
                file_header.file_name_length
                + file_header.extra_field_length
                + file_header.file_comment_length,
                io.SEEK_CUR,
            )
        return False

    @staticmethod
    def is_zip64_eocd(end_of_central_directory) -> bool:
        """Tell whether a classic EOCD record holds a ZIP64 placeholder value.

        Returns True when any 16-bit field equals 0xFFFF or any 32-bit field
        equals 0xFFFFFFFF.
        """
        # see https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.1.TXT section J
        return (
            end_of_central_directory.disk_number == 0xFFFF
            or end_of_central_directory.disk_number_with_cd == 0xFFFF
            or end_of_central_directory.disk_entries == 0xFFFF
            or end_of_central_directory.total_entries == 0xFFFF
            or end_of_central_directory.central_directory_size == 0xFFFFFFFF
            or end_of_central_directory.offset_of_cd == 0xFFFFFFFF
        )

    def has_zip64_tag(self, file: File) -> bool:
        """Parse one central directory header at the current file position.

        Returns True when its ``file_size`` or ``compress_size`` equals the
        0xFFFFFFFF placeholder. Only the single header at the current
        position is inspected.
        """
        # see https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT section 4.3.9.2
        file_header = self.cparser_le.partial_cd_file_header_t(file)
        return (
            file_header.file_size == 0xFFFFFFFF
            or file_header.compress_size == 0xFFFFFFFF
        )

    def _parse_zip64(self, file: File, start_offset: int, offset: int):
        """Find and parse the ZIP64 EOCD record belonging to the EOCD at ``offset``.

        Searches from ``start_offset`` for ZIP64 EOCD locator signatures and
        accepts the locator that ends exactly where the EOCD record begins.

        Args:
            file: File to search.
            start_offset: Absolute offset of the archive start.
            offset: Absolute offset of the classic EOCD record.

        Returns:
            The parsed ZIP64 EOCD record, or None when no locator sits
            directly in front of the EOCD record.

        Raises:
            InvalidInputFormat: The record the locator points at does not
                carry the ZIP64 EOCD signature.
        """
        file.seek(start_offset, io.SEEK_SET)
        for eocd_locator_offset in iterate_patterns(
            file, struct.pack("<I", self.ZIP64_EOCD_LOCATOR_HEADER)
        ):
            file.seek(eocd_locator_offset, io.SEEK_SET)
            eocd_locator = self.cparser_le.zip64_end_of_central_directory_locator_t(
                file
            )
            logger.debug("eocd_locator", eocd_locator=eocd_locator, _verbosity=3)

            # ZIP64 EOCD locator is right before the EOCD record
            if eocd_locator_offset + len(eocd_locator) == offset:
                # The locator's offset is relative to the archive start.
                file.seek(start_offset + eocd_locator.offset_of_cd)
                zip64_eocd = self.cparser_le.zip64_end_of_central_directory_t(file)
                logger.debug("zip64_eocd", zip64_eocd=zip64_eocd, _verbosity=3)

                if zip64_eocd.signature != self.ZIP64_EOCD_SIGNATURE:
                    raise InvalidInputFormat(
                        "Missing ZIP64 EOCD header record header in ZIP chunk."
                    )
                return zip64_eocd
        return None

    def get_zip64_eocd(
        self, file: File, start_offset: int, offset: int, end_of_central_directory
    ):
        """Return the ZIP64 EOCD record if the archive appears to be ZIP64.

        Args:
            file: File being analysed.
            start_offset: Absolute offset of the archive start.
            offset: Absolute offset of the classic EOCD record.
            end_of_central_directory: The parsed classic EOCD record.

        Returns:
            The parsed ZIP64 EOCD record, or None when the archive shows no
            ZIP64 indicator or no matching locator is found.
        """
        # some values in the CD can be FFFF, indicating its a zip64
        # if the offset of the CD is 0xFFFFFFFF, its definitely one
        # otherwise we check every other header indicating zip64
        if self.is_zip64_eocd(end_of_central_directory):
            return self._parse_zip64(file, start_offset, offset)

        absolute_offset_of_cd = start_offset + end_of_central_directory.offset_of_cd

        # Only look at the central directory when it lies in front of the
        # EOCD record.
        if 0 < absolute_offset_of_cd < offset:
            file.seek(absolute_offset_of_cd, io.SEEK_SET)
            if self.has_zip64_tag(file):
                return self._parse_zip64(file, start_offset, offset)

        return None

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Determine the extent of the ZIP archive starting at ``start_offset``.

        Every EOCD signature found from ``start_offset`` onwards is a
        candidate. A candidate is accepted when either a ZIP64 EOCD record is
        attached to it, or its own position equals
        ``start_offset + offset_of_cd + central_directory_size``.

        Args:
            file: File being analysed.
            start_offset: Absolute offset of the matched local file header.

        Returns:
            A ValidChunk that ends right after the accepted EOCD record
            (including its trailing comment), with ``is_encrypted`` set from
            the central directory flags.

        Raises:
            InvalidInputFormat: No EOCD candidate was accepted.
        """
        file.seek(start_offset, io.SEEK_SET)

        offset = None
        for offset in iterate_patterns(
            file, struct.pack("<I", self.EOCD_RECORD_HEADER)
        ):
            file.seek(offset, io.SEEK_SET)
            # parse_header is inherited; presumably it parses HEADER_STRUCT.
            end_of_central_directory = self.parse_header(file)

            zip64_eocd = self.get_zip64_eocd(
                file, start_offset, offset, end_of_central_directory
            )
            if zip64_eocd is not None:
                # From here on the 64-bit record supplies offset_of_cd and
                # total_entries.
                end_of_central_directory = zip64_eocd
                break

            # the EOCD offset is equal to the offset of CD + size of CD
            end_of_central_directory_offset = (
                start_offset
                + end_of_central_directory.offset_of_cd
                + end_of_central_directory.central_directory_size
            )

            if offset == end_of_central_directory_offset:
                break
        else:
            raise InvalidInputFormat("Missing EOCD record header in ZIP chunk.")

        is_encrypted = self.has_encrypted_files(
            file, start_offset, end_of_central_directory
        )

        # Re-parse the classic EOCD record only to move the file pointer past
        # it; the resulting position is the end of the chunk.
        file.seek(offset, io.SEEK_SET)
        self.cparser_le.end_of_central_directory_t(file)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
            is_encrypted=is_encrypted,
        )
232 )