Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/bzip2.py: 96%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

70 statements  

1import attrs 

2from pyperscan import Flag, Pattern, Scan, StreamDatabase 

3from structlog import get_logger 

4 

5from unblob.extractors import Command 

6 

7from ...file_utils import InvalidInputFormat, SeekError, StructParser, stream_scan 

8from ...models import ( 

9 File, 

10 Handler, 

11 HandlerDoc, 

12 HandlerType, 

13 HexString, 

14 Reference, 

15 Regex, 

16 ValidChunk, 

17) 

18 

19logger = get_logger() 

20 

# C layouts of the two fixed-size headers that open a bzip2 stream.
# They are handed to StructParser and read through its big-endian parser.
C_DEFINITIONS = r"""
    typedef struct stream_header {
        char magic[2];              // 'BZ' signature/magic number
        uint8 version;              // 'h' 0x68 for Bzip2 ('H'uffman coding), '0' for Bzip1 (deprecated)
        uint8 hundred_k_blocksize;  // '1'..'9' block-size 100 kB-900 kB (uncompressed)
    } stream_header_t;

    typedef struct block_header {
        char magic[6];              // 0x314159265359 (BCD (pi))
        uint32 crc;                 // checksum for this block
        uint8 randomised;           // 0=>normal, 1=>randomised (deprecated)
    } block_header_t;
"""


# Expected field values of a valid stream header.
STREAM_MAGIC = b"BZ"
HUFFMAN_VERSION = ord("h")
HUNDRED_K_BLOCK_MIN = ord("1")
HUNDRED_K_BLOCK_MAX = ord("9")

# Block header magic: the BCD digits of pi, which read b"1AY&SY" as ASCII.
BLOCK_MAGIC = bytes.fromhex("314159265359")

43 

# A stream is closed by the 48-bit magic 0x177245385090, but that magic is not
# aligned to a byte boundary. All 8 possible bit shifts are therefore listed
# explicitly; they were generated with:
#   for bit_shift in range(8):
#       print(hex(0x1772_4538_5090 << bit_shift))
# A `?` is a wildcard nibble: that half-byte is shared with neighbouring bits
# of the stream, so its value cannot be pinned down.
# NOTE: the order matters, _hyperscan_match treats pattern ids above 3
# (the entries starting with a wildcard) differently.
STREAM_END_MAGIC_PATTERNS = [
    HexString(shifted_magic)
    for shifted_magic in (
        "17 72 45 38 50 90",  # shift 0 (byte aligned)
        "2e e4 8a 70 a1 2?",  # shift 1
        "5d c9 14 e1 42 4?",  # shift 2
        "bb 92 29 c2 84 8?",  # shift 3
        "?1 77 24 53 85 09",  # shift 4
        "?2 ee 48 a7 0a 12",  # shift 5
        "?5 dc 91 4e 14 24",  # shift 6
        "?b b9 22 9c 28 48",  # shift 7
    )
]

# Footer layout: 6 bytes of end-of-stream magic + 4 bytes of combined CRC
STREAM_FOOTER_SIZE = 6 + 4

61 

62 

def build_stream_end_scan_db(pattern_list):
    """Compile *pattern_list* into a pyperscan ``StreamDatabase``.

    Every entry is converted with its ``as_regex()`` method and wrapped in a
    ``Pattern`` carrying the ``SOM_LEFTMOST`` and ``DOTALL`` flags. The
    patterns are passed to the database in the order they appear in
    *pattern_list*.
    """
    compiled_patterns = [
        Pattern(hex_pattern.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL)
        for hex_pattern in pattern_list
    ]
    return StreamDatabase(*compiled_patterns)


# Module-level singletons, built once at import time and shared by every scan.
hyperscan_stream_end_magic_db = build_stream_end_scan_db(STREAM_END_MAGIC_PATTERNS)
parser = StructParser(C_DEFINITIONS)

71 

72 

@attrs.define
class Bzip2SearchContext:
    """Mutable state handed to ``_hyperscan_match`` during a stream scan."""

    # Offset at which the candidate chunk starts; matches reported before
    # this offset are skipped by the callback.
    start_offset: int
    # File being scanned; the callback seeks in it to probe for a following
    # stream.
    file: File
    # End offset of the last accepted stream footer. calculate_chunk
    # initialises it to -1, meaning "nothing found yet".
    end_block_offset: int

78 

79 

def _validate_stream_header(file: File) -> bool:
    """Parse a stream header at the current position of *file* and check it.

    Returns ``True`` only when the magic is ``BZ``, the version byte is ``h``
    and the block-size byte is an ASCII digit between ``1`` and ``9``.
    Returns ``False`` when the header cannot be parsed because the file ends
    early (``EOFError``).
    """
    try:
        stream_header = parser.cparser_be.stream_header_t(file)
    except EOFError:
        return False

    if stream_header.magic != STREAM_MAGIC:
        return False
    if stream_header.version != HUFFMAN_VERSION:
        return False

    block_size_digit = stream_header.hundred_k_blocksize
    return HUNDRED_K_BLOCK_MIN <= block_size_digit <= HUNDRED_K_BLOCK_MAX

91 

92 

def _validate_block_header(file: File) -> bool:
    """Parse a block header at the current position of *file* and check it.

    Returns ``True`` when the parsed magic equals ``BLOCK_MAGIC``, and
    ``False`` when it does not or when the file ends before a full header
    could be parsed (``EOFError``).
    """
    try:
        block_header = parser.cparser_be.block_header_t(file)
    except EOFError:
        return False
    else:
        return block_header.magic == BLOCK_MAGIC

100 

101 

def _hyperscan_match(
    context: Bzip2SearchContext, pattern_id: int, offset: int, end: int
) -> Scan:
    """Scan callback invoked for every end-of-stream magic candidate.

    Records the end of the stream in ``context.end_block_offset`` and decides
    whether scanning goes on:

    * matches located before ``context.start_offset`` are skipped
      (``Scan.Continue``);
    * if the computed end of the stream cannot be seeked to, the scan stops
      (``Scan.Terminate``) without recording anything;
    * otherwise the end offset is recorded, and the scan continues only when
      a valid stream header and block header follow immediately, i.e. when
      another stream appears to be concatenated to this one.

    ``offset`` is used as the position where the matched pattern starts;
    ``end`` is ignored.
    """
    del end  # unused argument

    # Matches that precede the chunk being evaluated are not ours.
    if offset < context.start_offset:
        return Scan.Continue

    # Pattern ids 4..7 are the entries of STREAM_END_MAGIC_PATTERNS that
    # start with a wildcard nibble: the magic begins part-way into the first
    # matched byte, so magic + CRC run over into one additional byte.
    extra_byte = 1 if pattern_id > 3 else 0
    stream_end = offset + STREAM_FOOTER_SIZE + extra_byte

    # Position the file right after the footer; failure means the footer
    # would not fit, so this cannot be a real end of stream.
    try:
        context.file.seek(stream_end)
    except SeekError:
        return Scan.Terminate

    context.end_block_offset = stream_end

    # Keep scanning only if another stream starts right after this one.
    next_stream_follows = _validate_stream_header(
        context.file
    ) and _validate_block_header(context.file)
    return Scan.Continue if next_stream_follows else Scan.Terminate

128 

129 

130class BZip2Handler(Handler): 

131 NAME = "bzip2" 

132 

133 # magic + version + block_size + block header magic 

134 PATTERNS = [Regex(r"\x42\x5a\x68[\x31-\x39]\x31\x41\x59\x26\x53\x59")] 

135 

136 EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="bzip2.uncompressed") 

137 

138 DOC = HandlerDoc( 

139 name=NAME, 

140 description="The bzip2 format is a block-based compression format that uses the Burrows-Wheeler transform and Huffman coding for high compression efficiency. Each stream starts with a header and consists of one or more compressed blocks, ending with a footer containing a checksum.", 

141 handler_type=HandlerType.COMPRESSION, 

142 vendor=None, 

143 references=[ 

144 Reference( 

145 title="bzip2 File Format Documentation", 

146 url="https://sourceware.org/bzip2/manual/manual.html", 

147 ), 

148 Reference( 

149 title="bzip2 Technical Specification", 

150 url="https://en.wikipedia.org/wiki/Bzip2", 

151 ), 

152 ], 

153 limitations=[], 

154 ) 

155 

156 def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None: 

157 if not _validate_stream_header(file): 

158 raise InvalidInputFormat("Invalid bzip2 stream header") 

159 

160 if not _validate_block_header(file): 

161 raise InvalidInputFormat("Invalid bzip2 block header") 

162 

163 context = Bzip2SearchContext( 

164 start_offset=start_offset, file=file, end_block_offset=-1 

165 ) 

166 

167 try: 

168 scanner = hyperscan_stream_end_magic_db.build(context, _hyperscan_match) # type: ignore 

169 stream_scan(scanner, file) 

170 except Exception as e: 

171 logger.debug( 

172 "Error scanning for bzip2 patterns", 

173 error=e, 

174 ) 

175 

176 if context.end_block_offset > 0: 

177 return ValidChunk( 

178 start_offset=start_offset, end_offset=context.end_block_offset 

179 ) 

180 

181 raise InvalidInputFormat("No valid bzip2 compression block was detected")