Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/cpio.py: 89%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

234 statements  

1import io 

2import os 

3import stat 

4from pathlib import Path 

5 

6import attrs 

7from structlog import get_logger 

8 

9from ...file_utils import ( 

10 Endian, 

11 FileSystem, 

12 InvalidInputFormat, 

13 StructParser, 

14 decode_int, 

15 iterate_file, 

16 round_up, 

17 snull, 

18) 

19from ...models import ( 

20 Extractor, 

21 ExtractResult, 

22 File, 

23 Handler, 

24 HandlerDoc, 

25 HandlerType, 

26 HexString, 

27 Reference, 

28 ValidChunk, 

29) 

30 

# Module-level structlog logger used by the CPIO parser for its warnings.
logger = get_logger()

32 

# Name of the entry that terminates a CPIO archive.
CPIO_TRAILER_NAME = "TRAILER!!!"
# Upper bound accepted for an entry's name size (4096 bytes).
MAX_LINUX_PATH_LENGTH = 4096

# File-type values; the parser compares them against ``c_mode & 0o770000``.
C_ISFIFO = 0o010000
C_ISCHR = 0o020000
C_ISDIR = 0o040000
C_ISBLK = 0o060000
C_ISREG = 0o100000
C_ISCTG = 0o110000
C_ISLNK = 0o120000
C_ISSOCK = 0o140000

# Every file type the mode heuristic accepts.
C_FILE_TYPES = (
    C_ISBLK, C_ISCHR, C_ISDIR, C_ISFIFO,
    C_ISSOCK, C_ISLNK, C_ISCTG, C_ISREG,
)

# Values of ``c_mode & 0o7000`` the mode heuristic accepts.
C_NONE = 0
C_ISVTX = 0o1000
C_ISGID = 0o2000
C_ISUID = 0o4000
C_ISUID_ISGID = C_ISUID | C_ISGID

C_STICKY_BITS = (
    C_NONE,
    C_ISUID,
    C_ISGID,
    C_ISVTX,
    C_ISUID_ISGID,
)

63 

# C struct declarations for the three CPIO header layouts. The text is handed
# to StructParser, and each parser class selects one layout by name through
# its HEADER_STRUCT attribute.
C_DEFINITIONS = r"""
    typedef struct old_cpio_header
    {
        uint16 c_magic;
        uint16 c_dev;
        uint16 c_ino;
        uint16 c_mode;
        uint16 c_uid;
        uint16 c_gid;
        uint16 c_nlink;
        uint16 c_rdev;
        uint16 c_mtimes[2];
        uint16 c_namesize;
        uint16 c_filesize[2];
    } old_cpio_header_t;

    typedef struct old_ascii_header
    {
        char c_magic[6];
        char c_dev[6];
        char c_ino[6];
        char c_mode[6];
        char c_uid[6];
        char c_gid[6];
        char c_nlink[6];
        char c_rdev[6];
        char c_mtime[11];
        char c_namesize[6];
        char c_filesize[11];
    } old_ascii_header_t;

    typedef struct new_ascii_header
    {
        char c_magic[6];
        char c_ino[8];
        char c_mode[8];
        char c_uid[8];
        char c_gid[8];
        char c_nlink[8];
        char c_mtime[8];
        char c_filesize[8];
        char c_dev_maj[8];
        char c_dev_min[8];
        char c_rdev_maj[8];
        char c_rdev_min[8];
        char c_namesize[8];
        char c_chksum[8];
    } new_ascii_header_t;
"""

113 

114 

@attrs.define
class CPIOEntry:
    """One archive member as recorded by ``CPIOParserBase.parse``."""

    # Offset in the file where the entry's content begins (after the padded
    # header and name).
    start_offset: int
    # Content size in bytes, as decoded from the header.
    size: int
    # Value returned by the parser's ``_calculate_dev`` for this header.
    dev: int
    # Raw mode field: file-type bits plus permission bits.
    mode: int
    # Value returned by the parser's ``_calculate_rdev``; passed to
    # ``fs.mknod`` as the device when the entry is dumped as a special file.
    rdev: int
    # Entry name from the archive, converted to a Path.
    path: Path

123 

124 

class CPIOParserBase:
    """Shared parsing and extraction logic for all CPIO header flavours.

    Subclasses provide ``HEADER_STRUCT`` (a struct name from
    ``C_DEFINITIONS``), ``_PAD_ALIGN`` (alignment of header+name and of
    content) and the ``_calculate_*`` helpers that decode their header fields.
    """

    _PAD_ALIGN: int
    _FILE_PAD_ALIGN: int = 512
    HEADER_STRUCT: str

    def __init__(self, file: File, start_offset: int):
        """Prepare a parser for the archive starting at ``start_offset``."""
        self.file = file
        self.start_offset = start_offset
        # Stays -1 until parse() has determined where the archive ends.
        self.end_offset = -1
        self.entries = []
        self.struct_parser = StructParser(C_DEFINITIONS)

    def parse(self):  # noqa: C901
        """Walk the archive entry by entry and collect them in ``self.entries``.

        The walk stops at the ``TRAILER!!!`` entry or as soon as a complete
        header can no longer be read. Afterwards ``self.end_offset`` is set,
        including the trailing zero padding when it is present.

        Raises:
            InvalidInputFormat: when an entry fails one of the sanity
                heuristics, or when nothing was consumed at all.
        """
        current_offset = self.start_offset
        while True:
            self.file.seek(current_offset, io.SEEK_SET)
            try:
                header = self.struct_parser.parse(
                    self.HEADER_STRUCT, self.file, Endian.LITTLE
                )
            except EOFError:
                break

            c_filesize = self._calculate_file_size(header)
            c_namesize = self._calculate_name_size(header)

            # heuristics 1: check the filename
            if c_namesize > MAX_LINUX_PATH_LENGTH:
                raise InvalidInputFormat("CPIO entry filename is too long.")

            if c_namesize == 0:
                raise InvalidInputFormat("CPIO entry filename empty.")

            padded_header_size = self._pad_header(header, c_namesize)
            current_offset += padded_header_size

            # Read the name from the current file position; this presumably
            # relies on the struct parser having consumed exactly the header.
            tmp_filename = self.file.read(c_namesize)

            # heuristics 2: check that filename is null-byte terminated
            if not tmp_filename.endswith(b"\x00"):
                raise InvalidInputFormat(
                    "CPIO entry filename is not null-byte terminated"
                )

            try:
                filename = snull(tmp_filename).decode("utf-8")
            except UnicodeDecodeError as e:
                raise InvalidInputFormat from e

            if filename == CPIO_TRAILER_NAME:
                # The trailer ends the archive; still account for its content.
                current_offset += self._pad_content(c_filesize)
                break

            c_mode = self._calculate_mode(header)

            file_type = c_mode & 0o770000
            sticky_bit = c_mode & 0o7000

            # heuristics 3: check mode field
            is_valid = file_type in C_FILE_TYPES and sticky_bit in C_STICKY_BITS
            if not is_valid:
                raise InvalidInputFormat("CPIO entry mode is invalid.")

            if self.valid_checksum(header, current_offset):
                self.entries.append(
                    CPIOEntry(
                        start_offset=current_offset,
                        size=c_filesize,
                        dev=self._calculate_dev(header),
                        mode=c_mode,
                        rdev=self._calculate_rdev(header),
                        path=Path(filename),
                    )
                )
            else:
                # A bad checksum only drops this entry; parsing continues.
                logger.warning("Invalid CRC for CPIO entry, skipping.", header=header)

            current_offset += self._pad_content(c_filesize)

        self.end_offset = self._pad_file(current_offset)
        if self.start_offset == self.end_offset:
            raise InvalidInputFormat("Invalid CPIO archive.")

    def dump_entries(self, fs: FileSystem):
        """Materialise every parsed entry through ``fs``.

        Regular files are carved from the archive, directories are created,
        symlinks are recreated from their stored target, and character, block,
        socket and FIFO entries are passed to ``fs.mknod``.
        """
        for entry in self.entries:
            # skip entries with "." as filename
            if entry.path.name in ("", "."):
                continue

            # There are cases where CPIO archives have duplicated entries
            # We then unlink the files to overwrite them and avoid an error.
            if not stat.S_ISDIR(entry.mode):
                fs.unlink(entry.path)

            if stat.S_ISREG(entry.mode):
                fs.carve(entry.path, self.file, entry.start_offset, entry.size)
            elif stat.S_ISDIR(entry.mode):
                fs.mkdir(
                    entry.path, mode=entry.mode & 0o777, parents=True, exist_ok=True
                )
            elif stat.S_ISLNK(entry.mode):
                # The content of a symlink entry is its target path.
                link_path = Path(
                    snull(
                        self.file[entry.start_offset : entry.start_offset + entry.size]
                    ).decode("utf-8")
                )
                fs.create_symlink(src=link_path, dst=entry.path)
            elif (
                stat.S_ISCHR(entry.mode)
                or stat.S_ISBLK(entry.mode)
                or stat.S_ISSOCK(entry.mode)
                # FIFOs pass the mode heuristic in parse() (C_ISFIFO), so they
                # must be handled here instead of being reported as unknown.
                or stat.S_ISFIFO(entry.mode)
            ):
                # NOTE(review): only the permission bits are passed as mode;
                # confirm FileSystem.mknod creates the intended node type.
                fs.mknod(entry.path, mode=entry.mode & 0o777, device=entry.rdev)
            else:
                logger.warning("unknown file type in CPIO archive")

    def _pad_file(self, end_offset: int) -> int:
        """CPIO archives can have a 512 bytes block padding at the end.

        Returns the padded end offset when the bytes up to the next
        ``_FILE_PAD_ALIGN`` boundary (relative to the archive start) are all
        zero, otherwise ``end_offset`` unchanged.
        """
        self.file.seek(end_offset, io.SEEK_SET)
        padded_end_offset = self.start_offset + round_up(
            size=end_offset - self.start_offset, alignment=self._FILE_PAD_ALIGN
        )
        padding_size = padded_end_offset - end_offset

        # A short read (end of file) compares unequal and keeps end_offset.
        if self.file.read(padding_size) == bytes([0]) * padding_size:
            return padded_end_offset

        return end_offset

    @classmethod
    def _pad_header(cls, header, c_namesize: int) -> int:
        """Return the size of header plus name, rounded up to ``_PAD_ALIGN``."""
        return round_up(len(header) + c_namesize, cls._PAD_ALIGN)

    @classmethod
    def _pad_content(cls, c_filesize: int) -> int:
        """Return the content size rounded up to ``_PAD_ALIGN`` bytes."""
        return round_up(c_filesize, cls._PAD_ALIGN)

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode the content size from ``header``; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode the name size from ``header``; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode the mode field from ``header``; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_dev(header) -> int:
        """Decode the device number from ``header``; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Decode the rdev number from ``header``; implemented by subclasses."""
        raise NotImplementedError

    def valid_checksum(self, header, start_offset: int) -> bool:  # noqa: ARG002
        """Accept every entry; formats that carry a checksum override this."""
        return True

