1import io
2import struct
3
4from structlog import get_logger
5
6from unblob.extractors import Command
7
8from ...file_utils import InvalidInputFormat
9from ...models import (
10 File,
11 HandlerDoc,
12 HandlerType,
13 HexString,
14 Reference,
15 StructHandler,
16 ValidChunk,
17)
18
# Sentinel values that can appear in a FAT/DIFAT slot instead of a real
# sector number.
FREE_SECTOR = 0xFFFF_FFFF  # slot is not allocated
END_OF_CHAIN = 0xFFFF_FFFE  # terminates a sector chain
# Byte size of the fixed cfbf_header_t structure (0x4C + 109 * 4).
HEADER_SIZE = 1 << 9
22
# Module-level logger obtained from structlog.
logger = get_logger()
24
25
class MsiHandler(StructHandler):
    """Carve MSI files by walking the FAT of their compound-file container.

    The handler parses the fixed ``cfbf_header_t`` header, gathers the list of
    FAT sectors (first from the header, then from the DIFAT chain), and sizes
    the chunk from the highest sector index that any FAT sector marks as not
    free. Extraction is delegated to the ``7z`` command.
    """

    NAME = "msi"

    PATTERNS = [HexString("D0 CF 11 E0 A1 B1 1A E1")]
    C_DEFINITIONS = r"""
        typedef struct cfbf_header
        {
            // [offset from start (bytes), length (bytes)]
            uint8 signature[8]; // [00H,08] {0xd0, 0xcf, 0x11, 0xe0, 0xa1, 0xb1,
            // 0x1a, 0xe1} for current version
            uint8 clsid[16]; // [08H,16] reserved must be zero (WriteClassStg/
            // GetClassFile uses root directory class id)
            uint16 minorVersion; // [18H,02] minor version of the format: 33 is
            // written by reference implementation
            uint16 dllVersion; // [1AH,02] major version of the dll/format: 3 for
            // 512-byte sectors, 4 for 4 KB sectors
            uint16 byteOrder; // [1CH,02] 0xFFFE: indicates Intel byte-ordering
            uint16 sectorShift; // [1EH,02] size of sectors in power-of-two;
            // typically 9 indicating 512-byte sectors
            uint16 miniSectorShift; // [20H,02] size of mini-sectors in power-of-two;
            // typically 6 indicating 64-byte mini-sectors
            uint16 reserved; // [22H,02] reserved, must be zero
            uint32 reserved1; // [24H,04] reserved, must be zero
            uint32 csectDir; // [28H,04] must be zero for 512-byte sectors,
            // number of SECTs in directory chain for 4 KB
            // sectors
            uint32 csectFat; // [2CH,04] number of SECTs in the FAT chain
            uint32 sectDirStart; // [30H,04] first SECT in the directory chain
            uint32 txSignature; // [34H,04] signature used for transactions; must
            // be zero. The reference implementation
            // does not support transactions
            uint32 miniSectorCutoff; // [38H,04] maximum size for a mini stream;
            // typically 4096 bytes
            uint32 sectMiniFatStart; // [3CH,04] first SECT in the MiniFAT chain
            uint32 csectMiniFat; // [40H,04] number of SECTs in the MiniFAT chain
            uint32 sectDifStart; // [44H,04] first SECT in the DIFAT chain
            uint32 csectDif; // [48H,04] number of SECTs in the DIFAT chain
            uint32 sectFat[109]; // [4CH,436] the SECTs of first 109 FAT sectors
        } cfbf_header_t;
    """
    HEADER_STRUCT = "cfbf_header_t"

    EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}")

    DOC = HandlerDoc(
        name="MSI",
        description="Microsoft Installer (MSI) files are used for the installation, maintenance, and removal of software.",
        handler_type=HandlerType.ARCHIVE,
        vendor="Microsoft",
        references=[
            Reference(
                title="MSI File Format Documentation",
                url="https://docs.microsoft.com/en-us/windows/win32/msi/overview-of-windows-installer",
            ),
            Reference(
                title="Compound File Binary Format",
                url="https://en.wikipedia.org/wiki/Compound_File_Binary_Format",
            ),
        ],
        limitations=[
            "Limited to CFB based extraction, not full-on MSI extraction",
            "Extracted files have names coming from CFB internal representation, and may not correspond to the one they would have on disk after running the installer",
        ],
    )

    def _read_sector(
        self, file: File, start_offset: int, sector_size: int, sector_id: int
    ) -> bytes:
        """Return the raw bytes of sector ``sector_id``.

        Sector 0 begins ``sector_size`` bytes after ``start_offset``: the header
        is treated as occupying one full sector regardless of its own size.

        Raises:
            InvalidInputFormat: if the sector starts beyond the end of the file
                or fewer than ``sector_size`` bytes could be read.
        """
        # All sectors, including the fixed-size header, occupy full sector_size
        sector_offset = start_offset + sector_size + sector_id * sector_size
        if sector_offset > file.size():
            raise InvalidInputFormat("Invalid MSI file, sector offset too large")

        file.seek(sector_offset, io.SEEK_SET)
        raw_sector = file.read(sector_size)
        if len(raw_sector) != sector_size:
            raise InvalidInputFormat("Invalid MSI file, sector shorter than expected")

        return raw_sector

    def _append_fat_sector(
        self, fat_sectors: list[int], sector_id: int, required_count: int
    ) -> bool:
        """Record ``sector_id`` as a FAT sector unless it is a free slot.

        ``fat_sectors`` is mutated in place.

        Returns:
            True once ``fat_sectors`` holds at least ``required_count`` entries,
            False otherwise (including when ``sector_id`` was skipped).
        """
        if sector_id == FREE_SECTOR:
            return False

        fat_sectors.append(sector_id)
        return len(fat_sectors) >= required_count

    def _extend_fat_from_difat(
        self,
        file: File,
        header,
        start_offset: int,
        sector_size: int,
        entries_per_sector: int,
        fat_sectors: list[int],
    ) -> None:
        """Append FAT sector ids found in the DIFAT chain to ``fat_sectors``.

        Each DIFAT sector is decoded as ``entries_per_sector`` little-endian
        32-bit values: all but the last are FAT sector ids, the last one is the
        id of the next DIFAT sector. Walking stops after ``header.csectDif``
        sectors, at a free/end-of-chain marker, or as soon as
        ``header.csectFat`` FAT sectors have been collected.

        Raises:
            InvalidInputFormat: if a DIFAT sector cannot be read, or if the
                chain points back to a DIFAT sector that was already visited.
        """
        difat_sector = header.sectDifStart
        # A well-formed chain never revisits a sector. Without this guard a
        # looping chain would be re-read over and over, filling fat_sectors
        # with duplicated ids and producing a meaningless chunk size.
        visited: set[int] = set()

        for _ in range(header.csectDif):
            if difat_sector in (FREE_SECTOR, END_OF_CHAIN):
                break

            if difat_sector in visited:
                raise InvalidInputFormat(
                    "Invalid MSI file, DIFAT chain contains a loop"
                )
            visited.add(difat_sector)

            raw_sector = self._read_sector(
                file, start_offset, sector_size, difat_sector
            )
            entries = struct.unpack(f"<{entries_per_sector}I", raw_sector)

            # The final slot links to the next DIFAT sector.
            difat_sector = entries[-1]
            for entry in entries[:-1]:
                if self._append_fat_sector(
                    fat_sectors, entry, required_count=header.csectFat
                ):
                    return

    def _collect_fat_sectors(
        self,
        file: File,
        header,
        start_offset: int,
        sector_size: int,
        entries_per_sector: int,
    ) -> list[int]:
        """Return the ids of all ``header.csectFat`` FAT sectors, in order.

        The ids stored directly in the header (``header.sectFat``) are taken
        first; the DIFAT chain is consulted only if those are not enough.

        Raises:
            InvalidInputFormat: if the number of FAT sectors found differs from
                ``header.csectFat``, or if the DIFAT chain is unreadable.
        """
        fat_sectors: list[int] = []

        for sect in header.sectFat:
            if self._append_fat_sector(fat_sectors, sect, header.csectFat):
                return fat_sectors

        if len(fat_sectors) < header.csectFat:
            self._extend_fat_from_difat(
                file, header, start_offset, sector_size, entries_per_sector, fat_sectors
            )

        if len(fat_sectors) != header.csectFat:
            raise InvalidInputFormat("Invalid MSI file, incomplete FAT chain")

        return fat_sectors

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Compute the extent of the compound file starting at ``start_offset``.

        The end offset is one header-sized sector plus every sector up to and
        including the highest index that any FAT sector marks as not free.

        Raises:
            InvalidInputFormat: if the sector size is smaller than the header,
                the FAT is declared empty, or the FAT cannot be collected/read.
        """
        file.seek(start_offset, io.SEEK_SET)
        # parse_header is not defined in this class; presumably inherited from
        # StructHandler and driven by C_DEFINITIONS / HEADER_STRUCT.
        header = self.parse_header(file)

        sector_size = 2**header.sectorShift
        # FAT and DIFAT sectors are arrays of 32-bit sector ids.
        entries_per_sector = sector_size // 4

        if sector_size < HEADER_SIZE:
            raise InvalidInputFormat("Invalid MSI file, sector smaller than header")

        if header.csectFat == 0:
            raise InvalidInputFormat("Invalid MSI file, FAT chain is empty")

        fat_sectors = self._collect_fat_sectors(
            file, header, start_offset, sector_size, entries_per_sector
        )

        max_used_sector = 0
        for sector_index, sect in enumerate(fat_sectors):
            raw_sector = self._read_sector(file, start_offset, sector_size, sect)
            entries = struct.unpack(f"<{entries_per_sector}I", raw_sector)

            # The n-th FAT sector describes sectors starting at this index.
            base_sector_id = sector_index * entries_per_sector
            # Scan from the end: only the last non-free slot matters.
            for entry_id in reversed(range(entries_per_sector)):
                if entries[entry_id] != FREE_SECTOR:
                    max_used_sector = max(
                        max_used_sector, base_sector_id + entry_id
                    )
                    break

        # One sector for the header plus sectors 0..max_used_sector.
        total_size = sector_size + ((max_used_sector + 1) * sector_size)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + total_size,
        )