1import io
2
3from structlog import get_logger
4
5from unblob.extractors import Command
6
7from ...file_utils import Endian, InvalidInputFormat, convert_int8
8from ...models import (
9 File,
10 Handler,
11 HandlerDoc,
12 HandlerType,
13 HexString,
14 Reference,
15 ValidChunk,
16)
17
# Module-level structured logger.
logger = get_logger()

# Number of bytes skipped at the start of a frame for the magic number
# (the handler's match pattern "28 B5 2F FD" is four bytes long).
MAGIC_LEN = 4

# Every block is introduced by a header of this many bytes.
BLOCK_HEADER_LEN = 3

# Values of the 2-bit block type field extracted from a block header.
RAW_BLOCK = 0
RLE_BLOCK = 1
COMPRESSED_BLOCK = 2

# Field sizes in bytes, indexed by the corresponding 2-bit flag taken from
# the frame header descriptor byte.
DICT_ID_FIELDSIZE_MAP = [0, 1, 2, 4]
FRAME_CONTENT_FIELDSIZE_MAP = [0, 2, 4, 8]
27
28
class ZSTDHandler(Handler):
    """Handler that measures a Zstandard frame by walking its headers.

    The frame is not decompressed: the frame header and each block header
    are parsed only to learn how many bytes to skip, until the block flagged
    as last has been passed.
    """

    NAME = "zstd"

    PATTERNS = [HexString("28 B5 2F FD")]

    EXTRACTOR = Command("zstd", "-d", "{inpath}", "-o", "{outdir}/zstd.uncompressed")

    DOC = HandlerDoc(
        name="ZSTD",
        description="Zstandard (ZSTD) is a fast lossless compression algorithm with high compression ratios, designed for modern data storage and transfer. Its file format includes a frame structure with optional dictionary support and checksums for data integrity.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="Zstandard File Format Specification",
                url="https://facebook.github.io/zstd/zstd_manual.html",
            ),
            Reference(
                title="Zstandard Wikipedia",
                url="https://en.wikipedia.org/wiki/Zstandard",
            ),
        ],
        limitations=[],
    )

    def get_frame_header_size(self, frame_header_descriptor: int) -> int:
        """Return how many header bytes follow the frame header descriptor.

        Args:
            frame_header_descriptor: The descriptor byte as an integer.

        Returns:
            The summed size in bytes of the optional header fields selected
            by the descriptor's flags.
        """
        single_segment = (frame_header_descriptor >> 5) & 0b1
        dictionary_id = frame_header_descriptor & 0b11
        frame_content_size = (frame_header_descriptor >> 6) & 0b11

        return (
            # One extra byte is present only when the single-segment flag is
            # clear (the Window_Descriptor in the Zstandard format).
            int(not single_segment)
            + DICT_ID_FIELDSIZE_MAP[dictionary_id]
            + FRAME_CONTENT_FIELDSIZE_MAP[frame_content_size]
            # With the single-segment flag set, a content-size flag of 0
            # still means a one-byte content-size field.
            + int(bool(single_segment) and not frame_content_size)
        )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Compute the extent of the frame that starts at ``start_offset``.

        Args:
            file: The input being scanned; it is repositioned while parsing.
            start_offset: Offset of the frame's magic number.

        Returns:
            A ``ValidChunk`` from ``start_offset`` to the position just past
            the last block and the optional content checksum.

        Raises:
            InvalidInputFormat: If the unused or reserved descriptor bit is
                set, a block header is missing or incomplete, or a block has
                an unknown type.
        """
        file.seek(start_offset, io.SEEK_SET)
        file.seek(MAGIC_LEN, io.SEEK_CUR)

        frame_header_descriptor = convert_int8(file.read(1), Endian.LITTLE)
        frame_header_size = self.get_frame_header_size(frame_header_descriptor)

        content_checksum_flag = (frame_header_descriptor >> 2) & 1
        content_checksum_size = 4 if content_checksum_flag else 0

        unused_bit = (frame_header_descriptor >> 4) & 1
        reserved_bit = (frame_header_descriptor >> 3) & 1

        # A frame is rejected when either of these descriptor bits is set.
        if unused_bit != 0 or reserved_bit != 0:
            raise InvalidInputFormat("Invalid frame header format.")

        file.seek(frame_header_size, io.SEEK_CUR)

        last_block = False
        while not last_block:
            block_header_val = file.read(BLOCK_HEADER_LEN)
            # A read near the end of the data may return fewer bytes than
            # requested. Require a complete header so a truncated 1- or
            # 2-byte remainder is never interpreted as a block header.
            if len(block_header_val) < BLOCK_HEADER_LEN:
                raise InvalidInputFormat("Premature end of ZSTD stream.")

            block_header = int.from_bytes(block_header_val, byteorder="little")
            last_block = block_header & 0b1
            block_type = (block_header >> 1) & 0b11

            if block_type in (RAW_BLOCK, COMPRESSED_BLOCK):
                # The remaining header bits give the number of bytes to skip.
                block_size = block_header >> 3
            elif block_type == RLE_BLOCK:
                # An RLE block occupies a single byte after its header.
                block_size = 1
            else:
                raise InvalidInputFormat("Invalid block type")
            file.seek(block_size, io.SEEK_CUR)

        file.seek(content_checksum_size, io.SEEK_CUR)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )