Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/zstd.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

52 statements  

1import io 

2from typing import Optional 

3 

4from structlog import get_logger 

5 

6from unblob.extractors import Command 

7 

8from ...file_utils import Endian, InvalidInputFormat, convert_int8 

9from ...models import ( 

10 File, 

11 Handler, 

12 HandlerDoc, 

13 HandlerType, 

14 HexString, 

15 Reference, 

16 ValidChunk, 

17) 

18 

# Module-level logger created through structlog's get_logger().
logger = get_logger()

20 

# Byte lengths of the fixed-size pieces the handler skips or reads:
# the magic number at the start of a frame and each block header.
MAGIC_LEN = 4
BLOCK_HEADER_LEN = 3

# Values of the 2-bit block type field taken from a block header.
RAW_BLOCK, RLE_BLOCK, COMPRESSED_BLOCK = range(3)

# Field sizes in bytes, indexed by the matching 2-bit flag of the frame
# header descriptor. Index 0 of the frame content size table is 0; the
# handler adds one byte separately when the single-segment bit is set.
DICT_ID_FIELDSIZE_MAP = [0, 1, 2, 4]
FRAME_CONTENT_FIELDSIZE_MAP = [0, 2, 4, 8]

28 

29 

class ZSTDHandler(Handler):
    """Locate Zstandard frames by magic number and compute where they end.

    The frame is walked structurally (frame header, then one block header
    after another) using only the sizes encoded in those headers; nothing is
    decompressed while calculating the chunk.
    """

    NAME = "zstd"

    # Magic number bytes that mark a candidate frame start.
    PATTERNS = [HexString("28 B5 2F FD")]

    EXTRACTOR = Command("zstd", "-d", "{inpath}", "-o", "{outdir}/zstd.uncompressed")

    DOC = HandlerDoc(
        name="ZSTD",
        description="Zstandard (ZSTD) is a fast lossless compression algorithm with high compression ratios, designed for modern data storage and transfer. Its file format includes a frame structure with optional dictionary support and checksums for data integrity.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="Zstandard File Format Specification",
                url="https://facebook.github.io/zstd/zstd_manual.html",
            ),
            Reference(
                title="Zstandard Wikipedia",
                url="https://en.wikipedia.org/wiki/Zstandard",
            ),
        ],
        limitations=[],
    )

    def get_frame_header_size(self, frame_header_descriptor: int) -> int:
        """Return how many frame-header bytes follow the descriptor byte.

        Args:
            frame_header_descriptor: The frame header descriptor byte as an
                integer.

        Returns:
            The combined size in bytes of the optional window descriptor,
            dictionary ID and frame content size fields.
        """
        single_segment = (frame_header_descriptor >> 5) & 0b1
        dictionary_id_flag = frame_header_descriptor & 0b11
        content_size_flag = (frame_header_descriptor >> 6) & 0b11

        # The one-byte window descriptor is present only when the
        # single-segment bit is clear.
        window_descriptor_size = 0 if single_segment else 1

        content_size_field = FRAME_CONTENT_FIELDSIZE_MAP[content_size_flag]
        # Flag value 0 maps to "no field" in the table, but together with the
        # single-segment bit it stands for a one-byte field.
        if single_segment and content_size_flag == 0:
            content_size_field = 1

        return (
            window_descriptor_size
            + DICT_ID_FIELDSIZE_MAP[dictionary_id_flag]
            + content_size_field
        )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Walk one frame starting at ``start_offset`` and return its extent.

        Args:
            file: Input to parse; it is repositioned by this method.
            start_offset: Offset of the frame's magic number within ``file``.

        Returns:
            A ``ValidChunk`` spanning from ``start_offset`` to the position
            right after the last block and, when the descriptor announces
            one, the 4-byte content checksum.

        Raises:
            InvalidInputFormat: If the unused or reserved descriptor bit is
                set, a block header is missing or truncated, or a block has
                an unknown block type.
        """
        file.seek(start_offset, io.SEEK_SET)
        # The descriptor byte sits immediately after the magic number.
        file.seek(MAGIC_LEN, io.SEEK_CUR)

        frame_header_descriptor = convert_int8(file.read(1), Endian.LITTLE)
        frame_header_size = self.get_frame_header_size(frame_header_descriptor)

        content_checksum_flag = (frame_header_descriptor >> 2) & 1
        content_checksum_size = 4 if content_checksum_flag else 0

        unused_bit = (frame_header_descriptor >> 4) & 1
        reserved_bit = (frame_header_descriptor >> 3) & 1

        # these values MUST be zero per the standard
        if unused_bit or reserved_bit:
            raise InvalidInputFormat("Invalid frame header format.")

        file.seek(frame_header_size, io.SEEK_CUR)

        # NOTE(review): the seeks below are not checked against the size of
        # `file`; what happens when a block claims to extend past the end
        # depends on the File implementation -- confirm.
        last_block = False
        while not last_block:
            block_header_val = file.read(BLOCK_HEADER_LEN)
            # A short read (empty or partial) means the input ended inside
            # the frame. Decoding a partial header would produce a bogus
            # block type/size and could report a truncated stream as valid.
            if len(block_header_val) < BLOCK_HEADER_LEN:
                raise InvalidInputFormat("Premature end of ZSTD stream.")

            block_header = int.from_bytes(block_header_val, byteorder="little")
            # Header layout: bit 0 = last-block flag, bits 1-2 = block type,
            # remaining bits = block size.
            last_block = bool(block_header & 0b1)
            block_type = (block_header >> 1) & 0b11

            if block_type in (RAW_BLOCK, COMPRESSED_BLOCK):
                block_size = block_header >> 3
            elif block_type == RLE_BLOCK:
                # An RLE block stores a single byte regardless of the size
                # field.
                block_size = 1
            else:
                raise InvalidInputFormat("Invalid block type")
            file.seek(block_size, io.SEEK_CUR)

        file.seek(content_checksum_size, io.SEEK_CUR)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )