Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/cpio.py: 89%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

235 statements  

1import io 

2import os 

3import stat 

4from pathlib import Path 

5from typing import Optional 

6 

7import attrs 

8from structlog import get_logger 

9 

10from ...file_utils import ( 

11 Endian, 

12 FileSystem, 

13 InvalidInputFormat, 

14 StructParser, 

15 decode_int, 

16 iterate_file, 

17 round_up, 

18 snull, 

19) 

20from ...models import ( 

21 Extractor, 

22 ExtractResult, 

23 File, 

24 Handler, 

25 HandlerDoc, 

26 HandlerType, 

27 HexString, 

28 Reference, 

29 ValidChunk, 

30) 

31 

# Module-level logger obtained from structlog's get_logger(); used below to
# warn about entries that are skipped during parsing or extraction.
logger = get_logger()

33 

# Entry name that marks the end of an archive; parsing stops when it is seen.
CPIO_TRAILER_NAME = "TRAILER!!!"
# Entry names longer than this are rejected as implausible.
MAX_LINUX_PATH_LENGTH = 4096

# File-type values accepted in the upper bits of an entry's mode field.
C_ISBLK = 0o060000
C_ISCHR = 0o020000
C_ISDIR = 0o040000
C_ISFIFO = 0o010000
C_ISSOCK = 0o140000
C_ISLNK = 0o120000
C_ISCTG = 0o110000
C_ISREG = 0o100000

C_FILE_TYPES = (
    C_ISBLK,
    C_ISCHR,
    C_ISDIR,
    C_ISFIFO,
    C_ISSOCK,
    C_ISLNK,
    C_ISCTG,
    C_ISREG,
)

# Values accepted for the (mode & 0o7000) bits of an entry's mode field.
C_NONE = 0
C_ISUID = 0o4000
C_ISGID = 0o2000
C_ISVTX = 0o1000
C_ISUID_ISGID = C_ISUID | C_ISGID

C_STICKY_BITS = (C_NONE, C_ISUID, C_ISGID, C_ISVTX, C_ISUID_ISGID)

64 

# C struct declarations for the three header layouts handled in this module.
# The text is handed to StructParser by CPIOParserBase.__init__, and each
# parser class selects one typedef through its HEADER_STRUCT attribute.
C_DEFINITIONS = r"""
    typedef struct old_cpio_header
    {
        uint16 c_magic;
        uint16 c_dev;
        uint16 c_ino;
        uint16 c_mode;
        uint16 c_uid;
        uint16 c_gid;
        uint16 c_nlink;
        uint16 c_rdev;
        uint16 c_mtimes[2];
        uint16 c_namesize;
        uint16 c_filesize[2];
    } old_cpio_header_t;

    typedef struct old_ascii_header
    {
        char c_magic[6];
        char c_dev[6];
        char c_ino[6];
        char c_mode[6];
        char c_uid[6];
        char c_gid[6];
        char c_nlink[6];
        char c_rdev[6];
        char c_mtime[11];
        char c_namesize[6];
        char c_filesize[11];
    } old_ascii_header_t;

    typedef struct new_ascii_header
    {
        char c_magic[6];
        char c_ino[8];
        char c_mode[8];
        char c_uid[8];
        char c_gid[8];
        char c_nlink[8];
        char c_mtime[8];
        char c_filesize[8];
        char c_dev_maj[8];
        char c_dev_min[8];
        char c_rdev_maj[8];
        char c_rdev_min[8];
        char c_namesize[8];
        char c_chksum[8];
    } new_ascii_header_t;
"""

114 

115 

@attrs.define
class CPIOEntry:
    """One archive member recorded by the parser for later extraction."""

    # Absolute file offset at which the entry's content begins.
    start_offset: int
    # Content length in bytes, as decoded from the header.
    size: int
    # Device number decoded from the header.
    dev: int
    # Raw mode field (file type and permission bits).
    mode: int
    # Device number passed to mknod for special-file entries.
    rdev: int
    # Entry path as stored in the archive.
    path: Path

124 

125 

class CPIOParserBase:
    """Shared entry-walking and extraction logic for every CPIO flavour.

    Subclasses provide the header struct name (``HEADER_STRUCT``), the
    header/content alignment (``_PAD_ALIGN``) and the ``_calculate_*``
    decoders that turn raw header fields into integers.
    """

    _PAD_ALIGN: int
    _FILE_PAD_ALIGN: int = 512
    HEADER_STRUCT: str

    def __init__(self, file: File, start_offset: int):
        """Bind the parser to ``file``, starting at ``start_offset``."""
        self.file = file
        self.start_offset = start_offset
        # Stays at -1 until parse() has completed successfully.
        self.end_offset = -1
        self.entries = []
        self.struct_parser = StructParser(C_DEFINITIONS)

    def parse(self):  # noqa: C901
        """Walk the archive, filling ``self.entries`` and ``self.end_offset``.

        Parsing stops at the trailer entry or when a header can no longer
        be read. Entries whose checksum does not validate are logged and
        skipped, but their content is still stepped over.

        Raises:
            InvalidInputFormat: if an entry name is empty, too long, not
                null-terminated or not valid UTF-8, if the mode field is
                not plausible, or if nothing was consumed at all.
        """
        current_offset = self.start_offset
        while True:
            self.file.seek(current_offset, io.SEEK_SET)
            try:
                header = self.struct_parser.parse(
                    self.HEADER_STRUCT, self.file, Endian.LITTLE
                )
            except EOFError:
                break

            c_filesize = self._calculate_file_size(header)
            c_namesize = self._calculate_name_size(header)

            # heuristics 1: check the filename
            if c_namesize > MAX_LINUX_PATH_LENGTH:
                raise InvalidInputFormat("CPIO entry filename is too long.")

            if c_namesize == 0:
                raise InvalidInputFormat("CPIO entry filename empty.")

            padded_header_size = self._pad_header(header, c_namesize)
            current_offset += padded_header_size

            # The file position is just past the header, i.e. at the name.
            tmp_filename = self.file.read(c_namesize)

            # heuristics 2: check that filename is null-byte terminated
            if not tmp_filename.endswith(b"\x00"):
                raise InvalidInputFormat(
                    "CPIO entry filename is not null-byte terminated"
                )

            try:
                filename = snull(tmp_filename).decode("utf-8")
            except UnicodeDecodeError as e:
                raise InvalidInputFormat from e

            if filename == CPIO_TRAILER_NAME:
                current_offset += self._pad_content(c_filesize)
                break

            c_mode = self._calculate_mode(header)

            file_type = c_mode & 0o770000
            sticky_bit = c_mode & 0o7000

            # heuristics 3: check mode field
            is_valid = file_type in C_FILE_TYPES and sticky_bit in C_STICKY_BITS
            if not is_valid:
                raise InvalidInputFormat("CPIO entry mode is invalid.")

            if self.valid_checksum(header, current_offset):
                self.entries.append(
                    CPIOEntry(
                        start_offset=current_offset,
                        size=c_filesize,
                        dev=self._calculate_dev(header),
                        mode=c_mode,
                        rdev=self._calculate_rdev(header),
                        path=Path(filename),
                    )
                )
            else:
                logger.warning("Invalid CRC for CPIO entry, skipping.", header=header)

            current_offset += self._pad_content(c_filesize)

        self.end_offset = self._pad_file(current_offset)
        if self.start_offset == self.end_offset:
            raise InvalidInputFormat("Invalid CPIO archive.")

    def dump_entries(self, fs: FileSystem):
        """Materialise every parsed entry through ``fs``.

        Regular files are carved from the archive, directories are created,
        symlinks are created from the target stored as the entry's content,
        and character/block devices, sockets and FIFOs go through
        ``fs.mknod``. Any other file type is logged and skipped.
        """
        for entry in self.entries:
            # skip entries with "." as filename
            if entry.path.name in ("", "."):
                continue

            # There are cases where CPIO archives have duplicated entries
            # We then unlink the files to overwrite them and avoid an error.
            if not stat.S_ISDIR(entry.mode):
                fs.unlink(entry.path)

            if stat.S_ISREG(entry.mode):
                fs.carve(entry.path, self.file, entry.start_offset, entry.size)
            elif stat.S_ISDIR(entry.mode):
                fs.mkdir(
                    entry.path, mode=entry.mode & 0o777, parents=True, exist_ok=True
                )
            elif stat.S_ISLNK(entry.mode):
                link_path = Path(
                    snull(
                        self.file[entry.start_offset : entry.start_offset + entry.size]
                    ).decode("utf-8")
                )
                fs.create_symlink(src=link_path, dst=entry.path)
            elif (
                stat.S_ISCHR(entry.mode)
                or stat.S_ISBLK(entry.mode)
                or stat.S_ISSOCK(entry.mode)
                # FIFOs are accepted by parse() (C_ISFIFO) and must be
                # handled here too; the socket test used to be duplicated.
                or stat.S_ISFIFO(entry.mode)
            ):
                # NOTE(review): only the permission bits are forwarded; confirm
                # that fs.mknod does not need the file-type bits as well.
                fs.mknod(entry.path, mode=entry.mode & 0o777, device=entry.rdev)
            else:
                logger.warning("unknown file type in CPIO archive")

    def _pad_file(self, end_offset: int) -> int:
        """Return the archive end, including trailing zero padding if present.

        CPIO archives can have a 512 bytes block padding at the end. The
        padded end is only returned when every byte between ``end_offset``
        and the next block boundary (relative to the archive start) reads
        back as zero; otherwise ``end_offset`` is returned unchanged.
        """
        self.file.seek(end_offset, io.SEEK_SET)
        padded_end_offset = self.start_offset + round_up(
            size=end_offset - self.start_offset, alignment=self._FILE_PAD_ALIGN
        )
        padding_size = padded_end_offset - end_offset

        if self.file.read(padding_size) == bytes([0]) * padding_size:
            return padded_end_offset

        return end_offset

    @classmethod
    def _pad_header(cls, header, c_namesize: int) -> int:
        """Return header length plus name length, rounded up to _PAD_ALIGN."""
        return round_up(len(header) + c_namesize, cls._PAD_ALIGN)

    @classmethod
    def _pad_content(cls, c_filesize: int) -> int:
        """Return the content length rounded up to _PAD_ALIGN."""
        return round_up(c_filesize, cls._PAD_ALIGN)

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode the entry's content size; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode the entry's name size; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode the entry's mode field; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_dev(header) -> int:
        """Decode the entry's device number; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Decode the entry's rdev number; implemented by subclasses."""
        raise NotImplementedError

    def valid_checksum(self, header, start_offset: int) -> bool:  # noqa: ARG002
        """Accept every entry; subclasses with a checksum override this."""
        return True

