1import io
2import os
3import stat
4from pathlib import Path
5from typing import Optional
6
7import attrs
8from structlog import get_logger
9
10from ...file_utils import (
11 Endian,
12 FileSystem,
13 InvalidInputFormat,
14 StructParser,
15 decode_int,
16 iterate_file,
17 round_up,
18 snull,
19)
20from ...models import (
21 Extractor,
22 ExtractResult,
23 File,
24 Handler,
25 HandlerDoc,
26 HandlerType,
27 HexString,
28 Reference,
29 ValidChunk,
30)
31
# Module-level structlog logger; used below for the warnings emitted while
# parsing and extracting CPIO entries.
logger = get_logger()
33
# Filename of the entry that terminates the archive; parse() stops when it
# reads an entry with this name.
CPIO_TRAILER_NAME = "TRAILER!!!"

# Largest filename size (in bytes) parse() accepts before rejecting the input.
MAX_LINUX_PATH_LENGTH = 4096

# File-type values of the mode field, listed in ascending numeric order.
C_ISFIFO = 0o010000
C_ISCHR = 0o020000
C_ISDIR = 0o040000
C_ISBLK = 0o060000
C_ISREG = 0o100000
C_ISCTG = 0o110000
C_ISLNK = 0o120000
C_ISSOCK = 0o140000

# Every file-type value parse() accepts for `c_mode & 0o770000`.
C_FILE_TYPES = (
    C_ISBLK,
    C_ISCHR,
    C_ISDIR,
    C_ISFIFO,
    C_ISSOCK,
    C_ISLNK,
    C_ISCTG,
    C_ISREG,
)

# Values of the 0o7000 bits of the mode field.
C_NONE = 0o0
C_ISVTX = 0o1000
C_ISGID = 0o2000
C_ISUID = 0o4000
C_ISUID_ISGID = C_ISUID | C_ISGID

# Every value parse() accepts for `c_mode & 0o7000`; other combinations of
# these bits are rejected as an invalid mode.
C_STICKY_BITS = (C_NONE, C_ISUID, C_ISGID, C_ISVTX, C_ISUID_ISGID)
64
# C struct layouts for the three CPIO header variants parsed in this file.
# The text is handed to StructParser in CPIOParserBase.__init__, and each
# parser subclass picks one layout through its HEADER_STRUCT name.
#
# - old_cpio_header: every field is a uint16; c_mtimes and c_filesize are
#   stored as two uint16 halves (BinaryCPIOParser joins c_filesize as
#   `[0] << 16 | [1]`).
# - old_ascii_header: fixed-width character fields, decoded as base 8 by
#   PortableOldASCIIParser.
# - new_ascii_header: 8-character fields after the 6-character magic,
#   decoded as base 16 by PortableASCIIParser; c_chksum is only checked by
#   PortableASCIIWithCRCParser.
C_DEFINITIONS = r"""
 typedef struct old_cpio_header
 {
 uint16 c_magic;
 uint16 c_dev;
 uint16 c_ino;
 uint16 c_mode;
 uint16 c_uid;
 uint16 c_gid;
 uint16 c_nlink;
 uint16 c_rdev;
 uint16 c_mtimes[2];
 uint16 c_namesize;
 uint16 c_filesize[2];
 } old_cpio_header_t;

 typedef struct old_ascii_header
 {
 char c_magic[6];
 char c_dev[6];
 char c_ino[6];
 char c_mode[6];
 char c_uid[6];
 char c_gid[6];
 char c_nlink[6];
 char c_rdev[6];
 char c_mtime[11];
 char c_namesize[6];
 char c_filesize[11];
 } old_ascii_header_t;

 typedef struct new_ascii_header
 {
 char c_magic[6];
 char c_ino[8];
 char c_mode[8];
 char c_uid[8];
 char c_gid[8];
 char c_nlink[8];
 char c_mtime[8];
 char c_filesize[8];
 char c_dev_maj[8];
 char c_dev_min[8];
 char c_rdev_maj[8];
 char c_rdev_min[8];
 char c_namesize[8];
 char c_chksum[8];
 } new_ascii_header_t;
"""
114
115
@attrs.define
class CPIOEntry:
    """Metadata of one archive member, as recorded by ``CPIOParserBase.parse``."""

    # Offset in the file where the entry's content starts (just past the
    # padded header + filename).
    start_offset: int
    # Content size in bytes as declared by the header, without padding.
    size: int
    # Value produced by the parser's _calculate_dev hook.
    dev: int
    # Decoded mode field (file type and permission bits).
    mode: int
    # Value produced by the parser's _calculate_rdev hook; passed as `device`
    # to fs.mknod when the entry is extracted.
    rdev: int
    # Entry filename decoded from the archive.
    path: Path
124
125
class CPIOParserBase:
    """Shared parsing and extraction logic for the CPIO header variants.

    Subclasses provide ``HEADER_STRUCT`` (name of the struct in
    ``C_DEFINITIONS`` to parse), ``_PAD_ALIGN`` (alignment applied to the
    header + filename and to the content) and the ``_calculate_*`` hooks that
    decode individual header fields.
    """

    _PAD_ALIGN: int
    # Alignment of the optional zero padding that may follow the archive.
    _FILE_PAD_ALIGN: int = 512
    HEADER_STRUCT: str

    def __init__(self, file: File, start_offset: int):
        """Prepare a parser for the archive starting at *start_offset* in *file*."""
        self.file = file
        self.start_offset = start_offset
        # Stays at -1 until parse() has computed the real end of the archive.
        self.end_offset = -1
        self.entries: list[CPIOEntry] = []
        self.struct_parser = StructParser(C_DEFINITIONS)

    def parse(self):  # noqa: C901
        """Walk the archive entry by entry, filling ``entries`` and ``end_offset``.

        The walk stops at the trailer entry, or when a header can no longer
        be read (``EOFError`` from the struct parser).

        Raises:
            InvalidInputFormat: if a filename is empty, too long, not
                null-terminated or not valid UTF-8, if the mode field is
                invalid, or if nothing could be parsed at all.
        """
        current_offset = self.start_offset
        while True:
            self.file.seek(current_offset, io.SEEK_SET)
            try:
                header = self.struct_parser.parse(
                    self.HEADER_STRUCT, self.file, Endian.LITTLE
                )
            except EOFError:
                break

            c_filesize = self._calculate_file_size(header)
            c_namesize = self._calculate_name_size(header)

            # heuristics 1: check the filename
            if c_namesize > MAX_LINUX_PATH_LENGTH:
                raise InvalidInputFormat("CPIO entry filename is too long.")

            if c_namesize == 0:
                raise InvalidInputFormat("CPIO entry filename empty.")

            # From here on current_offset points at the entry's content; the
            # file position itself is still right after the header, i.e. at
            # the filename.
            padded_header_size = self._pad_header(header, c_namesize)
            current_offset += padded_header_size

            tmp_filename = self.file.read(c_namesize)

            # heuristics 2: check that filename is null-byte terminated
            if not tmp_filename.endswith(b"\x00"):
                raise InvalidInputFormat(
                    "CPIO entry filename is not null-byte terminated"
                )

            try:
                filename = snull(tmp_filename).decode("utf-8")
            except UnicodeDecodeError as e:
                raise InvalidInputFormat(
                    "CPIO entry filename is not valid UTF-8."
                ) from e

            if filename == CPIO_TRAILER_NAME:
                # The trailer ends the archive; skip its (padded) content.
                current_offset += self._pad_content(c_filesize)
                break

            c_mode = self._calculate_mode(header)

            file_type = c_mode & 0o770000
            sticky_bit = c_mode & 0o7000

            # heuristics 3: check mode field
            is_valid = file_type in C_FILE_TYPES and sticky_bit in C_STICKY_BITS
            if not is_valid:
                raise InvalidInputFormat("CPIO entry mode is invalid.")

            if self.valid_checksum(header, current_offset):
                self.entries.append(
                    CPIOEntry(
                        start_offset=current_offset,
                        size=c_filesize,
                        dev=self._calculate_dev(header),
                        mode=c_mode,
                        rdev=self._calculate_rdev(header),
                        path=Path(filename),
                    )
                )
            else:
                # A bad checksum only drops this entry; parsing continues.
                logger.warning("Invalid CRC for CPIO entry, skipping.", header=header)

            current_offset += self._pad_content(c_filesize)

        self.end_offset = self._pad_file(current_offset)
        if self.start_offset == self.end_offset:
            raise InvalidInputFormat("Invalid CPIO archive.")

    def dump_entries(self, fs: FileSystem):
        """Materialize every parsed entry through *fs*.

        Regular files are carved from the archive, directories are created,
        symlinks are created from the target stored as entry content, and
        character/block devices, sockets and FIFOs go through ``fs.mknod``.
        Any other file type is logged and skipped.
        """
        for entry in self.entries:
            # skip entries with "." as filename
            if entry.path.name in ("", "."):
                continue

            # There are cases where CPIO archives have duplicated entries
            # We then unlink the files to overwrite them and avoid an error.
            if not stat.S_ISDIR(entry.mode):
                fs.unlink(entry.path)

            if stat.S_ISREG(entry.mode):
                fs.carve(entry.path, self.file, entry.start_offset, entry.size)
            elif stat.S_ISDIR(entry.mode):
                fs.mkdir(
                    entry.path, mode=entry.mode & 0o777, parents=True, exist_ok=True
                )
            elif stat.S_ISLNK(entry.mode):
                # The link target is stored as the entry's content.
                link_path = Path(
                    snull(
                        self.file[entry.start_offset : entry.start_offset + entry.size]
                    ).decode("utf-8")
                )
                fs.create_symlink(src=link_path, dst=entry.path)
            elif (
                stat.S_ISCHR(entry.mode)
                or stat.S_ISBLK(entry.mode)
                or stat.S_ISSOCK(entry.mode)
                or stat.S_ISFIFO(entry.mode)
            ):
                # NOTE(review): only the permission bits are forwarded, so the
                # node type is not conveyed via `mode` — confirm how
                # FileSystem.mknod is expected to pick the node type.
                fs.mknod(entry.path, mode=entry.mode & 0o777, device=entry.rdev)
            else:
                logger.warning("unknown file type in CPIO archive")

    def _pad_file(self, end_offset: int) -> int:
        """CPIO archives can have a 512 bytes block padding at the end.

        Return the end offset rounded up to ``_FILE_PAD_ALIGN`` (relative to
        the archive start) when the bytes in between are all zero, otherwise
        return *end_offset* unchanged.
        """
        self.file.seek(end_offset, io.SEEK_SET)
        padded_end_offset = self.start_offset + round_up(
            size=end_offset - self.start_offset, alignment=self._FILE_PAD_ALIGN
        )
        padding_size = padded_end_offset - end_offset

        # A short read (end of file) or any non-zero byte means the padding is
        # not part of the archive.
        if self.file.read(padding_size) == bytes(padding_size):
            return padded_end_offset

        return end_offset

    @classmethod
    def _pad_header(cls, header, c_namesize: int) -> int:
        """Return the header + filename size rounded up to ``_PAD_ALIGN``."""
        return round_up(len(header) + c_namesize, cls._PAD_ALIGN)

    @classmethod
    def _pad_content(cls, c_filesize: int) -> int:
        """Return the content size rounded up to ``_PAD_ALIGN``."""
        return round_up(c_filesize, cls._PAD_ALIGN)

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode the content size from *header*; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode the filename size from *header*; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode the mode field from *header*; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_dev(header) -> int:
        """Decode the device number from *header*; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Decode the rdev number from *header*; implemented by subclasses."""
        raise NotImplementedError

    def valid_checksum(self, header, start_offset: int) -> bool:  # noqa: ARG002
        """Accept every entry; subclasses with a checksum field override this."""
        return True
287
288
class BinaryCPIOParser(CPIOParserBase):
    """Parser for the ``old_cpio_header_t`` layout, whose fields are 16-bit integers."""

    HEADER_STRUCT = "old_cpio_header_t"

    _PAD_ALIGN = 2

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Join the two 16-bit halves of ``c_filesize`` (first half is the high word)."""
        high_word = header.c_filesize[0]
        low_word = header.c_filesize[1]
        return (high_word << 16) | low_word

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Return ``c_namesize`` rounded up to the next even number."""
        name_size = header.c_namesize
        if name_size % 2:
            return name_size + 1
        return name_size

    @staticmethod
    def _calculate_mode(header) -> int:
        """Return the raw ``c_mode`` field."""
        mode = header.c_mode
        return mode

    @staticmethod
    def _calculate_dev(header) -> int:
        """Return the raw ``c_dev`` field."""
        dev = header.c_dev
        return dev

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Return the raw ``c_rdev`` field."""
        rdev = header.c_rdev
        return rdev
313
314
class PortableOldASCIIParser(CPIOParserBase):
    """Parser for the ``old_ascii_header_t`` layout; numeric fields are decoded as base 8."""

    HEADER_STRUCT = "old_ascii_header_t"

    _PAD_ALIGN = 1

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode ``c_filesize`` as a base-8 number."""
        raw_size = header.c_filesize
        return decode_int(raw_size, 8)

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode ``c_namesize`` as a base-8 number."""
        raw_name_size = header.c_namesize
        return decode_int(raw_name_size, 8)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode ``c_mode`` as a base-8 number."""
        raw_mode = header.c_mode
        return decode_int(raw_mode, 8)

    @staticmethod
    def _calculate_dev(header) -> int:
        """Decode ``c_dev`` as a base-8 number."""
        raw_dev = header.c_dev
        return decode_int(raw_dev, 8)

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Decode ``c_rdev`` as a base-8 number."""
        raw_rdev = header.c_rdev
        return decode_int(raw_rdev, 8)
339
340
class PortableASCIIParser(CPIOParserBase):
    """Parser for the ``new_ascii_header_t`` layout; numeric fields are decoded as base 16."""

    HEADER_STRUCT = "new_ascii_header_t"
    _PAD_ALIGN = 4

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode ``c_filesize`` as a base-16 number."""
        raw_size = header.c_filesize
        return decode_int(raw_size, 16)

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode ``c_namesize`` as a base-16 number."""
        raw_name_size = header.c_namesize
        return decode_int(raw_name_size, 16)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode ``c_mode`` as a base-16 number."""
        raw_mode = header.c_mode
        return decode_int(raw_mode, 16)

    @staticmethod
    def _calculate_dev(header) -> int:
        """Combine ``c_dev_maj`` and ``c_dev_min`` into one device number."""
        major = decode_int(header.c_dev_maj, 16)
        minor = decode_int(header.c_dev_min, 16)
        return os.makedev(major, minor)

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Combine ``c_rdev_maj`` and ``c_rdev_min`` into one device number."""
        major = decode_int(header.c_rdev_maj, 16)
        minor = decode_int(header.c_rdev_min, 16)
        return os.makedev(major, minor)
368
369
class PortableASCIIWithCRCParser(PortableASCIIParser):
    """``PortableASCIIParser`` variant that verifies each entry against ``c_chksum``."""

    def valid_checksum(self, header, start_offset: int) -> bool:
        """Return whether the byte sum of the entry content matches the header.

        The content is the ``c_filesize`` bytes starting at *start_offset*;
        the sum is truncated to 32 bits before the comparison.
        """
        expected = decode_int(header.c_chksum, 16)
        content_size = self._calculate_file_size(header)

        byte_total = sum(
            sum(bytearray(chunk))
            for chunk in iterate_file(self.file, start_offset, content_size)
        )
        truncated_total = byte_total & 0xFFFFFFFF
        return expected == truncated_total
379
380
class _CPIOExtractorBase(Extractor):
    """Extractor shared by all CPIO variants; subclasses only choose ``PARSER``."""

    PARSER: type[CPIOParserBase]

    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        """Parse the archive at *inpath* from offset 0 and dump its entries into *outdir*."""
        target_fs = FileSystem(outdir)

        with File.from_path(inpath) as archive:
            cpio_parser = self.PARSER(archive, 0)
            cpio_parser.parse()
            cpio_parser.dump_entries(target_fs)

        return None
391
392
class BinaryCPIOExtractor(_CPIOExtractorBase):
    """Extractor that parses archives with ``BinaryCPIOParser``."""

    PARSER = BinaryCPIOParser
395
396
class PortableOldASCIIExtractor(_CPIOExtractorBase):
    """Extractor that parses archives with ``PortableOldASCIIParser``."""

    PARSER = PortableOldASCIIParser
399
400
class PortableASCIIExtractor(_CPIOExtractorBase):
    """Extractor that parses archives with ``PortableASCIIParser``."""

    PARSER = PortableASCIIParser
403
404
class PortableASCIIWithCRCExtractor(_CPIOExtractorBase):
    """Extractor that parses archives with ``PortableASCIIWithCRCParser``."""

    PARSER = PortableASCIIWithCRCParser
407
408
class _CPIOHandlerBase(Handler):
    """Handler logic shared by every CPIO flavour.

    All variants are walked the same way; they only differ in how file and
    filename sizes are calculated, how padding is applied, and whether
    numeric fields are octal or hexadecimal.
    """

    EXTRACTOR: _CPIOExtractorBase

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Parse the archive at *start_offset* and return the chunk it spans.

        Errors raised by the parser propagate to the caller.
        """
        parser_cls = self.EXTRACTOR.PARSER
        cpio_parser = parser_cls(file, start_offset)
        cpio_parser.parse()
        chunk_end = cpio_parser.end_offset
        return ValidChunk(start_offset=start_offset, end_offset=chunk_end)
425
426
class BinaryHandler(_CPIOHandlerBase):
    """Handler for binary CPIO archives, parsed through ``BinaryCPIOExtractor``."""

    NAME = "cpio_binary"
    # Two-byte magic; c7 71 is the value 0o070707 stored little-endian.
    PATTERNS = [HexString("c7 71 // (default, bin, hpbin)")]

    EXTRACTOR = BinaryCPIOExtractor()

    # Descriptive metadata for this handler.
    DOC = HandlerDoc(
        name="CPIO (binary)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )
446
447
class PortableOldASCIIHandler(_CPIOHandlerBase):
    """Handler for old portable ASCII CPIO archives, parsed through ``PortableOldASCIIExtractor``."""

    NAME = "cpio_portable_old_ascii"

    # Six-byte magic: the ASCII characters "070707".
    PATTERNS = [HexString("30 37 30 37 30 37 // 07 07 07")]

    EXTRACTOR = PortableOldASCIIExtractor()

    # Descriptive metadata for this handler.
    DOC = HandlerDoc(
        name="CPIO (portable old ASCII)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )
468
469
class PortableASCIIHandler(_CPIOHandlerBase):
    """Handler for portable ASCII (newc) CPIO archives, parsed through ``PortableASCIIExtractor``."""

    NAME = "cpio_portable_ascii"
    # Six-byte magic: the ASCII characters "070701".
    PATTERNS = [HexString("30 37 30 37 30 31 // 07 07 01 (newc)")]

    EXTRACTOR = PortableASCIIExtractor()

    # Descriptive metadata for this handler.
    DOC = HandlerDoc(
        name="CPIO (portable ASCII)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )
489
490
class PortableASCIIWithCRCHandler(_CPIOHandlerBase):
    """Handler for portable ASCII CPIO archives with checksums, parsed through ``PortableASCIIWithCRCExtractor``."""

    NAME = "cpio_portable_ascii_crc"
    # Six-byte magic: the ASCII characters "070702".
    PATTERNS = [HexString("30 37 30 37 30 32 // 07 07 02")]

    EXTRACTOR = PortableASCIIWithCRCExtractor()

    # Descriptive metadata for this handler.
    DOC = HandlerDoc(
        name="CPIO (portable ASCII CRC)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )