1import io
2from typing import Optional
3
4from structlog import get_logger
5
6from unblob.extractors import Command
7
8from ...file_utils import Endian, InvalidInputFormat, convert_int8
9from ...models import (
10 File,
11 Handler,
12 HandlerDoc,
13 HandlerType,
14 HexString,
15 Reference,
16 ValidChunk,
17)
18
# Module-level structlog logger.
logger = get_logger()

# Length in bytes of the frame magic matched by ZSTDHandler.PATTERNS; it is
# skipped before the frame header descriptor is read.
MAGIC_LEN = 4
# Number of bytes read for every block header.
BLOCK_HEADER_LEN = 3
# Values of the 2-bit block type field (bits 1-2 of a block header).
# Any other value is rejected as an invalid block type.
RAW_BLOCK = 0
RLE_BLOCK = 1
COMPRESSED_BLOCK = 2
# Size in bytes of the dictionary ID field, indexed by the 2-bit flag stored
# in bits 0-1 of the frame header descriptor.
DICT_ID_FIELDSIZE_MAP = [0, 1, 2, 4]
# Size in bytes of the frame content size field, indexed by the 2-bit flag
# stored in bits 6-7 of the frame header descriptor. Index 0 gets one extra
# byte when the single-segment bit is set (see get_frame_header_size).
FRAME_CONTENT_FIELDSIZE_MAP = [0, 2, 4, 8]
28
29
class ZSTDHandler(Handler):
    """Locate Zstandard frames and compute where each one ends.

    A frame is recognised by its 4-byte magic. Its end is found by skipping
    the frame header, then walking the block headers until the block flagged
    as last, and finally skipping the optional content checksum.
    """

    NAME = "zstd"

    PATTERNS = [HexString("28 B5 2F FD")]

    EXTRACTOR = Command("zstd", "-d", "{inpath}", "-o", "{outdir}/zstd.uncompressed")

    DOC = HandlerDoc(
        name="ZSTD",
        description="Zstandard (ZSTD) is a fast lossless compression algorithm with high compression ratios, designed for modern data storage and transfer. Its file format includes a frame structure with optional dictionary support and checksums for data integrity.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="Zstandard File Format Specification",
                url="https://facebook.github.io/zstd/zstd_manual.html",
            ),
            Reference(
                title="Zstandard Wikipedia",
                url="https://en.wikipedia.org/wiki/Zstandard",
            ),
        ],
        limitations=[],
    )

    def get_frame_header_size(self, frame_header_descriptor: int) -> int:
        """Return how many header bytes follow the frame header descriptor.

        Args:
            frame_header_descriptor: The descriptor byte as an integer.

        Returns:
            The combined size in bytes of the optional fields selected by
            the descriptor's flags.
        """
        single_segment = frame_header_descriptor >> 5 & 0b1
        dictionary_id = frame_header_descriptor & 0b11
        frame_content_size = frame_header_descriptor >> 6 & 0b11
        return (
            # One extra byte is present only when the single-segment bit is clear.
            int(not single_segment)
            + DICT_ID_FIELDSIZE_MAP[dictionary_id]
            + FRAME_CONTENT_FIELDSIZE_MAP[frame_content_size]
            # With the single-segment bit set, a content-size flag of 0
            # still occupies one byte.
            + int(bool(single_segment) and not frame_content_size)
        )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Walk one frame starting at ``start_offset`` and return its extent.

        Args:
            file: File positioned anywhere; it is re-seeked to the frame.
            start_offset: Offset of the frame magic inside ``file``.

        Returns:
            A ``ValidChunk`` from ``start_offset`` to the position right
            after the last block and the optional 4-byte content checksum.

        Raises:
            InvalidInputFormat: If the unused or reserved descriptor bit is
                set, a block has an unknown type, or the stream ends before
                a complete block header could be read.
        """
        # Skip the magic; the descriptor byte comes right after it.
        file.seek(start_offset + MAGIC_LEN, io.SEEK_SET)

        frame_header_descriptor = convert_int8(file.read(1), Endian.LITTLE)

        unused_bit = frame_header_descriptor >> 4 & 1
        reserved_bit = frame_header_descriptor >> 3 & 1

        # these values MUST be zero per the standard
        if unused_bit or reserved_bit:
            raise InvalidInputFormat("Invalid frame header format.")

        # Bit 2 signals a 4-byte checksum trailing the last block.
        content_checksum_size = 4 if frame_header_descriptor >> 2 & 1 else 0

        file.seek(self.get_frame_header_size(frame_header_descriptor), io.SEEK_CUR)

        last_block = False
        while not last_block:
            block_header_val = file.read(BLOCK_HEADER_LEN)
            # A short read (including 1 or 2 bytes) means the stream was cut
            # mid-header; decoding those bytes would yield a bogus block.
            if len(block_header_val) < BLOCK_HEADER_LEN:
                raise InvalidInputFormat("Premature end of ZSTD stream.")
            block_header = int.from_bytes(block_header_val, byteorder="little")
            last_block = bool(block_header & 0b1)
            block_type = block_header >> 1 & 0b11

            if block_type in (RAW_BLOCK, COMPRESSED_BLOCK):
                # The remaining 21 bits hold the number of bytes to skip.
                block_size = block_header >> 3
            elif block_type == RLE_BLOCK:
                # Only a single byte is skipped for this block type.
                block_size = 1
            else:
                raise InvalidInputFormat("Invalid block type")
            file.seek(block_size, io.SEEK_CUR)

        file.seek(content_checksum_size, io.SEEK_CUR)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )