Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/msi.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

75 statements  

1import io 

2import struct 

3 

4from structlog import get_logger 

5 

6from unblob.extractors import Command 

7 

8from ...file_utils import InvalidInputFormat 

9from ...models import ( 

10 File, 

11 HandlerDoc, 

12 HandlerType, 

13 HexString, 

14 Reference, 

15 StructHandler, 

16 ValidChunk, 

17) 

18 

# Special 32-bit sector identifiers that appear in FAT / DIFAT entries.
FREE_SECTOR = 0xFFFF_FFFF  # entry does not reference an allocated sector
END_OF_CHAIN = 0xFFFF_FFFE  # entry terminates a sector chain

# Size in bytes of the fixed CFB header; a sector may not be smaller than this.
HEADER_SIZE = 512

logger = get_logger()

24 

25 

class MsiHandler(StructHandler):
    """Locate MSI (Compound File Binary) chunks and compute where they end.

    The chunk size is derived from the file allocation table (FAT): the
    highest sector index that is not marked free determines the last sector
    belonging to the file. Extraction itself is delegated to ``7z``.
    """

    NAME = "msi"

    PATTERNS = [HexString("D0 CF 11 E0 A1 B1 1A E1")]
    C_DEFINITIONS = r"""
        typedef struct cfbf_header
        {
            // [offset from start (bytes), length (bytes)]
            uint8 signature[8]; // [00H,08] {0xd0, 0xcf, 0x11, 0xe0, 0xa1, 0xb1,
            // 0x1a, 0xe1} for current version
            uint8 clsid[16]; // [08H,16] reserved must be zero (WriteClassStg/
            // GetClassFile uses root directory class id)
            uint16 minorVersion; // [18H,02] minor version of the format: 33 is
            // written by reference implementation
            uint16 dllVersion; // [1AH,02] major version of the dll/format: 3 for
            // 512-byte sectors, 4 for 4 KB sectors
            uint16 byteOrder; // [1CH,02] 0xFFFE: indicates Intel byte-ordering
            uint16 sectorShift; // [1EH,02] size of sectors in power-of-two;
            // typically 9 indicating 512-byte sectors
            uint16 miniSectorShift; // [20H,02] size of mini-sectors in power-of-two;
            // typically 6 indicating 64-byte mini-sectors
            uint16 reserved; // [22H,02] reserved, must be zero
            uint32 reserved1; // [24H,04] reserved, must be zero
            uint32 csectDir; // [28H,04] must be zero for 512-byte sectors,
            // number of SECTs in directory chain for 4 KB
            // sectors
            uint32 csectFat; // [2CH,04] number of SECTs in the FAT chain
            uint32 sectDirStart; // [30H,04] first SECT in the directory chain
            uint32 txSignature; // [34H,04] signature used for transactions; must
            // be zero. The reference implementation
            // does not support transactions
            uint32 miniSectorCutoff; // [38H,04] maximum size for a mini stream;
            // typically 4096 bytes
            uint32 sectMiniFatStart; // [3CH,04] first SECT in the MiniFAT chain
            uint32 csectMiniFat; // [40H,04] number of SECTs in the MiniFAT chain
            uint32 sectDifStart; // [44H,04] first SECT in the DIFAT chain
            uint32 csectDif; // [48H,04] number of SECTs in the DIFAT chain
            uint32 sectFat[109]; // [4CH,436] the SECTs of first 109 FAT sectors
        } cfbf_header_t;
    """
    HEADER_STRUCT = "cfbf_header_t"

    EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}")

    DOC = HandlerDoc(
        name="MSI",
        description="Microsoft Installer (MSI) files are used for the installation, maintenance, and removal of software.",
        handler_type=HandlerType.ARCHIVE,
        vendor="Microsoft",
        references=[
            Reference(
                title="MSI File Format Documentation",
                url="https://docs.microsoft.com/en-us/windows/win32/msi/overview-of-windows-installer",
            ),
            Reference(
                title="Compound File Binary Format",
                url="https://en.wikipedia.org/wiki/Compound_File_Binary_Format",
            ),
        ],
        limitations=[
            "Limited to CFB based extraction, not full-on MSI extraction",
            "Extracted files have names coming from CFB internal representation, and may not correspond to the one they would have on disk after running the installer",
        ],
    )

    def _read_sector(
        self, file: File, start_offset: int, sector_size: int, sector_id: int
    ) -> bytes:
        """Return the raw bytes of sector ``sector_id``.

        Sector 0 starts one full ``sector_size`` after ``start_offset``,
        because the header occupies the first sector-sized slot.

        Raises:
            InvalidInputFormat: if the sector starts past the end of the file
                or fewer than ``sector_size`` bytes could be read.
        """
        # All sectors, including the fixed-size header, occupy full sector_size
        sector_offset = start_offset + sector_size + sector_id * sector_size
        if sector_offset > file.size():
            raise InvalidInputFormat("Invalid MSI file, sector offset too large")

        file.seek(sector_offset, io.SEEK_SET)
        raw_sector = file.read(sector_size)
        if len(raw_sector) != sector_size:
            raise InvalidInputFormat("Invalid MSI file, sector shorter than expected")

        return raw_sector

    def _append_fat_sector(
        self, fat_sectors: list[int], sector_id: int, required_count: int
    ) -> bool:
        """Record ``sector_id`` as a FAT sector unless it is a free slot.

        ``fat_sectors`` is mutated in place.

        Returns:
            True once ``fat_sectors`` holds at least ``required_count``
            entries, False otherwise (including when the slot was free).
        """
        if sector_id == FREE_SECTOR:
            return False

        fat_sectors.append(sector_id)
        return len(fat_sectors) >= required_count

    def _extend_fat_from_difat(
        self,
        file: File,
        header,
        start_offset: int,
        sector_size: int,
        entries_per_sector: int,
        fat_sectors: list[int],
    ) -> None:
        """Append FAT sector ids listed in the DIFAT chain to ``fat_sectors``.

        Each DIFAT sector is an array of little-endian uint32 values: every
        entry but the last names a FAT sector, the last one links to the next
        DIFAT sector. Walking stops when ``header.csectFat`` FAT sectors have
        been collected, when the chain ends, or after ``header.csectDif``
        DIFAT sectors.

        Raises:
            InvalidInputFormat: if a DIFAT sector cannot be read, or if the
                chain links back to a sector that was already visited.
        """
        difat_sector = header.sectDifStart
        # csectDif is an attacker-controlled uint32; without cycle detection a
        # chain that links back onto itself would be re-read up to ~4 billion
        # times before the file is finally rejected.
        visited: set[int] = set()

        for _ in range(header.csectDif):
            if difat_sector in (FREE_SECTOR, END_OF_CHAIN):
                break

            if difat_sector in visited:
                raise InvalidInputFormat("Invalid MSI file, DIFAT chain loops")
            visited.add(difat_sector)

            raw_sector = self._read_sector(
                file, start_offset, sector_size, difat_sector
            )
            entries = struct.unpack(f"<{entries_per_sector}I", raw_sector)

            # The final slot is the link to the next DIFAT sector.
            difat_sector = entries[-1]
            for entry in entries[:-1]:
                if self._append_fat_sector(
                    fat_sectors, entry, required_count=header.csectFat
                ):
                    return

    def _collect_fat_sectors(
        self,
        file: File,
        header,
        start_offset: int,
        sector_size: int,
        entries_per_sector: int,
    ) -> list[int]:
        """Return the ids of all ``header.csectFat`` FAT sectors, in order.

        The ids stored directly in the header (``header.sectFat``) are used
        first; the DIFAT chain is consulted only if those are not enough.

        Raises:
            InvalidInputFormat: if the number of FAT sectors found differs
                from ``header.csectFat``.
        """
        fat_sectors: list[int] = []

        for sect in header.sectFat:
            if self._append_fat_sector(fat_sectors, sect, header.csectFat):
                return fat_sectors

        if len(fat_sectors) < header.csectFat:
            self._extend_fat_from_difat(
                file, header, start_offset, sector_size, entries_per_sector, fat_sectors
            )

        if len(fat_sectors) != header.csectFat:
            raise InvalidInputFormat("Invalid MSI file, incomplete FAT chain")

        return fat_sectors

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Compute the extent of the compound file starting at ``start_offset``.

        The end offset is the header slot plus every sector up to and
        including the highest sector index whose FAT entry is not free.

        Raises:
            InvalidInputFormat: if the sector size is smaller than the header,
                the FAT is declared empty, or the FAT cannot be read in full.
        """
        file.seek(start_offset, io.SEEK_SET)
        header = self.parse_header(file)

        sector_size = 2**header.sectorShift
        # FAT and DIFAT sectors are arrays of 4-byte entries.
        entries_per_sector = sector_size // 4

        if sector_size < HEADER_SIZE:
            raise InvalidInputFormat("Invalid MSI file, sector smaller than header")

        if header.csectFat == 0:
            raise InvalidInputFormat("Invalid MSI file, FAT chain is empty")

        fat_sectors = self._collect_fat_sectors(
            file, header, start_offset, sector_size, entries_per_sector
        )

        max_used_sector = 0
        for sector_index, sect in enumerate(fat_sectors):
            raw_sector = self._read_sector(file, start_offset, sector_size, sect)
            entries = struct.unpack(f"<{entries_per_sector}I", raw_sector)

            # The n-th FAT sector describes sectors starting at
            # n * entries_per_sector; scan backwards for its last used entry.
            base_sector_id = sector_index * entries_per_sector
            for entry_id in range(len(entries) - 1, -1, -1):
                if entries[entry_id] == FREE_SECTOR:
                    continue

                max_id = base_sector_id + entry_id
                max_used_sector = max(max_used_sector, max_id)
                break

        # One sector-sized slot for the header plus sectors 0..max_used_sector.
        total_size = sector_size + ((max_used_sector + 1) * sector_size)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + total_size,
        )

202 )