286 

287 

class BinaryCPIOParser(CPIOParserBase):
    """Parser for the binary header layout (``old_cpio_header_t``)."""

    HEADER_STRUCT = "old_cpio_header_t"

    _PAD_ALIGN = 2

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Join the two 16-bit halves of ``c_filesize``, high half first."""
        high_half = header.c_filesize[0]
        low_half = header.c_filesize[1]
        return (high_half << 16) | low_half

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Return ``c_namesize`` bumped to the next even value when odd."""
        name_size = header.c_namesize
        if name_size % 2:
            return name_size + 1
        return name_size

    @staticmethod
    def _calculate_mode(header) -> int:
        """Return the mode field as stored."""
        mode = header.c_mode
        return mode

    @staticmethod
    def _calculate_dev(header) -> int:
        """Return the device field as stored."""
        dev = header.c_dev
        return dev

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Return the rdev field as stored."""
        rdev = header.c_rdev
        return rdev

312 

313 

class PortableOldASCIIParser(CPIOParserBase):
    """Parser for the old ASCII header layout, whose fields are octal text."""

    HEADER_STRUCT = "old_ascii_header_t"

    # Alignment of 1 means neither header+name nor content is padded.
    _PAD_ALIGN = 1

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode the octal ``c_filesize`` field."""
        raw_size = header.c_filesize
        return decode_int(raw_size, 8)

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode the octal ``c_namesize`` field."""
        raw_name_size = header.c_namesize
        return decode_int(raw_name_size, 8)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode the octal ``c_mode`` field."""
        raw_mode = header.c_mode
        return decode_int(raw_mode, 8)

    @staticmethod
    def _calculate_dev(header) -> int:
        """Decode the octal ``c_dev`` field."""
        raw_dev = header.c_dev
        return decode_int(raw_dev, 8)

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Decode the octal ``c_rdev`` field."""
        raw_rdev = header.c_rdev
        return decode_int(raw_rdev, 8)

338 

339 

class PortableASCIIParser(CPIOParserBase):
    """Parser for the new ASCII header layout, whose fields are hex text."""

    HEADER_STRUCT = "new_ascii_header_t"
    _PAD_ALIGN = 4

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode the hexadecimal ``c_filesize`` field."""
        raw_size = header.c_filesize
        return decode_int(raw_size, 16)

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode the hexadecimal ``c_namesize`` field."""
        raw_name_size = header.c_namesize
        return decode_int(raw_name_size, 16)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode the hexadecimal ``c_mode`` field."""
        raw_mode = header.c_mode
        return decode_int(raw_mode, 16)

    @staticmethod
    def _calculate_dev(header) -> int:
        """Combine the decoded ``c_dev_maj``/``c_dev_min`` pair via ``os.makedev``."""
        major = decode_int(header.c_dev_maj, 16)
        minor = decode_int(header.c_dev_min, 16)
        return os.makedev(major, minor)

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Combine the decoded ``c_rdev_maj``/``c_rdev_min`` pair via ``os.makedev``."""
        major = decode_int(header.c_rdev_maj, 16)
        minor = decode_int(header.c_rdev_min, 16)
        return os.makedev(major, minor)

367 

368 

class PortableASCIIWithCRCParser(PortableASCIIParser):
    """New ASCII parser that also verifies each entry's ``c_chksum`` field."""

    def valid_checksum(self, header, start_offset: int) -> bool:
        """Return True when the byte sum of the content matches the header.

        The checksum is the sum of all content bytes, truncated to 32 bits.
        """
        expected = decode_int(header.c_chksum, 16)
        content_size = self._calculate_file_size(header)

        byte_total = sum(
            sum(bytearray(chunk))
            for chunk in iterate_file(self.file, start_offset, content_size)
        )
        return (byte_total & 0xFF_FF_FF_FF) == expected

378 

379 

class _CPIOExtractorBase(Extractor):
    """Extractor that parses a CPIO file with ``PARSER`` and dumps its entries."""

    # Parser class used for this flavour; set by each concrete extractor.
    PARSER: type[CPIOParserBase]

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Parse ``inpath`` from offset 0 and write its entries below ``outdir``."""
        target_fs = FileSystem(outdir)

        with File.from_path(inpath) as archive:
            cpio = self.PARSER(archive, 0)
            cpio.parse()
            cpio.dump_entries(target_fs)
        return None

390 

391 

class BinaryCPIOExtractor(_CPIOExtractorBase):
    """Extractor bound to ``BinaryCPIOParser``."""

    PARSER = BinaryCPIOParser

394 

395 

class PortableOldASCIIExtractor(_CPIOExtractorBase):
    """Extractor bound to ``PortableOldASCIIParser``."""

    PARSER = PortableOldASCIIParser

398 

399 

class PortableASCIIExtractor(_CPIOExtractorBase):
    """Extractor bound to ``PortableASCIIParser``."""

    PARSER = PortableASCIIParser

402 

403 

class PortableASCIIWithCRCExtractor(_CPIOExtractorBase):
    """Extractor bound to ``PortableASCIIWithCRCParser``."""

    PARSER = PortableASCIIWithCRCParser

406 

407 

class _CPIOHandlerBase(Handler):
    """Common chunk calculation for every CPIO flavour.

    All formats are walked the same way. They differ only in how sizes are
    padded and in whether numbers are stored as binary, octal or hex, which is
    left to the parser class of the configured extractor.
    """

    EXTRACTOR: _CPIOExtractorBase

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Parse the archive at ``start_offset`` and return the span it covers."""
        parser_cls = self.EXTRACTOR.PARSER
        cpio = parser_cls(file, start_offset)
        cpio.parse()
        chunk_end = cpio.end_offset
        return ValidChunk(start_offset=start_offset, end_offset=chunk_end)

424 

425 

class BinaryHandler(_CPIOHandlerBase):
    """Handler for binary CPIO archives, matched on the magic bytes ``c7 71``."""

    NAME = "cpio_binary"
    PATTERNS = [HexString("c7 71 // (default, bin, hpbin)")]

    EXTRACTOR = BinaryCPIOExtractor()

    DOC = HandlerDoc(
        name="CPIO (binary)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )

445 

446 

class PortableOldASCIIHandler(_CPIOHandlerBase):
    """Handler for old ASCII CPIO archives, matched on the text magic ``070707``."""

    NAME = "cpio_portable_old_ascii"

    PATTERNS = [HexString("30 37 30 37 30 37 // 07 07 07")]

    EXTRACTOR = PortableOldASCIIExtractor()

    DOC = HandlerDoc(
        name="CPIO (portable old ASCII)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )

467 

468 

class PortableASCIIHandler(_CPIOHandlerBase):
    """Handler for new ASCII (newc) CPIO archives, matched on the text magic ``070701``."""

    NAME = "cpio_portable_ascii"
    PATTERNS = [HexString("30 37 30 37 30 31 // 07 07 01 (newc)")]

    EXTRACTOR = PortableASCIIExtractor()

    DOC = HandlerDoc(
        name="CPIO (portable ASCII)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )

488 

489 

class PortableASCIIWithCRCHandler(_CPIOHandlerBase):
    """Handler for new ASCII CPIO archives with checksum, matched on the text magic ``070702``."""

    NAME = "cpio_portable_ascii_crc"
    PATTERNS = [HexString("30 37 30 37 30 32 // 07 07 02")]

    EXTRACTOR = PortableASCIIWithCRCExtractor()

    DOC = HandlerDoc(
        name="CPIO (portable ASCII CRC)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )