1import io
2import os
3import stat
4from pathlib import Path
5
6import attrs
7from structlog import get_logger
8
9from ...file_utils import (
10 Endian,
11 FileSystem,
12 InvalidInputFormat,
13 StructParser,
14 decode_int,
15 iterate_file,
16 round_up,
17 snull,
18)
19from ...models import (
20 Extractor,
21 ExtractResult,
22 File,
23 Handler,
24 HandlerDoc,
25 HandlerType,
26 HexString,
27 Reference,
28 ValidChunk,
29)
30
31logger = get_logger()
32
# Name of the entry that terminates an archive; parsing stops once it is read.
CPIO_TRAILER_NAME = "TRAILER!!!"
# Largest entry name size accepted by the parser; bigger values are rejected.
MAX_LINUX_PATH_LENGTH = 0x1000

# File-type values of the mode field. The parser compares them against
# `c_mode & 0o770000`. Judging by the names: block device, character device,
# directory, FIFO, socket, symlink, contiguous file (presumably the reserved
# cpio type; confirm against the format documentation) and regular file.
C_ISBLK = 0o60000
C_ISCHR = 0o20000
C_ISDIR = 0o40000
C_ISFIFO = 0o10000
C_ISSOCK = 0o140000
C_ISLNK = 0o120000
C_ISCTG = 0o110000
C_ISREG = 0o100000

# Every file-type value the parser accepts as valid.
C_FILE_TYPES = (
    C_ISBLK,
    C_ISCHR,
    C_ISDIR,
    C_ISFIFO,
    C_ISSOCK,
    C_ISLNK,
    C_ISCTG,
    C_ISREG,
)

# Values of the special permission bits (`c_mode & 0o7000`): none, set-uid,
# set-gid, sticky, and set-uid combined with set-gid.
C_NONE = 0o00000
C_ISUID = 0o04000
C_ISGID = 0o02000
C_ISVTX = 0o01000
C_ISUID_ISGID = 0o06000

# Only these combinations pass the parser's mode check; any other combination
# makes the entry invalid.
C_STICKY_BITS = (C_NONE, C_ISUID, C_ISGID, C_ISVTX, C_ISUID_ISGID)
63
# C-style declarations of the three entry header layouts. The text is handed to
# StructParser, and each parser class picks its layout by typedef name through
# its HEADER_STRUCT attribute.
C_DEFINITIONS = r"""
    typedef struct old_cpio_header
    {
        uint16 c_magic;
        uint16 c_dev;
        uint16 c_ino;
        uint16 c_mode;
        uint16 c_uid;
        uint16 c_gid;
        uint16 c_nlink;
        uint16 c_rdev;
        uint16 c_mtimes[2];
        uint16 c_namesize;
        uint16 c_filesize[2];
    } old_cpio_header_t;

    typedef struct old_ascii_header
    {
        char c_magic[6];
        char c_dev[6];
        char c_ino[6];
        char c_mode[6];
        char c_uid[6];
        char c_gid[6];
        char c_nlink[6];
        char c_rdev[6];
        char c_mtime[11];
        char c_namesize[6];
        char c_filesize[11];
    } old_ascii_header_t;

    typedef struct new_ascii_header
    {
        char c_magic[6];
        char c_ino[8];
        char c_mode[8];
        char c_uid[8];
        char c_gid[8];
        char c_nlink[8];
        char c_mtime[8];
        char c_filesize[8];
        char c_dev_maj[8];
        char c_dev_min[8];
        char c_rdev_maj[8];
        char c_rdev_min[8];
        char c_namesize[8];
        char c_chksum[8];
    } new_ascii_header_t;
"""
113
114
@attrs.define
class CPIOEntry:
    """One archive member as recorded by the parser."""

    # Offset in the file where the entry's content starts (after the padded
    # header and name).
    start_offset: int
    # Content size taken from the header's file size field.
    size: int
    # Device number decoded from the header.
    dev: int
    # Raw mode field: file type plus permission bits.
    mode: int
    # Device number used when creating special files during extraction.
    rdev: int
    # Entry name from the archive, as a path.
    path: Path
123
124
class CPIOParserBase:
    """Format-independent CPIO parser.

    Walks the entries of an archive starting at ``start_offset``, validates
    them with a few heuristics and can later write them out through a
    ``FileSystem``. Subclasses provide ``HEADER_STRUCT``, ``_PAD_ALIGN`` and
    the ``_calculate_*`` field decoders.
    """

    # Alignment applied to header+name and to the content of each entry.
    _PAD_ALIGN: int
    # Alignment the whole archive may be zero-padded to.
    _FILE_PAD_ALIGN: int = 512
    # Name of the typedef in C_DEFINITIONS describing the entry header.
    HEADER_STRUCT: str

    def __init__(self, file: File, start_offset: int):
        """Prepare a parser for the archive beginning at ``start_offset``."""
        self.file = file
        self.start_offset = start_offset
        # Stays at -1 until parse() has run to completion.
        self.end_offset = -1
        self.entries = []
        self.struct_parser = StructParser(C_DEFINITIONS)

    def parse(self):  # noqa: C901
        """Read entries until the trailer entry or the end of the file.

        Valid entries are appended to ``self.entries`` and the end of the
        archive is stored in ``self.end_offset``. Entries whose checksum does
        not match are skipped with a warning, and parsing continues.

        Raises ``InvalidInputFormat`` when an entry fails a sanity check or
        when nothing was consumed at all.
        """
        current_offset = self.start_offset
        while True:
            self.file.seek(current_offset, io.SEEK_SET)
            try:
                header = self.struct_parser.parse(
                    self.HEADER_STRUCT, self.file, Endian.LITTLE
                )
            except EOFError:
                # Ran out of data without seeing a trailer: stop here.
                break

            c_filesize = self._calculate_file_size(header)
            c_namesize = self._calculate_name_size(header)

            # heuristics 1: check the filename
            if c_namesize > MAX_LINUX_PATH_LENGTH:
                raise InvalidInputFormat("CPIO entry filename is too long.")

            if c_namesize == 0:
                raise InvalidInputFormat("CPIO entry filename empty.")

            # From here on current_offset points at the entry's content; the
            # name itself is read from the position right after the header.
            padded_header_size = self._pad_header(header, c_namesize)
            current_offset += padded_header_size

            tmp_filename = self.file.read(c_namesize)

            # heuristics 2: check that filename is null-byte terminated
            if not tmp_filename.endswith(b"\x00"):
                raise InvalidInputFormat(
                    "CPIO entry filename is not null-byte terminated"
                )

            try:
                filename = snull(tmp_filename).decode("utf-8")
            except UnicodeDecodeError as e:
                raise InvalidInputFormat from e

            if filename == CPIO_TRAILER_NAME:
                # The trailer is not recorded as an entry, but its (padded)
                # content still belongs to the archive.
                current_offset += self._pad_content(c_filesize)
                break

            c_mode = self._calculate_mode(header)

            file_type = c_mode & 0o770000
            sticky_bit = c_mode & 0o7000

            # heuristics 3: check mode field
            is_valid = file_type in C_FILE_TYPES and sticky_bit in C_STICKY_BITS
            if not is_valid:
                raise InvalidInputFormat("CPIO entry mode is invalid.")

            if self.valid_checksum(header, current_offset):
                self.entries.append(
                    CPIOEntry(
                        start_offset=current_offset,
                        size=c_filesize,
                        dev=self._calculate_dev(header),
                        mode=c_mode,
                        rdev=self._calculate_rdev(header),
                        path=Path(filename),
                    )
                )
            else:
                logger.warning("Invalid CRC for CPIO entry, skipping.", header=header)

            current_offset += self._pad_content(c_filesize)

        self.end_offset = self._pad_file(current_offset)
        if self.start_offset == self.end_offset:
            raise InvalidInputFormat("Invalid CPIO archive.")

    def dump_entries(self, fs: FileSystem):
        """Write every parsed entry out through ``fs``.

        Regular files are carved from the archive, directories and symlinks
        are created, and device nodes, sockets and FIFOs are created with
        ``fs.mknod``. Other file types only produce a warning.
        """
        for entry in self.entries:
            # skip entries with "." as filename
            if entry.path.name in ("", "."):
                continue

            # There are cases where CPIO archives have duplicated entries
            # We then unlink the files to overwrite them and avoid an error.
            if not stat.S_ISDIR(entry.mode):
                fs.unlink(entry.path)

            if stat.S_ISREG(entry.mode):
                fs.carve(entry.path, self.file, entry.start_offset, entry.size)
            elif stat.S_ISDIR(entry.mode):
                fs.mkdir(
                    entry.path, mode=entry.mode & 0o777, parents=True, exist_ok=True
                )
            elif stat.S_ISLNK(entry.mode):
                # The content of a symlink entry is the link target.
                link_path = Path(
                    snull(
                        self.file[entry.start_offset : entry.start_offset + entry.size]
                    ).decode("utf-8")
                )
                fs.create_symlink(src=link_path, dst=entry.path)
            elif (
                stat.S_ISCHR(entry.mode)
                or stat.S_ISBLK(entry.mode)
                or stat.S_ISSOCK(entry.mode)
                # FIFOs pass the mode check in parse() and are special files
                # too; the socket test used to be duplicated here instead.
                or stat.S_ISFIFO(entry.mode)
            ):
                fs.mknod(entry.path, mode=entry.mode & 0o777, device=entry.rdev)
            else:
                logger.warning("unknown file type in CPIO archive")

    def _pad_file(self, end_offset: int) -> int:
        """CPIO archives can have a 512 bytes block padding at the end.

        Returns the padded end offset when the bytes up to the next block
        boundary are all zero, otherwise ``end_offset`` unchanged.
        """
        self.file.seek(end_offset, io.SEEK_SET)
        padded_end_offset = self.start_offset + round_up(
            size=end_offset - self.start_offset, alignment=self._FILE_PAD_ALIGN
        )
        padding_size = padded_end_offset - end_offset

        # A short read (file ends before the boundary) does not compare equal
        # either, so the unpadded offset is kept in that case.
        if self.file.read(padding_size) == bytes([0]) * padding_size:
            return padded_end_offset

        return end_offset

    @classmethod
    def _pad_header(cls, header, c_namesize: int) -> int:
        """Return the size of header plus name, rounded up to _PAD_ALIGN."""
        return round_up(len(header) + c_namesize, cls._PAD_ALIGN)

    @classmethod
    def _pad_content(cls, c_filesize: int) -> int:
        """Return the content size, rounded up to _PAD_ALIGN."""
        return round_up(c_filesize, cls._PAD_ALIGN)

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode the content size from the header (format specific)."""
        raise NotImplementedError

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode the name size from the header (format specific)."""
        raise NotImplementedError

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode the mode field from the header (format specific)."""
        raise NotImplementedError

    @staticmethod
    def _calculate_dev(header) -> int:
        """Decode the device number from the header (format specific)."""
        raise NotImplementedError

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Decode the special-file device number (format specific)."""
        raise NotImplementedError

    def valid_checksum(self, header, start_offset: int) -> bool:  # noqa: ARG002
        """Accept every entry; formats carrying a checksum override this."""
        return True
286
287
class BinaryCPIOParser(CPIOParserBase):
    """Parser for the binary header layout (``old_cpio_header_t``)."""

    HEADER_STRUCT = "old_cpio_header_t"

    _PAD_ALIGN = 2

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Join the two 16-bit size words, the first being the upper half."""
        upper_word = header.c_filesize[0]
        lower_word = header.c_filesize[1]
        return (upper_word << 16) | lower_word

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Return the name size, bumped to the next even number when odd."""
        name_size = header.c_namesize
        if name_size % 2:
            return name_size + 1
        return name_size

    @staticmethod
    def _calculate_mode(header) -> int:
        """Return the mode field as stored."""
        mode = header.c_mode
        return mode

    @staticmethod
    def _calculate_dev(header) -> int:
        """Return the device field as stored."""
        device = header.c_dev
        return device

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Return the special-file device field as stored."""
        special_device = header.c_rdev
        return special_device
312
313
class PortableOldASCIIParser(CPIOParserBase):
    """Parser for the ``old_ascii_header_t`` layout; fields decode in base 8."""

    HEADER_STRUCT = "old_ascii_header_t"

    _PAD_ALIGN = 1

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode the content size field."""
        raw_size = header.c_filesize
        return decode_int(raw_size, 8)

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode the name size field."""
        raw_name_size = header.c_namesize
        return decode_int(raw_name_size, 8)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode the mode field."""
        raw_mode = header.c_mode
        return decode_int(raw_mode, 8)

    @staticmethod
    def _calculate_dev(header) -> int:
        """Decode the device field."""
        raw_dev = header.c_dev
        return decode_int(raw_dev, 8)

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Decode the special-file device field."""
        raw_rdev = header.c_rdev
        return decode_int(raw_rdev, 8)
338
339
class PortableASCIIParser(CPIOParserBase):
    """Parser for the ``new_ascii_header_t`` layout; fields decode in base 16."""

    HEADER_STRUCT = "new_ascii_header_t"
    _PAD_ALIGN = 4

    @staticmethod
    def _calculate_file_size(header) -> int:
        """Decode the content size field."""
        raw_size = header.c_filesize
        return decode_int(raw_size, 16)

    @staticmethod
    def _calculate_name_size(header) -> int:
        """Decode the name size field."""
        raw_name_size = header.c_namesize
        return decode_int(raw_name_size, 16)

    @staticmethod
    def _calculate_mode(header) -> int:
        """Decode the mode field."""
        raw_mode = header.c_mode
        return decode_int(raw_mode, 16)

    @staticmethod
    def _calculate_dev(header) -> int:
        """Combine the major and minor device fields into one device number."""
        major = decode_int(header.c_dev_maj, 16)
        minor = decode_int(header.c_dev_min, 16)
        return os.makedev(major, minor)

    @staticmethod
    def _calculate_rdev(header) -> int:
        """Combine the special-file major and minor fields into one number."""
        major = decode_int(header.c_rdev_maj, 16)
        minor = decode_int(header.c_rdev_min, 16)
        return os.makedev(major, minor)
367
368
class PortableASCIIWithCRCParser(PortableASCIIParser):
    """New ASCII parser that also verifies the per-entry checksum field."""

    def valid_checksum(self, header, start_offset: int) -> bool:
        """Tell whether the content's byte sum matches the header checksum.

        The content is summed byte by byte and the total, truncated to
        32 bits, is compared with the ``c_chksum`` header field.
        """
        expected = decode_int(header.c_chksum, 16)
        content_size = self._calculate_file_size(header)

        byte_total = sum(
            sum(bytearray(piece))
            for piece in iterate_file(self.file, start_offset, content_size)
        )
        return (byte_total & 0xFF_FF_FF_FF) == expected
378
379
class _CPIOExtractorBase(Extractor):
    """Extractor shared by all CPIO flavours; subclasses only set ``PARSER``."""

    PARSER: type[CPIOParserBase]

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Parse the archive at ``inpath`` and write its entries below ``outdir``."""
        target_fs = FileSystem(outdir)

        with File.from_path(inpath) as archive:
            # The archive is expected to start at the very beginning of the file.
            cpio = self.PARSER(archive, 0)
            cpio.parse()
            cpio.dump_entries(target_fs)
        return None
390
391
class BinaryCPIOExtractor(_CPIOExtractorBase):
    """Extractor that parses archives with BinaryCPIOParser."""

    PARSER = BinaryCPIOParser
394
395
class PortableOldASCIIExtractor(_CPIOExtractorBase):
    """Extractor that parses archives with PortableOldASCIIParser."""

    PARSER = PortableOldASCIIParser
398
399
class PortableASCIIExtractor(_CPIOExtractorBase):
    """Extractor that parses archives with PortableASCIIParser."""

    PARSER = PortableASCIIParser
402
403
class PortableASCIIWithCRCExtractor(_CPIOExtractorBase):
    """Extractor that parses archives with PortableASCIIWithCRCParser."""

    PARSER = PortableASCIIWithCRCParser
406
407
class _CPIOHandlerBase(Handler):
    """Common base of the CPIO handlers.

    Every CPIO flavour is parsed the same way; the flavours differ only in how
    file and filename sizes are padded and in whether header fields are
    converted from octal or hex.
    """

    EXTRACTOR: _CPIOExtractorBase

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Parse the archive at ``start_offset`` and report the range it covers."""
        cpio_parser = self.EXTRACTOR.PARSER(file, start_offset)
        cpio_parser.parse()
        chunk_end = cpio_parser.end_offset
        return ValidChunk(start_offset=start_offset, end_offset=chunk_end)
424
425
class BinaryHandler(_CPIOHandlerBase):
    """Handler for binary CPIO archives, matched on the bytes ``c7 71``."""

    NAME = "cpio_binary"
    PATTERNS = [HexString("c7 71 // (default, bin, hpbin)")]

    # Extraction and chunk calculation both go through this extractor's PARSER.
    EXTRACTOR = BinaryCPIOExtractor()

    DOC = HandlerDoc(
        name="CPIO (binary)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )
445
446
class PortableOldASCIIHandler(_CPIOHandlerBase):
    """Handler for old portable ASCII CPIO archives (ASCII magic ``070707``)."""

    NAME = "cpio_portable_old_ascii"

    PATTERNS = [HexString("30 37 30 37 30 37 // 07 07 07")]

    # Extraction and chunk calculation both go through this extractor's PARSER.
    EXTRACTOR = PortableOldASCIIExtractor()

    DOC = HandlerDoc(
        name="CPIO (portable old ASCII)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )
467
468
class PortableASCIIHandler(_CPIOHandlerBase):
    """Handler for portable ASCII (newc) CPIO archives (ASCII magic ``070701``)."""

    NAME = "cpio_portable_ascii"
    PATTERNS = [HexString("30 37 30 37 30 31 // 07 07 01 (newc)")]

    # Extraction and chunk calculation both go through this extractor's PARSER.
    EXTRACTOR = PortableASCIIExtractor()

    DOC = HandlerDoc(
        name="CPIO (portable ASCII)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )
488
489
class PortableASCIIWithCRCHandler(_CPIOHandlerBase):
    """Handler for portable ASCII CPIO archives with CRC (ASCII magic ``070702``)."""

    NAME = "cpio_portable_ascii_crc"
    PATTERNS = [HexString("30 37 30 37 30 32 // 07 07 02")]

    # Extraction and chunk calculation both go through this extractor's PARSER.
    EXTRACTOR = PortableASCIIWithCRCExtractor()

    DOC = HandlerDoc(
        name="CPIO (portable ASCII CRC)",
        description="CPIO (Copy In, Copy Out) is an archive file format used for bundling files and directories along with their metadata. It is commonly used in Unix-like systems for creating backups or transferring files, and supports various encoding formats including binary and ASCII.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="GNU CPIO Manual",
                url="https://www.gnu.org/software/cpio/manual/cpio.html",
            ),
        ],
        limitations=[],
    )