Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/cpio.py: 89%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

236 statements  

1import io 

2import os 

3import stat 

4from pathlib import Path 

5 

6import attrs 

7from structlog import get_logger 

8 

9from ...file_utils import ( 

10 Endian, 

11 FileSystem, 

12 InvalidInputFormat, 

13 StructParser, 

14 decode_int, 

15 iterate_file, 

16 round_up, 

17 snull, 

18) 

19from ...models import ( 

20 Extractor, 

21 ExtractResult, 

22 File, 

23 Handler, 

24 HandlerDoc, 

25 HandlerType, 

26 HexString, 

27 Reference, 

28 ValidChunk, 

29) 

30 

31logger = get_logger() 

32 

# Name of the sentinel entry that terminates a CPIO archive.
CPIO_TRAILER_NAME = "TRAILER!!!"
# Upper bound accepted for an entry's name size (4096 bytes).
MAX_LINUX_PATH_LENGTH = 4096

# File-type values as they appear in the CPIO mode field.
C_ISBLK = 0o60000
C_ISCHR = 0o20000
C_ISDIR = 0o40000
C_ISFIFO = 0o10000
C_ISSOCK = 0o140000
C_ISLNK = 0o120000
C_ISCTG = 0o110000
C_ISREG = 0o100000

# Every file type the parser accepts when validating an entry's mode.
C_FILE_TYPES = (
    C_ISBLK,
    C_ISCHR,
    C_ISDIR,
    C_ISFIFO,
    C_ISSOCK,
    C_ISLNK,
    C_ISCTG,
    C_ISREG,
)

# Special permission bits (setuid / setgid / sticky) of the mode field.
C_NONE = 0o00000
C_ISUID = 0o04000
C_ISGID = 0o02000
C_ISVTX = 0o01000
C_ISUID_ISGID = C_ISUID | C_ISGID

# Special-bit combinations the parser accepts when validating an entry's mode.
C_STICKY_BITS = (C_NONE, C_ISUID, C_ISGID, C_ISVTX, C_ISUID_ISGID)

63 

# C struct layouts of the three CPIO header flavours handled in this module:
# the old binary header (16-bit integer fields), the old portable ASCII
# header, and the new portable ASCII header (which also carries c_chksum).
# The text is handed to StructParser by CPIOParserBase.
C_DEFINITIONS = r"""
    typedef struct old_cpio_header
    {
        uint16 c_magic;
        uint16 c_dev;
        uint16 c_ino;
        uint16 c_mode;
        uint16 c_uid;
        uint16 c_gid;
        uint16 c_nlink;
        uint16 c_rdev;
        uint16 c_mtimes[2];
        uint16 c_namesize;
        uint16 c_filesize[2];
    } old_cpio_header_t;

    typedef struct old_ascii_header
    {
        char c_magic[6];
        char c_dev[6];
        char c_ino[6];
        char c_mode[6];
        char c_uid[6];
        char c_gid[6];
        char c_nlink[6];
        char c_rdev[6];
        char c_mtime[11];
        char c_namesize[6];
        char c_filesize[11];
    } old_ascii_header_t;

    typedef struct new_ascii_header
    {
        char c_magic[6];
        char c_ino[8];
        char c_mode[8];
        char c_uid[8];
        char c_gid[8];
        char c_nlink[8];
        char c_mtime[8];
        char c_filesize[8];
        char c_dev_maj[8];
        char c_dev_min[8];
        char c_rdev_maj[8];
        char c_rdev_min[8];
        char c_namesize[8];
        char c_chksum[8];
    } new_ascii_header_t;
"""

113 

114 

@attrs.define
class CPIOEntry:
    """One archive member as recorded by ``CPIOParserBase.parse``."""

    # Offset in the file where the parser expects the entry's content
    # (the position right after the padded header and name).
    start_offset: int
    # Content size in bytes, as decoded from the header.
    size: int
    # Device number decoded from the header.
    dev: int
    # Raw mode field (file type and permission bits).
    mode: int
    # Device number used when the entry is a special file.
    rdev: int
    # Entry name decoded from the archive.
    path: Path

123 

124 

class CPIOParserBase:
    """Shared parsing and extraction logic for all CPIO header flavours.

    Subclasses provide ``HEADER_STRUCT`` and ``_PAD_ALIGN`` and implement the
    ``_calculate_*`` hooks that turn raw header fields into integers.
    """

    # Alignment applied to header+name and to entry content; set by subclasses.
    _PAD_ALIGN: int
    # Block size used when looking for zero padding after the archive.
    _FILE_PAD_ALIGN: int = 512
    # Name of the struct in C_DEFINITIONS describing one entry header.
    HEADER_STRUCT: str

    def __init__(self, file: File, start_offset: int):
        """Prepare a parser for the archive starting at ``start_offset`` in ``file``."""
        self.file = file
        self.start_offset = start_offset
        # Stays -1 until parse() has completed successfully.
        self.end_offset = -1
        self.entries = []
        self.struct_parser = StructParser(C_DEFINITIONS)

    def parse(self):  # noqa: C901
        """Walk the archive entry by entry and record what was found.

        Entries are appended to ``self.entries`` until the trailer entry is
        reached or reading a header hits end of file. ``self.end_offset`` is
        then set, including trailing zero padding when present.

        Raises:
            InvalidInputFormat: when a header fails one of the sanity checks
                or when no data at all could be consumed.
        """
        current_offset = self.start_offset
        while True:
            self.file.seek(current_offset, io.SEEK_SET)
            try:
                header = self.struct_parser.parse(
                    self.HEADER_STRUCT, self.file, Endian.LITTLE
                )
            except EOFError:
                break

            c_filesize = self._calculate_file_size(header)
            c_namesize = self._calculate_name_size(header)

            # heuristics 1: check the filename
            if c_namesize > MAX_LINUX_PATH_LENGTH:
                raise InvalidInputFormat("CPIO entry filename is too long.")

            if c_namesize <= 0:
                raise InvalidInputFormat("CPIO entry filename size is invalid.")

            if c_filesize < 0:
                raise InvalidInputFormat("CPIO entry file size is invalid.")

            padded_header_size = self._pad_header(header, c_namesize)
            current_offset += padded_header_size

            # The file position is still right after the header here, so this
            # reads the name that follows it.
            tmp_filename = self.file.read(c_namesize)

            # heuristics 2: check that filename is null-byte terminated
            if not tmp_filename.endswith(b"\x00"):
                raise InvalidInputFormat(
                    "CPIO entry filename is not null-byte terminated"
                )

            try:
                filename = snull(tmp_filename).decode("utf-8")
            except UnicodeDecodeError as e:
                raise InvalidInputFormat from e

            if filename == CPIO_TRAILER_NAME:
                # The trailer ends the archive; account for its (padded)
                # content before stopping.
                current_offset += self._pad_content(c_filesize)
                break

            c_mode = self._calculate_mode(header)

            file_type = c_mode & 0o770000
            sticky_bit = c_mode & 0o7000

            # heuristics 3: check mode field
            is_valid = file_type in C_FILE_TYPES and sticky_bit in C_STICKY_BITS
            if not is_valid:
                raise InvalidInputFormat("CPIO entry mode is invalid.")

            if self.valid_checksum(header, current_offset):
                self.entries.append(
                    CPIOEntry(
                        start_offset=current_offset,
                        size=c_filesize,
                        dev=self._calculate_dev(header),
                        mode=c_mode,
                        rdev=self._calculate_rdev(header),
                        path=Path(filename),
                    )
                )
            else:
                logger.warning("Invalid CRC for CPIO entry, skipping.", header=header)

            current_offset += self._pad_content(c_filesize)

        self.end_offset = self._pad_file(current_offset)
        # Nothing was consumed at all: this is not a CPIO archive.
        if self.start_offset == self.end_offset:
            raise InvalidInputFormat("Invalid CPIO archive.")

    def dump_entries(self, fs: FileSystem):
        """Materialise every parsed entry through ``fs``.

        Regular files are carved from the archive, directories are created,
        symlinks are created from the entry content, and special files
        (character/block devices, sockets and FIFOs) go through ``fs.mknod``.
        Any other type is only logged.
        """
        for entry in self.entries:
            # skip entries with "." as filename
            if entry.path.name in ("", "."):
                continue

            # There are cases where CPIO archives have duplicated entries
            # We then unlink the files to overwrite them and avoid an error.
            if not stat.S_ISDIR(entry.mode):
                fs.unlink(entry.path)

            if stat.S_ISREG(entry.mode):
                fs.carve(entry.path, self.file, entry.start_offset, entry.size)
            elif stat.S_ISDIR(entry.mode):
                fs.mkdir(
                    entry.path, mode=entry.mode & 0o777, parents=True, exist_ok=True
                )
            elif stat.S_ISLNK(entry.mode):
                # The link target is stored as the entry's content.
                link_path = Path(
                    snull(
                        self.file[entry.start_offset : entry.start_offset + entry.size]
                    ).decode("utf-8")
                )
                fs.create_symlink(src=link_path, dst=entry.path)
            elif (
                stat.S_ISCHR(entry.mode)
                or stat.S_ISBLK(entry.mode)
                or stat.S_ISSOCK(entry.mode)
                # parse() accepts FIFO entries (C_ISFIFO is in C_FILE_TYPES),
                # so they are handled here instead of being reported as an
                # unknown type.
                or stat.S_ISFIFO(entry.mode)
            ):
                # NOTE(review): only the permission bits are forwarded; how
                # fs.mknod decides the node type is not visible here - confirm.
                fs.mknod(entry.path, mode=entry.mode & 0o777, device=entry.rdev)
            else:
                logger.warning("unknown file type in CPIO archive")

    def _pad_file(self, end_offset: int) -> int:
        """CPIO archives can have a 512 bytes block padding at the end.

        Returns the block-aligned end offset (relative to the archive start)
        when the bytes up to it are all zero, otherwise ``end_offset`` as is.
        """
        self.file.seek(end_offset, io.SEEK_SET)
        padded_end_offset = self.start_offset + round_up(
            size=end_offset - self.start_offset, alignment=self._FILE_PAD_ALIGN
        )
        padding_size = padded_end_offset - end_offset

        # A short read (end of file) or any non-zero byte means there is no
        # padding to include.
        if self.file.read(padding_size) == bytes([0]) * padding_size:
            return padded_end_offset

        return end_offset

    @classmethod
    def _pad_header(cls, header, c_namesize: int) -> int:
        """Return the size of header plus name, rounded up to ``_PAD_ALIGN``."""
        return round_up(len(header) + c_namesize, cls._PAD_ALIGN)

    @classmethod
    def _pad_content(cls, c_filesize: int) -> int:
        """Return the content size rounded up to ``_PAD_ALIGN``."""
        return round_up(c_filesize, cls._PAD_ALIGN)

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Return the entry's content size; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Return the entry's name size; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_mode(header) -> int:
        """Return the entry's mode field; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_dev(header) -> int:
        """Return the entry's device number; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Return the entry's rdev number; implemented by subclasses."""
        raise NotImplementedError

    def valid_checksum(self, header, start_offset: int) -> bool:  # noqa: ARG002
        """Accept every entry; subclasses with a checksum field override this."""
        return True

289 

290 

class BinaryCPIOParser(CPIOParserBase):
    """Parser for the old binary header (``old_cpio_header_t``)."""

    _PAD_ALIGN = 2

    HEADER_STRUCT = "old_cpio_header_t"

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Combine the two 16-bit size words, first word most significant."""
        high_word = header.c_filesize[0]
        low_word = header.c_filesize[1]
        return (high_word << 16) | low_word

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Return the name size rounded up to an even number of bytes."""
        name_size = header.c_namesize
        if name_size % 2:
            return name_size + 1
        return name_size

    @staticmethod
    def _calculate_mode(header) -> int:
        """Return the mode field unchanged."""
        return header.c_mode

    @staticmethod
    def _calculate_dev(header) -> int:
        """Return the dev field unchanged."""
        return header.c_dev

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Return the rdev field unchanged."""
        return header.c_rdev

315 

316 

class PortableOldASCIIParser(CPIOParserBase):
    """Parser for the old ASCII header (``old_ascii_header_t``).

    Every numeric field is decoded with ``decode_int`` in base 8.
    """

    _PAD_ALIGN = 1

    HEADER_STRUCT = "old_ascii_header_t"

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode ``c_filesize`` as an octal number."""
        raw_size = header.c_filesize
        return decode_int(raw_size, 8)

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode ``c_namesize`` as an octal number."""
        raw_size = header.c_namesize
        return decode_int(raw_size, 8)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode ``c_mode`` as an octal number."""
        raw_mode = header.c_mode
        return decode_int(raw_mode, 8)

    @staticmethod
    def _calculate_dev(header) -> int:
        """Decode ``c_dev`` as an octal number."""
        raw_dev = header.c_dev
        return decode_int(raw_dev, 8)

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Decode ``c_rdev`` as an octal number."""
        raw_rdev = header.c_rdev
        return decode_int(raw_rdev, 8)

341 

342 

class PortableASCIIParser(CPIOParserBase):
    """Parser for the new ASCII header (``new_ascii_header_t``).

    Every numeric field is decoded with ``decode_int`` in base 16.
    """

    _PAD_ALIGN = 4
    HEADER_STRUCT = "new_ascii_header_t"

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode ``c_filesize`` as a hexadecimal number."""
        raw_size = header.c_filesize
        return decode_int(raw_size, 16)

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode ``c_namesize`` as a hexadecimal number."""
        raw_size = header.c_namesize
        return decode_int(raw_size, 16)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode ``c_mode`` as a hexadecimal number."""
        raw_mode = header.c_mode
        return decode_int(raw_mode, 16)

    @staticmethod
    def _calculate_dev(header) -> int:
        """Build a device number from the major/minor ``c_dev_*`` fields."""
        major = decode_int(header.c_dev_maj, 16)
        minor = decode_int(header.c_dev_min, 16)
        return os.makedev(major, minor)

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Build a device number from the major/minor ``c_rdev_*`` fields."""
        major = decode_int(header.c_rdev_maj, 16)
        minor = decode_int(header.c_rdev_min, 16)
        return os.makedev(major, minor)

370 

371 

class PortableASCIIWithCRCParser(PortableASCIIParser):
    """New ASCII parser that also verifies each entry's ``c_chksum`` field."""

    def valid_checksum(self, header, start_offset: int) -> bool:
        """Check the entry content against the checksum stored in its header.

        The checksum is the sum of all content bytes, truncated to 32 bits.
        """
        expected = decode_int(header.c_chksum, 16)
        content_size = self._calculate_file_size(header)

        byte_total = sum(
            sum(bytearray(chunk))
            for chunk in iterate_file(self.file, start_offset, content_size)
        )
        return (byte_total & 0xFF_FF_FF_FF) == expected

381 

382 

class _CPIOExtractorBase(Extractor):
    """Common extractor: parse the whole input file and dump its entries."""

    # Parser class used for the archive flavour; set by subclasses.
    PARSER: type[CPIOParserBase]

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Extract the archive at ``inpath`` into ``outdir``."""
        target_fs = FileSystem(outdir)

        with File.from_path(inpath) as archive:
            # The input file is parsed from its very first byte.
            cpio_parser = self.PARSER(archive, 0)
            cpio_parser.parse()
            cpio_parser.dump_entries(target_fs)

        return None

393 

394 

class BinaryCPIOExtractor(_CPIOExtractorBase):
    """Extractor bound to ``BinaryCPIOParser``."""

    PARSER = BinaryCPIOParser

397 

398 

class PortableOldASCIIExtractor(_CPIOExtractorBase):
    """Extractor bound to ``PortableOldASCIIParser``."""

    PARSER = PortableOldASCIIParser

401 

402 

class PortableASCIIExtractor(_CPIOExtractorBase):
    """Extractor bound to ``PortableASCIIParser``."""

    PARSER = PortableASCIIParser

405 

406 

class PortableASCIIWithCRCExtractor(_CPIOExtractorBase):
    """Extractor bound to ``PortableASCIIWithCRCParser``."""

    PARSER = PortableASCIIWithCRCParser

409 

410 

class _CPIOHandlerBase(Handler):
    """Shared handler logic for all CPIO formats.

    The formats are parsed the same way; they differ only in how file and
    filename sizes and padding are calculated, and in whether header fields
    are converted from octal or hex.
    """

    # Extractor instance whose PARSER class is used to compute the chunk.
    EXTRACTOR: _CPIOExtractorBase

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Parse the archive at ``start_offset`` and return the chunk it spans."""
        parser_cls = self.EXTRACTOR.PARSER
        cpio_parser = parser_cls(file, start_offset)
        cpio_parser.parse()
        chunk_end = cpio_parser.end_offset
        return ValidChunk(start_offset=start_offset, end_offset=chunk_end)

427 

428 

# Handler for the binary CPIO flavour, matched on the two magic bytes c7 71.
class BinaryHandler(_CPIOHandlerBase):
    NAME = "cpio_binary"
    PATTERNS = [HexString("c7 71 // (default, bin, hpbin)")]

    EXTRACTOR = BinaryCPIOExtractor()

    DOC = HandlerDoc(
        name="CPIO (binary)",
        description=(
            "CPIO (Copy In, Copy Out) is an archive file format used for "
            "bundling files and directories along with their metadata. "
            "It is commonly used in Unix-like systems for creating backups "
            "or transferring files, and supports various encoding formats "
            "including binary and ASCII."
        ),
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )

448 

449 

# Handler for the old portable ASCII flavour, matched on the magic "070707".
class PortableOldASCIIHandler(_CPIOHandlerBase):
    NAME = "cpio_portable_old_ascii"

    PATTERNS = [HexString("30 37 30 37 30 37 // 07 07 07")]

    EXTRACTOR = PortableOldASCIIExtractor()

    DOC = HandlerDoc(
        name="CPIO (portable old ASCII)",
        description=(
            "CPIO (Copy In, Copy Out) is an archive file format used for "
            "bundling files and directories along with their metadata. "
            "It is commonly used in Unix-like systems for creating backups "
            "or transferring files, and supports various encoding formats "
            "including binary and ASCII."
        ),
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )

470 

471 

# Handler for the new portable ASCII flavour, matched on the magic "070701".
class PortableASCIIHandler(_CPIOHandlerBase):
    NAME = "cpio_portable_ascii"
    PATTERNS = [HexString("30 37 30 37 30 31 // 07 07 01 (newc)")]

    EXTRACTOR = PortableASCIIExtractor()

    DOC = HandlerDoc(
        name="CPIO (portable ASCII)",
        description=(
            "CPIO (Copy In, Copy Out) is an archive file format used for "
            "bundling files and directories along with their metadata. "
            "It is commonly used in Unix-like systems for creating backups "
            "or transferring files, and supports various encoding formats "
            "including binary and ASCII."
        ),
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )

491 

492 

# Handler for the new portable ASCII flavour with checksum, magic "070702".
class PortableASCIIWithCRCHandler(_CPIOHandlerBase):
    NAME = "cpio_portable_ascii_crc"
    PATTERNS = [HexString("30 37 30 37 30 32 // 07 07 02")]

    EXTRACTOR = PortableASCIIWithCRCExtractor()

    DOC = HandlerDoc(
        name="CPIO (portable ASCII CRC)",
        description=(
            "CPIO (Copy In, Copy Out) is an archive file format used for "
            "bundling files and directories along with their metadata. "
            "It is commonly used in Unix-like systems for creating backups "
            "or transferring files, and supports various encoding formats "
            "including binary and ASCII."
        ),
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )