Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/zstd.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

51 statements  

1import io 

2 

3from structlog import get_logger 

4 

5from unblob.extractors import Command 

6 

7from ...file_utils import Endian, InvalidInputFormat, convert_int8 

8from ...models import ( 

9 File, 

10 Handler, 

11 HandlerDoc, 

12 HandlerType, 

13 HexString, 

14 Reference, 

15 ValidChunk, 

16) 

17 

18logger = get_logger() 

19 

# Number of bytes skipped at the start of a frame to step over the magic
# number matched by the handler pattern.
MAGIC_LEN = 4

# Number of bytes read for each block header.
BLOCK_HEADER_LEN = 3

# Block type codes compared against the 2-bit type field of a block header.
RAW_BLOCK, RLE_BLOCK, COMPRESSED_BLOCK = range(3)

# Byte width of the dictionary ID field, indexed by the 2-bit flag taken
# from the frame header descriptor.
DICT_ID_FIELDSIZE_MAP = [
    0,  # flag 0
    1,  # flag 1
    2,  # flag 2
    4,  # flag 3
]

# Byte width of the frame content size field, indexed by the 2-bit flag
# taken from the frame header descriptor. Flag 0 is special-cased by the
# handler when the single-segment bit is set.
FRAME_CONTENT_FIELDSIZE_MAP = [
    0,  # flag 0
    2,  # flag 1
    4,  # flag 2
    8,  # flag 3
]

27 

28 

class ZSTDHandler(Handler):
    """Handler that measures a ZSTD frame by walking its block headers.

    Starting at a magic-number match, the frame header descriptor is decoded
    to find the header length, then each block header is read to skip over
    the block content until the block flagged as last has been consumed.
    """

    NAME = "zstd"

    PATTERNS = [HexString("28 B5 2F FD")]

    EXTRACTOR = Command("zstd", "-d", "{inpath}", "-o", "{outdir}/zstd.uncompressed")

    DOC = HandlerDoc(
        name="ZSTD",
        description="Zstandard (ZSTD) is a fast lossless compression algorithm with high compression ratios, designed for modern data storage and transfer. Its file format includes a frame structure with optional dictionary support and checksums for data integrity.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="Zstandard File Format Specification",
                url="https://facebook.github.io/zstd/zstd_manual.html",
            ),
            Reference(
                title="Zstandard Wikipedia",
                url="https://en.wikipedia.org/wiki/Zstandard",
            ),
        ],
        limitations=[],
    )

    def get_frame_header_size(self, frame_header_descriptor: int) -> int:
        """Return the number of frame header bytes following the descriptor.

        The descriptor byte is decoded as:

        * bits 0-1: dictionary ID flag (index into ``DICT_ID_FIELDSIZE_MAP``)
        * bit 5: single-segment flag
        * bits 6-7: frame content size flag (index into
          ``FRAME_CONTENT_FIELDSIZE_MAP``)

        Args:
            frame_header_descriptor: The descriptor byte as an integer.

        Returns:
            The combined size in bytes of the window descriptor, dictionary
            ID and frame content size fields.
        """
        single_segment = (frame_header_descriptor >> 5) & 0b1
        dictionary_id_flag = frame_header_descriptor & 0b11
        frame_content_size_flag = (frame_header_descriptor >> 6) & 0b11

        # One extra byte is counted only when the single-segment bit is clear.
        window_descriptor_size = 0 if single_segment else 1

        frame_content_field_size = FRAME_CONTENT_FIELDSIZE_MAP[frame_content_size_flag]
        # With the single-segment bit set, flag 0 still counts for one byte.
        if single_segment and frame_content_size_flag == 0:
            frame_content_field_size = 1

        return (
            window_descriptor_size
            + DICT_ID_FIELDSIZE_MAP[dictionary_id_flag]
            + frame_content_field_size
        )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Compute the extent of the ZSTD frame beginning at ``start_offset``.

        Args:
            file: The file being scanned; it is read and seeked in place.
            start_offset: Offset of the magic number match.

        Returns:
            A ``ValidChunk`` from ``start_offset`` to the position reached
            after the last block and the optional 4-byte content checksum.

        Raises:
            InvalidInputFormat: If bit 3 or bit 4 of the descriptor is set,
                if a block header cannot be read in full, or if a block has
                type 3.
        """
        # Step over the magic number to reach the frame header descriptor.
        file.seek(start_offset + MAGIC_LEN, io.SEEK_SET)

        # NOTE(review): behavior on a short read here depends on convert_int8,
        # which is defined outside this file.
        frame_header_descriptor = convert_int8(file.read(1), Endian.LITTLE)

        reserved_bit = (frame_header_descriptor >> 3) & 0b1
        unused_bit = (frame_header_descriptor >> 4) & 0b1
        # these values MUST be zero per the standard
        if unused_bit != 0 or reserved_bit != 0:
            raise InvalidInputFormat("Invalid frame header format.")

        # Bit 2 signals a 4-byte checksum after the last block.
        content_checksum_flag = (frame_header_descriptor >> 2) & 0b1
        content_checksum_size = 4 if content_checksum_flag else 0

        frame_header_size = self.get_frame_header_size(frame_header_descriptor)
        file.seek(frame_header_size, io.SEEK_CUR)

        last_block = False
        while not last_block:
            block_header_val = file.read(BLOCK_HEADER_LEN)
            # A short read (including an empty one at EOF) means the stream
            # ended in the middle of a frame. Decoding a partial header would
            # yield a bogus block type and size.
            if len(block_header_val) < BLOCK_HEADER_LEN:
                raise InvalidInputFormat("Premature end of ZSTD stream.")

            # Header layout (little-endian): bit 0 = last-block flag,
            # bits 1-2 = block type, remaining bits = block size.
            block_header = int.from_bytes(block_header_val, byteorder="little")
            last_block = bool(block_header & 0b1)
            block_type = (block_header >> 1) & 0b11

            if block_type in (RAW_BLOCK, COMPRESSED_BLOCK):
                block_size = block_header >> 3
            elif block_type == RLE_BLOCK:
                # Only a single byte is skipped for an RLE block, whatever
                # the size field says.
                block_size = 1
            else:
                raise InvalidInputFormat("Invalid block type")

            # NOTE(review): the skip is not checked against the file size;
            # what happens past EOF depends on File.seek - confirm.
            file.seek(block_size, io.SEEK_CUR)

        file.seek(content_checksum_size, io.SEEK_CUR)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )