1import io
2import os
3import stat
4from pathlib import Path
5
6import attrs
7from structlog import get_logger
8
9from ...file_utils import (
10 Endian,
11 FileSystem,
12 InvalidInputFormat,
13 StructParser,
14 decode_int,
15 iterate_file,
16 round_up,
17 snull,
18)
19from ...models import (
20 Extractor,
21 ExtractResult,
22 File,
23 Handler,
24 HandlerDoc,
25 HandlerType,
26 HexString,
27 Reference,
28 ValidChunk,
29)
30
# Module-level structured logger shared by the parsers below.
logger = get_logger()

# Name of the entry that ends parsing: the parser stops as soon as it reads it.
CPIO_TRAILER_NAME = "TRAILER!!!"
# Upper bound for an entry's name size; larger values are rejected by the parser.
MAX_LINUX_PATH_LENGTH = 0x1000

# File-type values compared against ``c_mode & 0o770000`` by the parser.
C_ISBLK = 0o60000
C_ISCHR = 0o20000
C_ISDIR = 0o40000
C_ISFIFO = 0o10000
C_ISSOCK = 0o140000
C_ISLNK = 0o120000
# NOTE(review): presumably the "contiguous file" type of the CPIO spec - confirm.
C_ISCTG = 0o110000
C_ISREG = 0o100000

# Every file-type value the parser accepts; anything else is an invalid entry.
C_FILE_TYPES = (
    C_ISBLK,
    C_ISCHR,
    C_ISDIR,
    C_ISFIFO,
    C_ISSOCK,
    C_ISLNK,
    C_ISCTG,
    C_ISREG,
)

# Values compared against ``c_mode & 0o7000`` by the parser.
C_NONE = 0o00000
C_ISUID = 0o04000
C_ISGID = 0o02000
C_ISVTX = 0o01000
C_ISUID_ISGID = 0o06000

# Accepted results of ``c_mode & 0o7000``; combinations that are not listed
# here (e.g. 0o05000) make the parser reject the entry.
C_STICKY_BITS = (C_NONE, C_ISUID, C_ISGID, C_ISVTX, C_ISUID_ISGID)
63
# C struct declarations handed to StructParser. Each parser class selects one
# of the three typedef names below through its HEADER_STRUCT attribute:
#   old_cpio_header_t  - fixed-width uint16 fields
#   old_ascii_header_t - character fields, decoded in base 8 by the parser
#   new_ascii_header_t - character fields, decoded in base 16 by the parser
C_DEFINITIONS = r"""
 typedef struct old_cpio_header
 {
 uint16 c_magic;
 uint16 c_dev;
 uint16 c_ino;
 uint16 c_mode;
 uint16 c_uid;
 uint16 c_gid;
 uint16 c_nlink;
 uint16 c_rdev;
 uint16 c_mtimes[2];
 uint16 c_namesize;
 uint16 c_filesize[2];
 } old_cpio_header_t;

 typedef struct old_ascii_header
 {
 char c_magic[6];
 char c_dev[6];
 char c_ino[6];
 char c_mode[6];
 char c_uid[6];
 char c_gid[6];
 char c_nlink[6];
 char c_rdev[6];
 char c_mtime[11];
 char c_namesize[6];
 char c_filesize[11];
 } old_ascii_header_t;

 typedef struct new_ascii_header
 {
 char c_magic[6];
 char c_ino[8];
 char c_mode[8];
 char c_uid[8];
 char c_gid[8];
 char c_nlink[8];
 char c_mtime[8];
 char c_filesize[8];
 char c_dev_maj[8];
 char c_dev_min[8];
 char c_rdev_maj[8];
 char c_rdev_min[8];
 char c_namesize[8];
 char c_chksum[8];
 } new_ascii_header_t;
"""
113
114
@attrs.define
class CPIOEntry:
    """One archive member collected by the parser for later extraction."""

    # Offset in the parsed file at which the member's content starts.
    start_offset: int
    # Content length in bytes, as computed from the header's file-size field.
    size: int
    # Device number computed from the header by the parser's _calculate_dev.
    dev: int
    # Raw mode value from the header (file-type bits and permission bits).
    mode: int
    # Device number computed by _calculate_rdev; passed to mknod for special files.
    rdev: int
    # Member name as stored in the archive, decoded as UTF-8.
    path: Path
123
124
class CPIOParserBase:
    """Format-independent walker over the entries of a CPIO archive.

    Subclasses select the on-disk header layout through ``HEADER_STRUCT``, the
    per-entry alignment through ``_PAD_ALIGN`` and implement the
    ``_calculate_*`` hooks that turn raw header fields into integers.
    """

    # Alignment applied to "header + name" and to the content of each entry.
    _PAD_ALIGN: int
    # Alignment of the optional zero padding that may follow the archive.
    _FILE_PAD_ALIGN: int = 512
    # Name of the typedef in C_DEFINITIONS describing one entry header.
    HEADER_STRUCT: str

    def __init__(self, file: File, start_offset: int):
        """Prepare a parser for the archive found at ``start_offset`` in ``file``."""
        self.file = file
        self.start_offset = start_offset
        # Stays at -1 until parse() has determined where the archive ends.
        self.end_offset = -1
        self.entries: list[CPIOEntry] = []
        self.struct_parser = StructParser(C_DEFINITIONS)

    def parse(self):  # noqa: C901
        """Read entries from ``start_offset`` and fill ``entries``/``end_offset``.

        Parsing stops at the trailer entry or when reading a header raises
        ``EOFError``. Entries whose checksum does not validate are logged and
        left out of ``entries``, but their content is still skipped over.

        Raises:
            InvalidInputFormat: if a name size, file size, name or mode field
                fails the sanity checks, or if nothing at all was parsed.
        """
        current_offset = self.start_offset
        while True:
            self.file.seek(current_offset, io.SEEK_SET)
            try:
                header = self.struct_parser.parse(
                    self.HEADER_STRUCT, self.file, Endian.LITTLE
                )
            except EOFError:
                break

            c_filesize = self._calculate_file_size(header)
            c_namesize = self._calculate_name_size(header)

            # heuristics 1: check the filename
            if c_namesize > MAX_LINUX_PATH_LENGTH:
                raise InvalidInputFormat("CPIO entry filename is too long.")

            if c_namesize <= 0:
                raise InvalidInputFormat("CPIO entry filename size is invalid.")

            if c_filesize < 0:
                raise InvalidInputFormat("CPIO entry file size is invalid.")

            # From here on current_offset points at the entry's content.
            padded_header_size = self._pad_header(header, c_namesize)
            current_offset += padded_header_size

            # The file position is right after the header, i.e. at the name.
            tmp_filename = self.file.read(c_namesize)

            # heuristics 2: check that filename is null-byte terminated
            if not tmp_filename.endswith(b"\x00"):
                raise InvalidInputFormat(
                    "CPIO entry filename is not null-byte terminated"
                )

            try:
                filename = snull(tmp_filename).decode("utf-8")
            except UnicodeDecodeError as e:
                raise InvalidInputFormat from e

            if filename == CPIO_TRAILER_NAME:
                # The trailer's own (padded) content still belongs to the archive.
                current_offset += self._pad_content(c_filesize)
                break

            c_mode = self._calculate_mode(header)

            file_type = c_mode & 0o770000
            sticky_bit = c_mode & 0o7000

            # heuristics 3: check mode field
            is_valid = file_type in C_FILE_TYPES and sticky_bit in C_STICKY_BITS
            if not is_valid:
                raise InvalidInputFormat("CPIO entry mode is invalid.")

            if self.valid_checksum(header, current_offset):
                self.entries.append(
                    CPIOEntry(
                        start_offset=current_offset,
                        size=c_filesize,
                        dev=self._calculate_dev(header),
                        mode=c_mode,
                        rdev=self._calculate_rdev(header),
                        path=Path(filename),
                    )
                )
            else:
                logger.warning("Invalid CRC for CPIO entry, skipping.", header=header)

            current_offset += self._pad_content(c_filesize)

        self.end_offset = self._pad_file(current_offset)
        if self.start_offset == self.end_offset:
            raise InvalidInputFormat("Invalid CPIO archive.")

    def dump_entries(self, fs: FileSystem):
        """Recreate every collected entry through ``fs``.

        Regular files are carved from the archive, directories are created,
        symlinks point to the target stored as the entry's content, and
        character/block devices, sockets and FIFOs are handed to ``fs.mknod``.
        Any other file type is only logged.
        """
        for entry in self.entries:
            # skip entries with "." as filename
            if entry.path.name in ("", "."):
                continue

            # There are cases where CPIO archives have duplicated entries
            # We then unlink the files to overwrite them and avoid an error.
            if not stat.S_ISDIR(entry.mode):
                fs.unlink(entry.path)

            if stat.S_ISREG(entry.mode):
                fs.carve(entry.path, self.file, entry.start_offset, entry.size)
            elif stat.S_ISDIR(entry.mode):
                fs.mkdir(
                    entry.path, mode=entry.mode & 0o777, parents=True, exist_ok=True
                )
            elif stat.S_ISLNK(entry.mode):
                # The link target is stored as the content of the entry.
                link_path = Path(
                    snull(
                        self.file[entry.start_offset : entry.start_offset + entry.size]
                    ).decode("utf-8")
                )
                fs.create_symlink(src=link_path, dst=entry.path)
            elif (
                stat.S_ISCHR(entry.mode)
                or stat.S_ISBLK(entry.mode)
                or stat.S_ISSOCK(entry.mode)
                # FIFOs are accepted by parse() (C_ISFIFO) and must be handled
                # here as well, otherwise they end up in the "unknown" branch.
                or stat.S_ISFIFO(entry.mode)
            ):
                # NOTE(review): only the permission bits are forwarded, the
                # file-type bits are dropped - confirm fs.mknod expects that.
                fs.mknod(entry.path, mode=entry.mode & 0o777, device=entry.rdev)
            else:
                logger.warning("unknown file type in CPIO archive")

    def _pad_file(self, end_offset: int) -> int:
        """CPIO archives can have a 512 bytes block padding at the end.

        Returns the offset rounded up to ``_FILE_PAD_ALIGN`` (relative to the
        archive start) when all bytes up to it are zero, otherwise
        ``end_offset`` unchanged.
        """
        self.file.seek(end_offset, io.SEEK_SET)
        padded_end_offset = self.start_offset + round_up(
            size=end_offset - self.start_offset, alignment=self._FILE_PAD_ALIGN
        )
        padding_size = padded_end_offset - end_offset

        # A short read (padding cut off by the end of the file) compares
        # unequal, so truncated padding is not counted as part of the archive.
        if self.file.read(padding_size) == bytes([0]) * padding_size:
            return padded_end_offset

        return end_offset

    @classmethod
    def _pad_header(cls, header, c_namesize: int) -> int:
        """Return the size of header plus name, rounded up to ``_PAD_ALIGN``."""
        return round_up(len(header) + c_namesize, cls._PAD_ALIGN)

    @classmethod
    def _pad_content(cls, c_filesize: int) -> int:
        """Return the content size rounded up to ``_PAD_ALIGN``."""
        return round_up(c_filesize, cls._PAD_ALIGN)

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Return the entry's content size; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Return the number of name bytes to read; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_mode(header) -> int:
        """Return the entry's mode value; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_dev(header) -> int:
        """Return the entry's device number; implemented by subclasses."""
        raise NotImplementedError

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Return the entry's rdev number; implemented by subclasses."""
        raise NotImplementedError

    def valid_checksum(self, header, start_offset: int) -> bool:  # noqa: ARG002
        """Accept every entry; subclasses with a checksum field override this."""
        return True
289
290
class BinaryCPIOParser(CPIOParserBase):
    """Parser for archives whose entries use the ``old_cpio_header_t`` layout."""

    HEADER_STRUCT = "old_cpio_header_t"

    _PAD_ALIGN = 2

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Combine the two 16-bit size words, the first one being the upper half."""
        upper_word = header.c_filesize[0]
        lower_word = header.c_filesize[1]
        return (upper_word << 16) | lower_word

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Return the name size, bumped to the next even number when odd."""
        name_size = header.c_namesize
        return name_size + (name_size % 2)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Return the mode field as stored in the header."""
        mode = header.c_mode
        return mode

    @staticmethod
    def _calculate_dev(header) -> int:
        """Return the dev field as stored in the header."""
        device = header.c_dev
        return device

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Return the rdev field as stored in the header."""
        raw_device = header.c_rdev
        return raw_device
315
316
class PortableOldASCIIParser(CPIOParserBase):
    """Parser for the ``old_ascii_header_t`` layout; fields are decoded in base 8."""

    HEADER_STRUCT = "old_ascii_header_t"

    _PAD_ALIGN = 1

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode the base-8 ``c_filesize`` field."""
        size_field = header.c_filesize
        return decode_int(size_field, 8)

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode the base-8 ``c_namesize`` field."""
        name_size_field = header.c_namesize
        return decode_int(name_size_field, 8)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode the base-8 ``c_mode`` field."""
        mode_field = header.c_mode
        return decode_int(mode_field, 8)

    @staticmethod
    def _calculate_dev(header) -> int:
        """Decode the base-8 ``c_dev`` field."""
        dev_field = header.c_dev
        return decode_int(dev_field, 8)

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Decode the base-8 ``c_rdev`` field."""
        rdev_field = header.c_rdev
        return decode_int(rdev_field, 8)
341
342
class PortableASCIIParser(CPIOParserBase):
    """Parser for the ``new_ascii_header_t`` layout; fields are decoded in base 16."""

    HEADER_STRUCT = "new_ascii_header_t"
    _PAD_ALIGN = 4

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode the base-16 ``c_filesize`` field."""
        size_field = header.c_filesize
        return decode_int(size_field, 16)

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode the base-16 ``c_namesize`` field."""
        name_size_field = header.c_namesize
        return decode_int(name_size_field, 16)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode the base-16 ``c_mode`` field."""
        mode_field = header.c_mode
        return decode_int(mode_field, 16)

    @staticmethod
    def _calculate_dev(header) -> int:
        """Build a device number from the ``c_dev_maj``/``c_dev_min`` fields."""
        major = decode_int(header.c_dev_maj, 16)
        minor = decode_int(header.c_dev_min, 16)
        return os.makedev(major, minor)

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Build a device number from the ``c_rdev_maj``/``c_rdev_min`` fields."""
        major = decode_int(header.c_rdev_maj, 16)
        minor = decode_int(header.c_rdev_min, 16)
        return os.makedev(major, minor)
370
371
class PortableASCIIWithCRCParser(PortableASCIIParser):
    """``PortableASCIIParser`` variant that verifies the ``c_chksum`` field."""

    def valid_checksum(self, header, start_offset: int) -> bool:
        """Compare ``c_chksum`` with the 32-bit sum of the entry's content bytes.

        ``start_offset`` is the offset at which the entry's content begins.
        """
        expected = decode_int(header.c_chksum, 16)
        content_size = self._calculate_file_size(header)

        byte_total = sum(
            sum(bytearray(block))
            for block in iterate_file(self.file, start_offset, content_size)
        )
        # Only the low 32 bits of the running sum are compared.
        truncated_total = byte_total & 0xFF_FF_FF_FF
        return expected == truncated_total
381
382
class _CPIOExtractorBase(Extractor):
    """Shared extraction logic; subclasses only choose the parser via ``PARSER``."""

    PARSER: type[CPIOParserBase]

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Parse the archive at ``inpath`` from offset 0 and dump it into ``outdir``.

        Nothing is returned explicitly, so callers always receive ``None``.
        """
        target_fs = FileSystem(outdir)

        with File.from_path(inpath) as archive:
            cpio_parser = self.PARSER(archive, 0)
            cpio_parser.parse()
            cpio_parser.dump_entries(target_fs)

        return None
393
394
class BinaryCPIOExtractor(_CPIOExtractorBase):
    """Extractor that parses archives with ``BinaryCPIOParser``."""

    PARSER = BinaryCPIOParser
397
398
class PortableOldASCIIExtractor(_CPIOExtractorBase):
    """Extractor that parses archives with ``PortableOldASCIIParser``."""

    PARSER = PortableOldASCIIParser
401
402
class PortableASCIIExtractor(_CPIOExtractorBase):
    """Extractor that parses archives with ``PortableASCIIParser``."""

    PARSER = PortableASCIIParser
405
406
class PortableASCIIWithCRCExtractor(_CPIOExtractorBase):
    """Extractor that parses archives with ``PortableASCIIWithCRCParser``."""

    PARSER = PortableASCIIWithCRCParser
409
410
class _CPIOHandlerBase(Handler):
    """Common base class of the CPIO handlers.

    All variants are walked by the same parsing loop; they differ only in how
    file and filename sizes are computed, how entries are padded, and whether
    header fields are converted from octal or from hex.
    """

    EXTRACTOR: _CPIOExtractorBase

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Parse the archive at ``start_offset`` and return the chunk it spans.

        Errors raised by the parser are not caught here.
        """
        parser_cls = self.EXTRACTOR.PARSER
        cpio_parser = parser_cls(file, start_offset)
        cpio_parser.parse()

        chunk_end = cpio_parser.end_offset
        return ValidChunk(start_offset=start_offset, end_offset=chunk_end)
427
428
class BinaryHandler(_CPIOHandlerBase):
    """Handler for CPIO archives that use the binary header layout."""

    NAME = "cpio_binary"
    # Bytes c7 71 are the value 0o070707 (0x71c7) stored as a little-endian
    # uint16, which matches the little-endian header parsing of the parser.
    PATTERNS = [HexString("c7 71 // (default, bin, hpbin)")]

    # Shared extractor instance; calculate_chunk also takes its parser class
    # from here.
    EXTRACTOR = BinaryCPIOExtractor()

    # Descriptive metadata for this handler.
    DOC = HandlerDoc(
        name="CPIO (binary)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )
448
449
class PortableOldASCIIHandler(_CPIOHandlerBase):
    """Handler for CPIO archives that use the old portable ASCII header layout."""

    NAME = "cpio_portable_old_ascii"

    # The six bytes are the ASCII text "070707".
    PATTERNS = [HexString("30 37 30 37 30 37 // 07 07 07")]

    # Shared extractor instance; calculate_chunk also takes its parser class
    # from here.
    EXTRACTOR = PortableOldASCIIExtractor()

    # Descriptive metadata for this handler.
    DOC = HandlerDoc(
        name="CPIO (portable old ASCII)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )
470
471
class PortableASCIIHandler(_CPIOHandlerBase):
    """Handler for CPIO archives that use the new portable ASCII header layout."""

    NAME = "cpio_portable_ascii"
    # The six bytes are the ASCII text "070701".
    PATTERNS = [HexString("30 37 30 37 30 31 // 07 07 01 (newc)")]

    # Shared extractor instance; calculate_chunk also takes its parser class
    # from here.
    EXTRACTOR = PortableASCIIExtractor()

    # Descriptive metadata for this handler.
    DOC = HandlerDoc(
        name="CPIO (portable ASCII)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )
491
492
class PortableASCIIWithCRCHandler(_CPIOHandlerBase):
    """Handler for new portable ASCII CPIO archives whose entries carry a checksum."""

    NAME = "cpio_portable_ascii_crc"
    # The six bytes are the ASCII text "070702".
    PATTERNS = [HexString("30 37 30 37 30 32 // 07 07 02")]

    # Shared extractor instance; its parser additionally verifies each entry's
    # c_chksum field.
    EXTRACTOR = PortableASCIIWithCRCExtractor()

    # Descriptive metadata for this handler.
    DOC = HandlerDoc(
        name="CPIO (portable ASCII CRC)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )