Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/bzip2.py: 96%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

71 statements  

1from typing import Optional 

2 

3import attrs 

4from pyperscan import Flag, Pattern, Scan, StreamDatabase 

5from structlog import get_logger 

6 

7from unblob.extractors import Command 

8 

9from ...file_utils import InvalidInputFormat, SeekError, StructParser, stream_scan 

10from ...models import ( 

11 File, 

12 Handler, 

13 HandlerDoc, 

14 HandlerType, 

15 HexString, 

16 Reference, 

17 Regex, 

18 ValidChunk, 

19) 

20 

# Module-wide logger; used below to report scan failures at debug level.
logger = get_logger()

22 

# C-style layouts of the two bzip2 headers this module validates: the 4-byte
# stream header and the block header that follows it.
C_DEFINITIONS = r"""
    typedef struct stream_header {
        char magic[2];              // 'BZ' signature/magic number
        uint8 version;              // 'h' 0x68 for Bzip2 ('H'uffman coding), '0' for Bzip1 (deprecated)
        uint8 hundred_k_blocksize;  // '1'..'9' block-size 100 kB-900 kB (uncompressed)
    } stream_header_t;

    typedef struct block_header {
        char magic[6];      // 0x314159265359 (BCD (pi))
        uint32 crc;         // checksum for this block
        uint8 randomised;   // 0=>normal, 1=>randomised (deprecated)
    } block_header_t;
"""

36 

37 

# Expected stream header contents: the two-byte signature, the version byte,
# and the inclusive range allowed for the block-size digit.
STREAM_MAGIC = b"BZ"
HUFFMAN_VERSION = ord("h")
HUNDRED_K_BLOCK_MIN = ord("1")
HUNDRED_K_BLOCK_MAX = ord("9")

# Block header signature: the bytes 0x314159265359 (BCD (pi)) spelled as ASCII.
BLOCK_MAGIC = b"1AY&SY"

45 

# The stream is terminated by the magic 0x177245385090, which is not aligned
# to a byte boundary. All eight possible bit shifts are therefore listed,
# pre-calculated with:
#   for bit_shift in range(8):
#       print(hex(0x1772_4538_5090 << bit_shift))
# The first four entries wildcard the trailing nibble, the last four the
# leading nibble. The order matters: the match callback treats patterns
# with an id above 3 differently.
STREAM_END_MAGIC_PATTERNS = [
    HexString("17 72 45 38 50 90"),
    HexString("2e e4 8a 70 a1 2?"),
    HexString("5d c9 14 e1 42 4?"),
    HexString("bb 92 29 c2 84 8?"),
    HexString("?1 77 24 53 85 09"),
    HexString("?2 ee 48 a7 0a 12"),
    HexString("?5 dc 91 4e 14 24"),
    HexString("?b b9 22 9c 28 48"),
]

60 

# Footer length in bytes: the 6-byte end magic followed by the 4-byte
# combined CRC.
STREAM_FOOTER_SIZE = 6 + 4

63 

64 

def build_stream_end_scan_db(pattern_list):
    """Build a streaming scan database from a list of hex patterns.

    Every entry of ``pattern_list`` is converted with its ``as_regex()``
    method and registered with the ``SOM_LEFTMOST`` and ``DOTALL`` flags.
    Patterns are added in list order.
    """
    compiled_patterns = [
        Pattern(hex_pattern.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL)
        for hex_pattern in pattern_list
    ]
    return StreamDatabase(*compiled_patterns)

69 

70 

# Struct reader for the header layouts declared in C_DEFINITIONS.
parser = StructParser(C_DEFINITIONS)

# Scan database for the end-of-stream magic, built once at import time and
# shared by every chunk calculation.
hyperscan_stream_end_magic_db = build_stream_end_scan_db(STREAM_END_MAGIC_PATTERNS)

73 

74 

@attrs.define
class Bzip2SearchContext:
    """State handed to the end-of-stream match callback during a scan."""

    # Offset where the candidate chunk begins; earlier matches are ignored.
    start_offset: int
    # File being scanned; the callback seeks and reads in it.
    file: File
    # End offset of the most recently accepted stream footer. The handler in
    # this module initialises it to -1, meaning "nothing found yet".
    end_block_offset: int

80 

81 

def _validate_stream_header(file: File):
    """Parse a stream header from ``file`` and report whether it is valid.

    Returns ``False`` when the file ends before a full header can be read.
    Otherwise the header is valid when the magic, the version byte and the
    block-size digit all have their expected values.
    """
    try:
        stream_header = parser.cparser_be.stream_header_t(file)
    except EOFError:
        return False
    else:
        magic_ok = stream_header.magic == STREAM_MAGIC
        version_ok = stream_header.version == HUFFMAN_VERSION
        block_size_ok = (
            HUNDRED_K_BLOCK_MIN
            <= stream_header.hundred_k_blocksize
            <= HUNDRED_K_BLOCK_MAX
        )
        return magic_ok and version_ok and block_size_ok

93 

94 

def _validate_block_header(file: File):
    """Parse a block header from ``file`` and check its magic.

    Returns ``False`` when the file ends before a full header can be read;
    otherwise returns whether the header magic equals ``BLOCK_MAGIC``.
    """
    try:
        block_header = parser.cparser_be.block_header_t(file)
    except EOFError:
        return False
    else:
        return block_header.magic == BLOCK_MAGIC

102 

103 

def _hyperscan_match(
    context: Bzip2SearchContext, pattern_id: int, offset: int, end: int
) -> Scan:
    """Handle one end-of-stream magic match reported by the scanner.

    Records the end of the stream footer in ``context.end_block_offset`` and
    decides whether scanning should go on.

    Returns ``Scan.Continue`` when the match lies before the chunk start, or
    when another valid stream and block header follow the footer. Returns
    ``Scan.Terminate`` when the footer end cannot be seeked to, or when no
    further stream follows.
    """
    del end  # unused argument

    # Matches located before this chunk belong to something else; skip them.
    if offset < context.start_offset:
        return Scan.Continue

    # Patterns with an id above 3 need one extra byte. NOTE(review): this
    # presumably corresponds to the patterns whose leading nibble is a
    # wildcard, where the footer spills into one more byte - confirm that
    # ids follow the pattern list order.
    extra_byte = 1 if pattern_id > 3 else 0
    stream_end = offset + STREAM_FOOTER_SIZE + extra_byte

    # Position the file right after the footer; give up if that is not possible.
    try:
        context.file.seek(stream_end)
    except SeekError:
        return Scan.Terminate

    context.end_block_offset = stream_end

    # If another stream starts right after this one, keep going so that it
    # is processed as part of the same chunk.
    next_stream_follows = _validate_stream_header(
        context.file
    ) and _validate_block_header(context.file)
    return Scan.Continue if next_stream_follows else Scan.Terminate

130 

131 

class BZip2Handler(Handler):
    """Handler that finds the extent of bzip2 data and extracts it with 7z."""

    NAME = "bzip2"

    # Stream magic, version and block-size digit, immediately followed by
    # the magic of the first block header.
    PATTERNS = [Regex(r"\x42\x5a\x68[\x31-\x39]\x31\x41\x59\x26\x53\x59")]

    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="bzip2.uncompressed")

    DOC = HandlerDoc(
        name=NAME,
        description="The bzip2 format is a block-based compression format that uses the Burrows-Wheeler transform and Huffman coding for high compression efficiency. Each stream starts with a header and consists of one or more compressed blocks, ending with a footer containing a checksum.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="bzip2 File Format Documentation",
                url="https://sourceware.org/bzip2/manual/manual.html",
            ),
            Reference(
                title="bzip2 Technical Specification",
                url="https://en.wikipedia.org/wiki/Bzip2",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Determine where the bzip2 data beginning at ``start_offset`` ends.

        The stream and block headers are read from the file's current
        position (presumably the caller has positioned it at
        ``start_offset``). The file is then scanned for end-of-stream
        magics, following directly concatenated streams.

        Raises:
            InvalidInputFormat: if either header is invalid, or if no stream
                end was found.
        """
        if not _validate_stream_header(file):
            raise InvalidInputFormat("Invalid bzip2 stream header")

        if not _validate_block_header(file):
            raise InvalidInputFormat("Invalid bzip2 block header")

        search = Bzip2SearchContext(
            start_offset=start_offset, file=file, end_block_offset=-1
        )

        # Best effort: a failing scan is only logged, and whatever end offset
        # was recorded before the failure is still used below.
        try:
            scanner = hyperscan_stream_end_magic_db.build(search, _hyperscan_match)  # type: ignore
            stream_scan(scanner, file)
        except Exception as e:
            logger.debug(
                "Error scanning for bzip2 patterns",
                error=e,
            )

        if search.end_block_offset <= 0:
            raise InvalidInputFormat("No valid bzip2 compression block was detected")

        return ValidChunk(start_offset=start_offset, end_offset=search.end_block_offset)