1"""7-zip handlers.
2
The 7-zip archive file format SHALL consist of three parts. A 7-zip archive
file SHALL start with a signature header. The data block SHOULD be placed
after the signature header. The data block is shown as Packed
Streams. A header database SHOULD be placed after the data block.
The data block MAY be empty when no archived contents exist, so
Packed Streams is optional. Since the Header database CAN be encoded, it
SHOULD be placed after the data block, that is, Packed Streams for Headers.
When the Header database is encoded, Header encode Information SHALL be
placed instead of the Header.
12
13[Signature Header] [Data] [Header Database]
14
15https://fastapi.metacpan.org/source/BJOERN/Compress-Deflate7-1.0/7zip/DOC/7zFormat.txt
16"7z uses little endian encoding."
17
18https://py7zr.readthedocs.io/en/latest/archive_format.html
19"""
20
21import binascii
22from pathlib import Path
23
24from structlog import get_logger
25
26from unblob.extractors import Command
27
28from ...extractors.command import MultiFileCommand
29from ...file_utils import Endian, InvalidInputFormat, StructParser
30from ...models import (
31 DirectoryHandler,
32 File,
33 Glob,
34 HandlerDoc,
35 HandlerType,
36 HexString,
37 MultiFile,
38 Reference,
39 StructHandler,
40 ValidChunk,
41)
42
# Module-level structlog logger used for the debug output emitted below.
logger = get_logger()

# C-style layout of the 7-zip signature header. It is handed to StructParser
# below and reused as the C_DEFINITIONS of SevenZipHandler.
C_DEFINITIONS = r"""
    typedef struct sevenzip_header {
        char magic[6];
        uint8 version_maj;
        uint8 version_min;
        uint32 crc;
        uint64 next_header_offset;
        uint64 next_header_size;
        uint32 next_header_crc;
    } sevenzip_header_t;
"""
# Name of the struct declared in C_DEFINITIONS that describes the header.
HEADER_STRUCT = "sevenzip_header_t"
# Byte size of the whole signature header (32), one term per struct field:
# magic + version_maj + version_min + crc
# + next_header_offset + next_header_size + next_header_crc.
HEADER_SIZE = 6 + 1 + 1 + 4 + 8 + 8 + 4

# Module-level parser built from C_DEFINITIONS; used by the multi-volume
# handler, which parses the header from raw bytes it read itself.
HEADER_PARSER = StructParser(C_DEFINITIONS)

# StartHeader (next_header_offset, next_header_size, next_header_crc)
# This is the trailing 20-byte part of the header that the stored CRC covers.
START_HEADER_SIZE = 8 + 8 + 4


# Signature bytes '7', 'z', 0xBC, 0xAF, 0x27, 0x1C that every 7-zip archive
# starts with; compared against the parsed `magic` field.
SEVENZIP_MAGIC = b"7z\xbc\xaf\x27\x1c"
66
67
def check_header_crc(header):
    """Verify the CRC stored in a parsed 7-zip signature header.

    The checksum is computed over the StartHeader only, i.e. the trailing
    START_HEADER_SIZE bytes of the serialized header (next_header_offset,
    next_header_size, next_header_crc). See
    CPP/7zip/Archive/7z/7zOut.cpp COutArchive::WriteStartHeader.

    Args:
        header: parsed header exposing ``dumps()`` and a ``crc`` field.

    Raises:
        InvalidInputFormat: when the stored CRC differs from the computed one.
    """
    serialized = header.dumps()
    start_header = serialized[-START_HEADER_SIZE:]
    expected_crc = binascii.crc32(start_header)
    if header.crc == expected_crc:
        return
    raise InvalidInputFormat("Invalid sevenzip header CRC")
74
75
def calculate_sevenzip_size(header) -> int:
    """Return the total archive size described by a parsed signature header.

    The result is the length of the header itself (``len(header)``) plus its
    ``next_header_offset`` and ``next_header_size`` fields.
    """
    total = len(header)
    total += header.next_header_offset
    total += header.next_header_size
    return total
78
79
class SevenZipHandler(StructHandler):
    """Handler for single-file 7-zip archives located by their signature."""

    NAME = "sevenzip"

    PATTERNS = [
        HexString(
            """
            // '7', 'z', 0xBC, 0xAF, 0x27, 0x1C
            37 7A BC AF 27 1C
        """
        )
    ]
    # Reuse the module-level struct description for header parsing.
    C_DEFINITIONS = C_DEFINITIONS
    HEADER_STRUCT = HEADER_STRUCT
    # `-y` answers yes to all 7z queries; `-p` supplies an empty password
    # (presumably so 7z never blocks waiting for one - confirm against 7z docs).
    EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}")

    DOC = HandlerDoc(
        name="7-Zip",
        description="The 7-Zip file format is a compressed archive format with high compression ratios, supporting multiple algorithms, CRC checks, and multi-volume archives.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="7-Zip Technical Documentation",
                url="https://fastapi.metacpan.org/source/BJOERN/Compress-Deflate7-1.0/7zip/DOC/7zFormat.txt",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Compute the extent of the archive that starts at ``start_offset``.

        The signature header is parsed from ``file``, its CRC is verified
        (``check_header_crc`` raises ``InvalidInputFormat`` on mismatch) and
        the chunk end is derived from the sizes recorded in the header.
        """
        parsed_header = self.parse_header(file)

        # Reject candidates whose StartHeader checksum does not match.
        check_header_crc(parsed_header)

        end_offset = start_offset + calculate_sevenzip_size(parsed_header)
        return ValidChunk(start_offset=start_offset, end_offset=end_offset)
117
118
class MultiVolumeSevenZipHandler(DirectoryHandler):
    """Handler for 7-zip archives split into volume files (``*.7z.001``, ...)."""

    NAME = "multi-sevenzip"
    EXTRACTOR = MultiFileCommand("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}")

    # Only the first volume is matched; the remaining volumes are discovered
    # relative to it in calculate_multifile.
    PATTERN = Glob("*.7z.001")

    DOC = HandlerDoc(
        name=NAME,
        description="The 7-Zip file format is a compressed archive format with high compression ratios, supporting multiple algorithms, CRC checks, and multi-volume archives.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="7-Zip Technical Documentation",
                url="https://fastapi.metacpan.org/source/BJOERN/Compress-Deflate7-1.0/7zip/DOC/7zFormat.txt",
            ),
        ],
        limitations=[],
    )

    def calculate_multifile(self, file: Path) -> MultiFile | None:
        """Group the volumes that belong to the first volume ``file``.

        Sibling paths named ``<file.stem>.*`` that resolve to an existing
        target are collected in sorted order. The signature header of
        ``file`` is then parsed and validated, and the volumes are accepted
        only if their combined on-disk size equals the archive size recorded
        in that header.

        Args:
            file: path of the first volume (matched by ``PATTERN``).

        Returns:
            A ``MultiFile`` named after ``file.stem`` listing all volumes, or
            ``None`` when no volume is found, the header is truncated, the
            magic does not match, or the sizes do not add up.

        Raises:
            InvalidInputFormat: when the header CRC check fails.
        """
        # Function-scope import keeps this block self-contained.
        import glob

        # Escape the stem so glob metacharacters in the file name ('[', '*',
        # '?') are matched literally instead of being read as a pattern.
        volume_pattern = f"{glob.escape(file.stem)}.*"
        paths = sorted(
            p for p in file.parent.glob(volume_pattern) if p.resolve().exists()
        )
        if not paths:
            return None

        with file.open("rb") as f:
            header_data = f.read(HEADER_SIZE)

        # A file shorter than the signature header cannot be a valid first
        # volume; bail out before handing truncated data to the parser.
        if len(header_data) < HEADER_SIZE:
            return None

        header = HEADER_PARSER.parse(HEADER_STRUCT, header_data, Endian.LITTLE)
        if header.magic != SEVENZIP_MAGIC:
            return None

        check_header_crc(header)
        size = calculate_sevenzip_size(header)
        logger.debug("Sevenzip header", header=header, size=size, _verbosity=3)

        files_size = sum(path.stat().st_size for path in paths)
        logger.debug(
            "Multi-volume files", paths=paths, files_size=files_size, _verbosity=2
        )
        # The volumes concatenated must be exactly as long as the archive the
        # header describes; otherwise the set is incomplete or unrelated.
        if files_size != size:
            return None

        return MultiFile(
            name=file.stem,
            paths=paths,
        )