Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/lzma.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

46 statements  

1import io 

2import lzma 

3 

4from structlog import get_logger 

5 

6from unblob.extractors import Command 

7 

8from ...file_utils import ( 

9 DEFAULT_BUFSIZE, 

10 Endian, 

11 InvalidInputFormat, 

12 convert_int32, 

13 convert_int64, 

14) 

15from ...models import ( 

16 File, 

17 Handler, 

18 HandlerDoc, 

19 HandlerType, 

20 HexString, 

21 Reference, 

22 ValidChunk, 

23) 

24 

# Module-level structlog logger; the handler below uses it for debug output.
logger = get_logger()

26 

# Upper bound (exclusive) on the uncompressed size a header may declare: 256GB
MAX_UNCOMPRESSED_SIZE = 256 * 1024**3

# This is an arbitrary value: streams whose compressed size is smaller than
# this many bytes are rejected.
MIN_COMPRESSED_SIZE = 256

# Fraction of the declared uncompressed size that must have been decompressed
# before running out of input is tolerated as a truncated stream.
MIN_READ_RATIO = 0.1

34 

35 

class LZMAHandler(Handler):
    """Handler that locates and validates standalone (FORMAT_ALONE) LZMA streams.

    A candidate is matched on its properties byte and dictionary size, its
    header fields are sanity-checked, and the stream is then decompressed in
    memory to find where the compressed data ends.
    """

    NAME = "lzma"

    PATTERNS = [
        HexString(
            """
            // pre-computed valid properties bytes
            (
                51 | 5A | 5B | 5C | 5D | 5E | 63 | 64 | 65 | 66 | 6C | 6D | 6E | 75 | 76 | 7E |
                87 | 88 | 89 | 8A | 8B | 90 | 91 | 92 | 93 | 99 | 9A | 9B | A2 | A3 | AB | B4 |
                B5 | B6 | B7 | B8 | BD | BE | BF | C0 | C6 | C7 | C8 | CF | D0 | D8
            )
            // dictionary size
            00 00 ( 00 | 01 | 04 | 08 | 10 | 20 | 40 | 80) ( 00 | 01 | 02 | 04 | 08 )
        """
        )
    ]

    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="lzma.uncompressed")

    DOC = HandlerDoc(
        name="LZMA",
        description="LZMA is a compression format based on the Lempel-Ziv-Markov chain algorithm, offering high compression ratios and efficient decompression. It is commonly used in standalone `.lzma` files and embedded in other formats like 7z.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZMA File Format Documentation",
                url="https://tukaani.org/xz/lzma.txt",
            ),
            Reference(
                title="LZMA Wikipedia",
                url="https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Markov_chain_algorithm",
            ),
        ],
        limitations=[],
    )

    def is_valid_stream(self, dictionary_size: int, uncompressed_size: int) -> bool:
        """Return True when both header fields hold plausible values.

        The dictionary size must be a non-zero power of two (section 1.1.2 of
        the format definition). The uncompressed size must either be the
        "unknown" marker 0xFFFFFFFFFFFFFFFF or stay below
        MAX_UNCOMPRESSED_SIZE (section 1.1.3 of the format definition).
        """
        # A power of two has exactly one bit set, so clearing the lowest set
        # bit (n & (n - 1)) must leave zero.
        is_power_of_two = (
            dictionary_size != 0 and (dictionary_size & (dictionary_size - 1)) == 0
        )
        if not is_power_of_two:
            return False

        return (
            uncompressed_size == 0xFFFFFFFFFFFFFFFF
            or uncompressed_size < MAX_UNCOMPRESSED_SIZE
        )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Validate the LZMA stream at *start_offset* and compute its extent.

        Reads the header fields, checks them with is_valid_stream, then feeds
        the file to an in-memory decompressor buffer by buffer until the
        declared uncompressed size is reached, the decompressor reports end of
        stream, or the input runs out.

        Returns:
            A ValidChunk spanning from start_offset to the end of the consumed
            compressed data.

        Raises:
            InvalidInputFormat: if the header is implausible, the data fails
                to decompress, the input ends before MIN_READ_RATIO of the
                declared size was produced, the output is smaller than the
                compressed input, or the compressed size is below
                MIN_COMPRESSED_SIZE.
        """
        # Header layout after the one-byte properties field: a 4-byte
        # little-endian dictionary size and an 8-byte little-endian
        # uncompressed size.
        file.seek(start_offset + 1)
        dictionary_size = convert_int32(file.read(4), Endian.LITTLE)
        uncompressed_size = convert_int64(file.read(8), Endian.LITTLE)

        if not self.is_valid_stream(dictionary_size, uncompressed_size):
            raise InvalidInputFormat

        # Rewind: the decompressor has to see the stream from its first byte.
        file.seek(start_offset, io.SEEK_SET)
        decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_ALONE)
        produced = 0

        try:
            while not (produced >= uncompressed_size or decompressor.eof):
                buffer = file.read(DEFAULT_BUFSIZE)
                if buffer:
                    produced += len(decompressor.decompress(buffer))
                    continue

                # Input is exhausted before the stream finished.
                if produced < (uncompressed_size * MIN_READ_RATIO):
                    raise InvalidInputFormat("Very early truncated LZMA stream")

                logger.debug(
                    "LZMA stream is truncated.",
                    read_size=produced,
                    uncompressed_size=uncompressed_size,
                )
                break
        except lzma.LZMAError as exc:
            raise InvalidInputFormat from exc

        # Bytes read past the end of the stream are not part of the chunk.
        end_offset = file.tell() - len(decompressor.unused_data)
        compressed_size = end_offset - start_offset

        too_small = compressed_size < MIN_COMPRESSED_SIZE
        if produced < compressed_size or too_small:
            raise InvalidInputFormat

        return ValidChunk(start_offset=start_offset, end_offset=end_offset)

127 )