1import io
2import struct
3from typing import Optional
4
5from structlog import get_logger
6
7from unblob.extractors import Command
8
9from ...file_utils import InvalidInputFormat
10from ...models import (
11 File,
12 HandlerDoc,
13 HandlerType,
14 HexString,
15 Reference,
16 StructHandler,
17 ValidChunk,
18)
19
# Special 32-bit sector identifiers that FAT / DIFAT entries are compared with.
FREE_SECTOR = 0xFFFF_FFFF  # entry is skipped as unused
END_OF_CHAIN = 0xFFFF_FFFE  # stops the walk over the DIFAT sector chain
# Sector sizes below this many bytes are rejected by the handler.
HEADER_SIZE = 0x200

logger = get_logger()
25
26
class MsiHandler(StructHandler):
    """Handler for MSI files stored in a compound-file (CFB) container.

    The chunk length is computed from the container header and its FAT:
    the FAT sector list is gathered from the header (and, when needed, from
    the DIFAT chain), every FAT sector is scanned for its last non-free
    entry, and the chunk ends right after the highest sector found that way.
    """

    NAME = "msi"

    PATTERNS = [HexString("D0 CF 11 E0 A1 B1 1A E1")]
    C_DEFINITIONS = r"""
        typedef struct cfbf_header
        {
            // [offset from start (bytes), length (bytes)]
            uint8 signature[8];       // [00H,08] {0xd0, 0xcf, 0x11, 0xe0, 0xa1, 0xb1,
                                      // 0x1a, 0xe1} for current version
            uint8 clsid[16];          // [08H,16] reserved must be zero (WriteClassStg/
                                      // GetClassFile uses root directory class id)
            uint16 minorVersion;      // [18H,02] minor version of the format: 33 is
                                      // written by reference implementation
            uint16 dllVersion;        // [1AH,02] major version of the dll/format: 3 for
                                      // 512-byte sectors, 4 for 4 KB sectors
            uint16 byteOrder;         // [1CH,02] 0xFFFE: indicates Intel byte-ordering
            uint16 sectorShift;       // [1EH,02] size of sectors in power-of-two;
                                      // typically 9 indicating 512-byte sectors
            uint16 miniSectorShift;   // [20H,02] size of mini-sectors in power-of-two;
                                      // typically 6 indicating 64-byte mini-sectors
            uint16 reserved;          // [22H,02] reserved, must be zero
            uint32 reserved1;         // [24H,04] reserved, must be zero
            uint32 csectDir;          // [28H,04] must be zero for 512-byte sectors,
                                      // number of SECTs in directory chain for 4 KB
                                      // sectors
            uint32 csectFat;          // [2CH,04] number of SECTs in the FAT chain
            uint32 sectDirStart;      // [30H,04] first SECT in the directory chain
            uint32 txSignature;       // [34H,04] signature used for transactions; must
                                      // be zero. The reference implementation
                                      // does not support transactions
            uint32 miniSectorCutoff;  // [38H,04] maximum size for a mini stream;
                                      // typically 4096 bytes
            uint32 sectMiniFatStart;  // [3CH,04] first SECT in the MiniFAT chain
            uint32 csectMiniFat;      // [40H,04] number of SECTs in the MiniFAT chain
            uint32 sectDifStart;      // [44H,04] first SECT in the DIFAT chain
            uint32 csectDif;          // [48H,04] number of SECTs in the DIFAT chain
            uint32 sectFat[109];      // [4CH,436] the SECTs of first 109 FAT sectors
        } cfbf_header_t;
    """
    HEADER_STRUCT = "cfbf_header_t"

    EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}")

    DOC = HandlerDoc(
        name="MSI",
        description="Microsoft Installer (MSI) files are used for the installation, maintenance, and removal of software.",
        handler_type=HandlerType.ARCHIVE,
        vendor="Microsoft",
        references=[
            Reference(
                title="MSI File Format Documentation",
                url="https://docs.microsoft.com/en-us/windows/win32/msi/overview-of-windows-installer",
            ),
            Reference(
                title="Compound File Binary Format",
                url="https://en.wikipedia.org/wiki/Compound_File_Binary_Format",
            ),
        ],
        limitations=[
            "Limited to CFB based extraction, not full-on MSI extraction",
            "Extracted files have names coming from CFB internal representation, and may not correspond to the one they would have on disk after running the installer",
        ],
    )

    def _read_sector(
        self, file: File, start_offset: int, sector_size: int, sector_id: int
    ) -> bytes:
        """Return the raw bytes of sector ``sector_id``.

        :param file: input positioned anywhere; it is re-seeked here.
        :param start_offset: offset of the container header inside ``file``.
        :param sector_size: size of one sector in bytes.
        :param sector_id: zero-based sector number (sector 0 follows the header).
        :raises InvalidInputFormat: when the sector starts past the end of the
            file or fewer than ``sector_size`` bytes could be read.
        """
        # All sectors, including the fixed-size header, occupy full sector_size,
        # hence the extra ``sector_size`` skipped before sector 0.
        sector_offset = start_offset + sector_size + sector_id * sector_size
        if sector_offset > file.size():
            raise InvalidInputFormat("Invalid MSI file, sector offset too large")

        file.seek(sector_offset, io.SEEK_SET)
        raw_sector = file.read(sector_size)
        if len(raw_sector) != sector_size:
            raise InvalidInputFormat("Invalid MSI file, sector shorter than expected")

        return raw_sector

    def _append_fat_sector(
        self, fat_sectors: list[int], sector_id: int, required_count: int
    ) -> bool:
        """Record ``sector_id`` as a FAT sector unless it is a free marker.

        ``fat_sectors`` is mutated in place.

        :returns: ``True`` once ``fat_sectors`` holds at least
            ``required_count`` entries, ``False`` otherwise (including when
            the entry was skipped because it equals ``FREE_SECTOR``).
        """
        if sector_id == FREE_SECTOR:
            return False

        fat_sectors.append(sector_id)
        return len(fat_sectors) >= required_count

    def _extend_fat_from_difat(
        self,
        file: File,
        header,
        start_offset: int,
        sector_size: int,
        entries_per_sector: int,
        fat_sectors: list[int],
    ) -> None:
        """Append FAT sector ids found by walking the DIFAT sector chain.

        Starts at ``header.sectDifStart`` and visits at most
        ``header.csectDif`` sectors. In each DIFAT sector the last 32-bit
        entry is taken as the next DIFAT sector and all preceding entries are
        candidate FAT sector ids. Stops early as soon as ``fat_sectors``
        reaches ``header.csectFat`` entries.

        :raises InvalidInputFormat: when a DIFAT sector cannot be read, or
            when the chain links back to a sector that was already visited.
        """
        difat_sector = header.sectDifStart
        # Both csectDif and the chain links come from the (untrusted) file.
        # Without this guard a self-referencing chain would be re-read up to
        # csectDif times and could keep growing fat_sectors.
        visited: set[int] = set()

        for _ in range(header.csectDif):
            if difat_sector in (FREE_SECTOR, END_OF_CHAIN):
                break

            if difat_sector in visited:
                raise InvalidInputFormat("Invalid MSI file, DIFAT chain loops")
            visited.add(difat_sector)

            raw_sector = self._read_sector(
                file, start_offset, sector_size, difat_sector
            )
            *fat_entries, difat_sector = struct.unpack(
                f"<{entries_per_sector}I", raw_sector
            )

            for entry in fat_entries:
                if self._append_fat_sector(
                    fat_sectors, entry, required_count=header.csectFat
                ):
                    return

    def _collect_fat_sectors(
        self,
        file: File,
        header,
        start_offset: int,
        sector_size: int,
        entries_per_sector: int,
    ) -> list[int]:
        """Return the ids of all ``header.csectFat`` FAT sectors, in order.

        The ids listed in ``header.sectFat`` are used first; if they are not
        enough, the DIFAT chain is consulted for the remainder.

        :raises InvalidInputFormat: when the number of FAT sectors found does
            not equal ``header.csectFat``.
        """
        fat_sectors: list[int] = []

        for sect in header.sectFat:
            if self._append_fat_sector(fat_sectors, sect, header.csectFat):
                return fat_sectors

        if len(fat_sectors) < header.csectFat:
            self._extend_fat_from_difat(
                file, header, start_offset, sector_size, entries_per_sector, fat_sectors
            )

        if len(fat_sectors) != header.csectFat:
            raise InvalidInputFormat("Invalid MSI file, incomplete FAT chain")

        return fat_sectors

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Compute the extent of the compound file starting at ``start_offset``.

        :returns: a chunk spanning the header sector plus every sector up to
            and including the highest sector number that has a non-free FAT
            entry.
        :raises InvalidInputFormat: when the sector size is smaller than the
            header, the FAT is declared empty, or the FAT cannot be read
            completely.
        """
        file.seek(start_offset, io.SEEK_SET)
        header = self.parse_header(file)

        sector_size = 2**header.sectorShift
        if sector_size < HEADER_SIZE:
            raise InvalidInputFormat("Invalid MSI file, sector smaller than header")

        if header.csectFat == 0:
            raise InvalidInputFormat("Invalid MSI file, FAT chain is empty")

        # FAT / DIFAT sectors are arrays of 32-bit little-endian entries.
        entries_per_sector = sector_size // 4

        fat_sectors = self._collect_fat_sectors(
            file, header, start_offset, sector_size, entries_per_sector
        )

        max_used_sector = 0
        for sector_index, sect in enumerate(fat_sectors):
            raw_sector = self._read_sector(file, start_offset, sector_size, sect)
            entries = struct.unpack(f"<{entries_per_sector}I", raw_sector)

            # Position of an entry within the whole FAT is used as the number
            # of the sector it describes; only the last non-free entry of each
            # FAT sector can raise the maximum.
            last_used = next(
                (
                    entry_id
                    for entry_id in range(len(entries) - 1, -1, -1)
                    if entries[entry_id] != FREE_SECTOR
                ),
                None,
            )
            if last_used is not None:
                base_sector_id = sector_index * entries_per_sector
                max_used_sector = max(max_used_sector, base_sector_id + last_used)

        # One sector for the header, then sectors 0..max_used_sector inclusive.
        total_size = sector_size + ((max_used_sector + 1) * sector_size)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + total_size,
        )