287 

288 

class BinaryCPIOParser(CPIOParserBase):
    """Parser for archives using the ``old_cpio_header_t`` binary header."""

    _PAD_ALIGN = 2

    HEADER_STRUCT = "old_cpio_header_t"

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Join the two 16-bit size words, the first being the high half."""
        high_word = header.c_filesize[0]
        low_word = header.c_filesize[1]
        return (high_word << 16) | low_word

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Return the name size rounded up to the next even number."""
        name_size = header.c_namesize
        if name_size % 2:
            return name_size + 1
        return name_size

    @staticmethod
    def _calculate_mode(header) -> int:
        """Return the mode field as stored."""
        return header.c_mode

    @staticmethod
    def _calculate_dev(header) -> int:
        """Return the dev field as stored."""
        return header.c_dev

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Return the rdev field as stored."""
        return header.c_rdev

313 

314 

class PortableOldASCIIParser(CPIOParserBase):
    """Parser for ``old_ascii_header_t``; every field is decoded in base 8."""

    _PAD_ALIGN = 1

    HEADER_STRUCT = "old_ascii_header_t"

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode c_filesize as a base-8 number."""
        raw_size = header.c_filesize
        return decode_int(raw_size, 8)

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode c_namesize as a base-8 number."""
        raw_name_size = header.c_namesize
        return decode_int(raw_name_size, 8)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode c_mode as a base-8 number."""
        raw_mode = header.c_mode
        return decode_int(raw_mode, 8)

    @staticmethod
    def _calculate_dev(header) -> int:
        """Decode c_dev as a base-8 number."""
        raw_dev = header.c_dev
        return decode_int(raw_dev, 8)

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Decode c_rdev as a base-8 number."""
        raw_rdev = header.c_rdev
        return decode_int(raw_rdev, 8)

339 

340 

class PortableASCIIParser(CPIOParserBase):
    """Parser for ``new_ascii_header_t``; every field is decoded in base 16."""

    _PAD_ALIGN = 4
    HEADER_STRUCT = "new_ascii_header_t"

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode c_filesize as a base-16 number."""
        raw_size = header.c_filesize
        return decode_int(raw_size, 16)

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode c_namesize as a base-16 number."""
        raw_name_size = header.c_namesize
        return decode_int(raw_name_size, 16)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode c_mode as a base-16 number."""
        raw_mode = header.c_mode
        return decode_int(raw_mode, 16)

    @staticmethod
    def _calculate_dev(header) -> int:
        """Combine the decoded c_dev_maj / c_dev_min pair with os.makedev."""
        major = decode_int(header.c_dev_maj, 16)
        minor = decode_int(header.c_dev_min, 16)
        return os.makedev(major, minor)

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Combine the decoded c_rdev_maj / c_rdev_min pair with os.makedev."""
        major = decode_int(header.c_rdev_maj, 16)
        minor = decode_int(header.c_rdev_min, 16)
        return os.makedev(major, minor)

368 

369 

class PortableASCIIWithCRCParser(PortableASCIIParser):
    """PortableASCIIParser variant that verifies each entry's c_chksum."""

    def valid_checksum(self, header, start_offset: int) -> bool:
        """Compare c_chksum with the sum of the content bytes.

        The content is read in chunks starting at ``start_offset``; the
        byte sum is truncated to 32 bits before the comparison.
        """
        expected = decode_int(header.c_chksum, 16)
        content_size = self._calculate_file_size(header)

        byte_total = sum(
            sum(bytearray(chunk))
            for chunk in iterate_file(self.file, start_offset, content_size)
        )
        return expected == (byte_total & 0xFF_FF_FF_FF)

379 

380 

class _CPIOExtractorBase(Extractor):
    """Extractor that parses a file with ``PARSER`` and dumps its entries."""

    PARSER: type[CPIOParserBase]

    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        """Parse ``inpath`` from offset 0 and write its entries under ``outdir``."""
        target_fs = FileSystem(outdir)

        with File.from_path(inpath) as archive:
            cpio_parser = self.PARSER(archive, 0)
            cpio_parser.parse()
            cpio_parser.dump_entries(target_fs)
        return None

391 

392 

class BinaryCPIOExtractor(_CPIOExtractorBase):
    """Extractor that parses with BinaryCPIOParser."""

    PARSER = BinaryCPIOParser

395 

396 

class PortableOldASCIIExtractor(_CPIOExtractorBase):
    """Extractor that parses with PortableOldASCIIParser."""

    PARSER = PortableOldASCIIParser

399 

400 

class PortableASCIIExtractor(_CPIOExtractorBase):
    """Extractor that parses with PortableASCIIParser."""

    PARSER = PortableASCIIParser

403 

404 

class PortableASCIIWithCRCExtractor(_CPIOExtractorBase):
    """Extractor that parses with PortableASCIIWithCRCParser."""

    PARSER = PortableASCIIWithCRCParser

407 

408 

class _CPIOHandlerBase(Handler):
    """Common handler logic shared by every CPIO format.

    All formats are walked the same way; they differ only in how file and
    filename sizes are padded and whether fields are octal or hexadecimal.
    """

    EXTRACTOR: _CPIOExtractorBase

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Parse from ``start_offset`` and report the span the archive covers."""
        parser_cls = self.EXTRACTOR.PARSER
        cpio_parser = parser_cls(file, start_offset)
        cpio_parser.parse()
        chunk_end = cpio_parser.end_offset
        return ValidChunk(start_offset=start_offset, end_offset=chunk_end)

425 

426 

class BinaryHandler(_CPIOHandlerBase):
    """Handler matching the two-byte ``c7 71`` magic of binary CPIO."""

    NAME = "cpio_binary"
    PATTERNS = [HexString("c7 71 // (default, bin, hpbin)")]

    EXTRACTOR = BinaryCPIOExtractor()

    DOC = HandlerDoc(
        name="CPIO (binary)",
        description=(
            "CPIO (Copy In, Copy Out) is an archive file format used for "
            "bundling files and directories along with their metadata. "
            "It is commonly used in Unix-like systems for creating backups "
            "or transferring files, and supports various encoding formats "
            "including binary and ASCII."
        ),
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )

446 

447 

class PortableOldASCIIHandler(_CPIOHandlerBase):
    """Handler matching the ASCII ``070707`` magic."""

    NAME = "cpio_portable_old_ascii"

    PATTERNS = [HexString("30 37 30 37 30 37 // 07 07 07")]

    EXTRACTOR = PortableOldASCIIExtractor()

    DOC = HandlerDoc(
        name="CPIO (portable old ASCII)",
        description=(
            "CPIO (Copy In, Copy Out) is an archive file format used for "
            "bundling files and directories along with their metadata. "
            "It is commonly used in Unix-like systems for creating backups "
            "or transferring files, and supports various encoding formats "
            "including binary and ASCII."
        ),
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )

468 

469 

class PortableASCIIHandler(_CPIOHandlerBase):
    """Handler matching the ASCII ``070701`` (newc) magic."""

    NAME = "cpio_portable_ascii"
    PATTERNS = [HexString("30 37 30 37 30 31 // 07 07 01 (newc)")]

    EXTRACTOR = PortableASCIIExtractor()

    DOC = HandlerDoc(
        name="CPIO (portable ASCII)",
        description=(
            "CPIO (Copy In, Copy Out) is an archive file format used for "
            "bundling files and directories along with their metadata. "
            "It is commonly used in Unix-like systems for creating backups "
            "or transferring files, and supports various encoding formats "
            "including binary and ASCII."
        ),
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )

489 

490 

class PortableASCIIWithCRCHandler(_CPIOHandlerBase):
    """Handler matching the ASCII ``070702`` magic."""

    NAME = "cpio_portable_ascii_crc"
    PATTERNS = [HexString("30 37 30 37 30 32 // 07 07 02")]

    EXTRACTOR = PortableASCIIWithCRCExtractor()

    DOC = HandlerDoc(
        name="CPIO (portable ASCII CRC)",
        description=(
            "CPIO (Copy In, Copy Out) is an archive file format used for "
            "bundling files and directories along with their metadata. "
            "It is commonly used in Unix-like systems for creating backups "
            "or transferring files, and supports various encoding formats "
            "including binary and ASCII."
        ),
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )

509 )