1"""7-zip handlers.
2
7-zip archive file format SHALL consist of three parts. 7-zip archive
file SHALL start with signature header. The data block SHOULD be placed
after the signature header. The data block is shown as Packed
Streams. A header database SHOULD be placed after the data block.
The data block MAY be empty when no archived contents exist. So
Packed Streams is optional. Since Header database CAN be encoded then
it SHOULD be placed after data block, that is Packed Streams for Headers.
When Header database is encoded, Header encode Information SHALL be
placed instead of Header.
12
13[Signature Header] [Data] [Header Database]
14
15https://fastapi.metacpan.org/source/BJOERN/Compress-Deflate7-1.0/7zip/DOC/7zFormat.txt
16"7z uses little endian encoding."
17
18https://py7zr.readthedocs.io/en/latest/archive_format.html
19"""
20
import binascii
import glob
from pathlib import Path
from typing import Optional

from structlog import get_logger

from unblob.extractors import Command

from ...extractors.command import MultiFileCommand
from ...file_utils import Endian, InvalidInputFormat, StructParser
from ...models import (
    DirectoryHandler,
    File,
    Glob,
    HandlerDoc,
    HandlerType,
    HexString,
    MultiFile,
    Reference,
    StructHandler,
    ValidChunk,
)
43
# Module-level structured logger shared by the handlers below.
logger = get_logger()

# C declaration of the 7-zip signature header. It is handed both to the
# module-level HEADER_PARSER and to SevenZipHandler (as its C_DEFINITIONS).
C_DEFINITIONS = r"""
 typedef struct sevenzip_header {
 char magic[6];
 uint8 version_maj;
 uint8 version_min;
 uint32 crc;
 uint64 next_header_offset;
 uint64 next_header_size;
 uint32 next_header_crc;
 } sevenzip_header_t;
"""
# Name of the struct inside C_DEFINITIONS that describes the header.
HEADER_STRUCT = "sevenzip_header_t"
# Byte size of sevenzip_header_t, field by field:
# magic + version_maj + version_min + crc
# + next_header_offset + next_header_size + next_header_crc (= 32).
HEADER_SIZE = 6 + 1 + 1 + 4 + 8 + 8 + 4

# Stand-alone parser used where the header is read from raw bytes rather
# than through a StructHandler (see MultiVolumeSevenZipHandler).
HEADER_PARSER = StructParser(C_DEFINITIONS)

# StartHeader (next_header_offset, next_header_size, next_header_crc)
# i.e. the trailing 20 bytes of the header, which are what header.crc covers.
START_HEADER_SIZE = 8 + 8 + 4


# Signature bytes every 7-zip archive starts with: '7', 'z', 0xBC, 0xAF, 0x27, 0x1C.
SEVENZIP_MAGIC = b"7z\xbc\xaf\x27\x1c"
67
68
def check_header_crc(header):
    """Verify the CRC stored in a parsed 7-zip signature header.

    The ``crc`` field only covers the StartHeader, i.e. the trailing
    ``next_header_offset``, ``next_header_size`` and ``next_header_crc``
    fields (see CPP/7zip/Archive/7z/7zOut.cpp,
    COutArchive::WriteStartHeader).

    Args:
        header: parsed ``sevenzip_header_t`` structure.

    Raises:
        InvalidInputFormat: when the stored CRC does not match the CRC
            computed over the StartHeader bytes.
    """
    # The StartHeader is the tail of the serialized header.
    start_header = header.dumps()[-START_HEADER_SIZE:]
    if binascii.crc32(start_header) == header.crc:
        return
    raise InvalidInputFormat("Invalid sevenzip header CRC")
75
76
def calculate_sevenzip_size(header) -> int:
    """Return the total archive size announced by a 7-zip signature header.

    The size is the length of the signature header itself, plus the offset
    of the next header (relative to the end of the signature header), plus
    the size of that next header.

    Args:
        header: parsed ``sevenzip_header_t`` structure; must support
            ``len()`` and expose ``next_header_offset`` and
            ``next_header_size``.
    """
    signature_header_size = len(header)
    next_header_start = signature_header_size + header.next_header_offset
    return next_header_start + header.next_header_size
79
80
class SevenZipHandler(StructHandler):
    """Handler that carves single-file 7-zip archives out of a larger file.

    A candidate is located through the 7-zip signature; its extent is then
    derived from the signature header once the header CRC has been checked.
    """

    NAME = "sevenzip"

    # Search pattern: the six signature bytes of a 7-zip archive.
    PATTERNS = [
        HexString(
            """
 // '7', 'z', 0xBC, 0xAF, 0x27, 0x1C
 37 7A BC AF 27 1C
 """
        )
    ]
    # Struct description used by parse_header().
    C_DEFINITIONS = C_DEFINITIONS
    HEADER_STRUCT = HEADER_STRUCT
    # Extraction is delegated to the external ``7z`` command.
    EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}")

    DOC = HandlerDoc(
        name="7-Zip",
        description="The 7-Zip file format is a compressed archive format with high compression ratios, supporting multiple algorithms, CRC checks, and multi-volume archives.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="7-Zip Technical Documentation",
                url="https://fastapi.metacpan.org/source/BJOERN/Compress-Deflate7-1.0/7zip/DOC/7zFormat.txt",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Compute the chunk covered by the 7-zip archive at ``start_offset``.

        Args:
            file: file the signature header is parsed from.
            start_offset: offset at which the signature was matched.

        Returns:
            A chunk starting at ``start_offset`` and spanning the size
            announced by the signature header.

        Raises:
            InvalidInputFormat: when the header CRC check fails.
        """
        signature_header = self.parse_header(file)

        # Reject false-positive signature matches before trusting the sizes.
        check_header_crc(signature_header)

        end_offset = start_offset + calculate_sevenzip_size(signature_header)
        return ValidChunk(start_offset=start_offset, end_offset=end_offset)
118
119
class MultiVolumeSevenZipHandler(DirectoryHandler):
    """Directory handler that groups the volumes of a split 7-zip archive.

    The first volume (``*.7z.001``) is the entry point. Its signature header
    announces the total archive size, which must equal the combined size of
    all sibling volumes for the set to be accepted.
    """

    NAME = "multi-sevenzip"
    # Extraction is delegated to the external ``7z`` command.
    EXTRACTOR = MultiFileCommand("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}")

    # Only the first volume of a set triggers this handler.
    PATTERN = Glob("*.7z.001")

    DOC = HandlerDoc(
        name=NAME,
        description="The 7-Zip file format is a compressed archive format with high compression ratios, supporting multiple algorithms, CRC checks, and multi-volume archives.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="7-Zip Technical Documentation",
                url="https://fastapi.metacpan.org/source/BJOERN/Compress-Deflate7-1.0/7zip/DOC/7zFormat.txt",
            ),
        ],
        limitations=[],
    )

    def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
        """Collect the volumes that belong to the archive starting at ``file``.

        Args:
            file: path of the first volume (matches ``PATTERN``).

        Returns:
            A ``MultiFile`` listing every sibling named ``<stem>.*`` in sorted
            order, or ``None`` when there are no volumes, the first volume is
            too short to hold a signature header, the magic does not match,
            or the combined volume size differs from the size announced by
            the header.

        Raises:
            InvalidInputFormat: when the header CRC check fails.
        """
        # The stem comes from an arbitrary file name: escape it so that glob
        # metacharacters ("[", "]", "*", "?") in the name are matched
        # literally instead of being interpreted as a pattern.
        volume_pattern = f"{glob.escape(file.stem)}.*"
        # Entries whose resolved target does not exist (e.g. dangling
        # symlinks) are left out.
        paths = sorted(
            p for p in file.parent.glob(volume_pattern) if p.resolve().exists()
        )
        if not paths:
            return None

        with file.open("rb") as f:
            header_data = f.read(HEADER_SIZE)

        # A first volume shorter than the signature header cannot be a 7-zip
        # archive; treat it like a magic mismatch.
        if len(header_data) < HEADER_SIZE:
            return None

        header = HEADER_PARSER.parse(HEADER_STRUCT, header_data, Endian.LITTLE)
        if header.magic != SEVENZIP_MAGIC:
            return None

        check_header_crc(header)
        size = calculate_sevenzip_size(header)
        logger.debug("Sevenzip header", header=header, size=size, _verbosity=3)

        files_size = sum(path.stat().st_size for path in paths)
        logger.debug(
            "Multi-volume files", paths=paths, files_size=files_size, _verbosity=2
        )
        # All volumes together must add up exactly to the announced size.
        if files_size != size:
            return None

        return MultiFile(
            name=file.stem,
            paths=paths,
        )