Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/zip.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

74 statements  

1import io 

2import struct 

3 

4from structlog import get_logger 

5 

6from ...extractors import Command 

7from ...file_utils import InvalidInputFormat, iterate_patterns 

8from ...models import ( 

9 File, 

10 HandlerDoc, 

11 HandlerType, 

12 HexString, 

13 Reference, 

14 StructHandler, 

15 ValidChunk, 

16) 

17 

18logger = get_logger() 

19 

20 

# Handler for ZIP archives: chunks are matched on the local file header magic,
# their end is located through the end-of-central-directory (EOCD) record in
# calculate_chunk(), and extraction is delegated to the 7z command below.
21class ZIPHandler(StructHandler): 

22 NAME = "zip" 

23 

# Signature bytes "PK\x03\x04" that open a local file header.
24 PATTERNS = [HexString("50 4B 03 04 // Local file header only")] 

# C-style struct layouts used by the methods of this class through
# self.cparser_le. partial_cd_file_header_t only covers the fixed-size part of
# a central directory file header; the variable-length fields that follow it
# are not parsed and have to be skipped by the code reading the header.
25 C_DEFINITIONS = r""" 

26 

27 typedef struct cd_file_header { 

28 uint32 magic; 

29 uint16 version_made_by; 

30 uint16 version_needed; 

31 uint16 flags; 

32 uint16 compression_method; 

33 uint16 dostime; 

34 uint16 dosdate; 

35 uint32 crc32_cs; 

36 uint32 compress_size; 

37 uint32 file_size; 

38 uint16 file_name_length; 

39 uint16 extra_field_length; 

40 uint16 file_comment_length; 

41 uint16 disk_number_start; 

42 uint16 internal_file_attr; 

43 uint32 external_file_attr; 

44 uint32 relative_offset_local_header; 

45 // char file_name[file_name_length]; 

46 // char extra_field[extra_field_length]; 

47 } partial_cd_file_header_t; 

48 

49 typedef struct end_of_central_directory 

50 { 

51 uint32 end_of_central_signature; 

52 uint16 disk_number; 

53 uint16 disk_number_with_cd; 

54 uint16 disk_entries; 

55 uint16 total_entries; 

56 uint32 central_directory_size; 

57 uint32 offset_of_cd; 

58 uint16 comment_len; 

59 char zip_file_comment[comment_len]; 

60 } end_of_central_directory_t; 

61 

62 typedef struct zip64_end_of_central_directory_locator 

63 { 

64 uint32 signature; 

65 uint32 disk_number; 

66 uint64 offset_of_cd; 

67 uint32 total_disk; 

68 } zip64_end_of_central_directory_locator_t; 

69 

70 typedef struct zip64_end_of_central_directory 

71 { 

72 uint32 signature; 

73 uint64 size_of_eocd_record; 

74 uint16 version_made_by; 

75 uint16 version_needed; 

76 uint32 disk_number; 

77 uint32 disk_number_with_cd; 

78 uint64 total_entries_disk; 

79 uint64 total_entries; 

80 uint64 size_of_cd; 

81 uint64 offset_of_cd; 

82 } zip64_end_of_central_directory_t; 

83 

84 """ 

# NOTE(review): presumably the struct that the inherited parse_header() reads;
# parse_header is not defined in this file - confirm against StructHandler.
85 HEADER_STRUCT = "end_of_central_directory_t" 

86 

87 # empty password with -p will make sure the command will not hang 

88 EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}") 

89 

90 DOC = HandlerDoc( 

91 name="ZIP", 

92 description="ZIP is a widely used archive file format that supports multiple compression methods, file spanning, and optional encryption. It includes metadata such as file names, sizes, and timestamps, and supports both standard and ZIP64 extensions for large files.", 

93 handler_type=HandlerType.ARCHIVE, 

94 vendor=None, 

95 references=[ 

96 Reference( 

97 title="ZIP File Format Specification", 

98 url="https://pkware.com/documents/casestudies/APPNOTE.TXT", 

99 ), 

100 Reference( 

101 title="ZIP64 Format Specification", 

102 url="https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.1.TXT", 

103 ), 

104 ], 

105 limitations=["Does not support encrypted ZIP files."], 

106 ) 

107 

# Bit 0 of a central directory header's flags field; tested in
# has_encrypted_files().
108 ENCRYPTED_FLAG = 0b0001 
# Record signatures as integers. EOCD_RECORD_HEADER and
# ZIP64_EOCD_LOCATOR_HEADER are packed little-endian ("<I") and searched for in
# the file; ZIP64_EOCD_SIGNATURE is compared against the signature field of a
# parsed ZIP64 EOCD record.
109 EOCD_RECORD_HEADER = 0x6054B50 
110 ZIP64_EOCD_SIGNATURE = 0x06064B50 
111 ZIP64_EOCD_LOCATOR_HEADER = 0x07064B50 

112 

def has_encrypted_files(
    self,
    file: File,
    start_offset: int,
    end_of_central_directory,
) -> bool:
    """Report whether any central directory entry has the encryption bit set.

    Walks ``total_entries`` central directory file headers, starting at
    ``start_offset + offset_of_cd``, and tests ``ENCRYPTED_FLAG`` against the
    ``flags`` field of each one.

    Args:
        file: Input file; it is re-positioned by this method.
        start_offset: Absolute offset at which the ZIP chunk starts.
        end_of_central_directory: Parsed EOCD (or ZIP64 EOCD) record that
            provides ``offset_of_cd`` and ``total_entries``.

    Returns:
        True as soon as one entry is flagged as encrypted, False otherwise.
    """
    file.seek(start_offset + end_of_central_directory.offset_of_cd, io.SEEK_SET)
    for _ in range(end_of_central_directory.total_entries):
        file_header = self.cparser_le.partial_cd_file_header_t(file)
        # The fixed-size header is followed by three variable-length fields:
        # file name, extra field and file comment. All three have to be
        # skipped, otherwise an entry carrying a comment leaves the cursor in
        # the middle of that comment and every later header is misparsed.
        file.seek(
            file_header.file_name_length
            + file_header.extra_field_length
            + file_header.file_comment_length,
            io.SEEK_CUR,
        )
        if file_header.flags & self.ENCRYPTED_FLAG:
            return True
    return False

129 

130 @staticmethod 

131 def is_zip64_eocd(end_of_central_directory): 

132 # see https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.1.TXT section J 

133 return ( 

134 end_of_central_directory.disk_number == 0xFFFF 

135 or end_of_central_directory.disk_number_with_cd == 0xFFFF 

136 or end_of_central_directory.disk_entries == 0xFFFF 

137 or end_of_central_directory.total_entries == 0xFFFF 

138 or end_of_central_directory.central_directory_size == 0xFFFFFFFF 

139 or end_of_central_directory.offset_of_cd == 0xFFFFFFFF 

140 ) 

141 

def has_zip64_tag(self, file):
    """Check one central directory file header for ZIP64 size markers.

    Parses a ``partial_cd_file_header_t`` at the current position of ``file``
    and returns True when its ``file_size`` or ``compress_size`` field is
    0xFFFFFFFF.
    See https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT section 4.3.9.2
    """
    zip64_marker = 0xFFFFFFFF
    cd_entry = self.cparser_le.partial_cd_file_header_t(file)
    if cd_entry.file_size == zip64_marker:
        return True
    return cd_entry.compress_size == zip64_marker

149 

def _parse_zip64(self, file: File, start_offset: int, offset: int):
    """Find and parse the ZIP64 EOCD record that belongs to the EOCD at ``offset``.

    Searches from ``start_offset`` for ZIP64 EOCD locator signatures. The
    locator that ends exactly at ``offset`` is followed to the ZIP64 EOCD
    record it points at (relative to ``start_offset``).

    Returns:
        The parsed ``zip64_end_of_central_directory_t`` record, or None when
        no locator sits directly in front of the EOCD record.

    Raises:
        InvalidInputFormat: the record the locator points at does not carry
            the ZIP64 EOCD signature.
    """
    locator_magic = struct.pack("<I", self.ZIP64_EOCD_LOCATOR_HEADER)
    file.seek(start_offset, io.SEEK_SET)

    for locator_pos in iterate_patterns(file, locator_magic):
        file.seek(locator_pos, io.SEEK_SET)
        locator = self.cparser_le.zip64_end_of_central_directory_locator_t(file)
        logger.debug("eocd_locator", eocd_locator=locator, _verbosity=3)

        # Only a locator placed right before the EOCD record is relevant.
        if locator_pos + len(locator) != offset:
            continue

        file.seek(start_offset + locator.offset_of_cd)
        record = self.cparser_le.zip64_end_of_central_directory_t(file)
        logger.debug("zip64_eocd", zip64_eocd=record, _verbosity=3)

        if record.signature == self.ZIP64_EOCD_SIGNATURE:
            return record
        raise InvalidInputFormat(
            "Missing ZIP64 EOCD header record header in ZIP chunk."
        )

    return None

173 

def get_zip64_eocd(self, file, start_offset, offset, end_of_central_directory):
    """Return the ZIP64 EOCD record for this archive, or None if there is none.

    A ZIP64 lookup is attempted when the classic EOCD record has a field set
    to its 0xFFFF / 0xFFFFFFFF marker value, or, failing that, when the
    central directory header found at ``start_offset + offset_of_cd`` carries
    a ZIP64 size marker. That second check only runs if the position lies
    strictly between 0 and ``offset``.
    """
    if not self.is_zip64_eocd(end_of_central_directory):
        cd_position = start_offset + end_of_central_directory.offset_of_cd

        # Skip central directory offsets that fall outside the data in front
        # of the EOCD record.
        if not 0 < cd_position < offset:
            return None

        file.seek(cd_position, io.SEEK_SET)
        if not self.has_zip64_tag(file):
            return None

    return self._parse_zip64(file, start_offset, offset)

189 

def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
    """Determine the extent of the ZIP archive that starts at ``start_offset``.

    Scans forward for EOCD signatures and accepts the first candidate that
    either resolves to a ZIP64 EOCD record or sits exactly at
    ``start_offset + offset_of_cd + central_directory_size``. The chunk ends
    right after the accepted classic EOCD record, comment included.

    Raises:
        InvalidInputFormat: no EOCD candidate was accepted.
    """
    eocd_magic = struct.pack("<I", self.EOCD_RECORD_HEADER)
    file.seek(start_offset, io.SEEK_SET)

    eocd_position = None
    for eocd_position in iterate_patterns(file, eocd_magic):
        file.seek(eocd_position, io.SEEK_SET)
        eocd = self.parse_header(file)

        zip64_record = self.get_zip64_eocd(file, start_offset, eocd_position, eocd)
        if zip64_record is not None:
            # The ZIP64 record replaces the classic one for reading the
            # central directory location and entry count below.
            eocd = zip64_record
            break

        # the EOCD offset is equal to the offset of CD + size of CD
        expected_position = (
            start_offset + eocd.offset_of_cd + eocd.central_directory_size
        )
        if eocd_position == expected_position:
            break
    else:
        raise InvalidInputFormat("Missing EOCD record header in ZIP chunk.")

    is_encrypted = self.has_encrypted_files(file, start_offset, eocd)

    # Parse the classic EOCD record once more so that the file position ends
    # up right behind it; that position is the end of the chunk.
    file.seek(eocd_position, io.SEEK_SET)
    self.cparser_le.end_of_central_directory_t(file)
    chunk_end = file.tell()

    return ValidChunk(
        start_offset=start_offset,
        end_offset=chunk_end,
        is_encrypted=is_encrypted,
    )

231 )