1import io
2import lzma
3
4from structlog import get_logger
5
6from unblob.extractors import Command
7
8from ...file_utils import (
9 DEFAULT_BUFSIZE,
10 Endian,
11 InvalidInputFormat,
12 convert_int32,
13 convert_int64,
14)
15from ...models import (
16 File,
17 Handler,
18 HandlerDoc,
19 HandlerType,
20 HexString,
21 Reference,
22 ValidChunk,
23)
24
25logger = get_logger()
26
27# 256GB
28MAX_UNCOMPRESSED_SIZE = 256 * 1024 * 1024 * 1024
29
30# This an arbitrary value
31MIN_COMPRESSED_SIZE = 256
32
33MIN_READ_RATIO = 0.1
34
35
36class LZMAHandler(Handler):
37 NAME = "lzma"
38
39 PATTERNS = [
40 HexString(
41 """
42 // pre-computed valid properties bytes
43 (
44 51 | 5A | 5B | 5C | 5D | 5E | 63 | 64 | 65 | 66 | 6C | 6D | 6E | 75 | 76 | 7E |
45 87 | 88 | 89 | 8A | 8B | 90 | 91 | 92 | 93 | 99 | 9A | 9B | A2 | A3 | AB | B4 |
46 B5 | B6 | B7 | B8 | BD | BE | BF | C0 | C6 | C7 | C8 | CF | D0 | D8
47 )
48 // dictionary size
49 00 00 ( 00 | 01 | 04 | 08 | 10 | 20 | 40 | 80) ( 00 | 01 | 02 | 04 | 08 )
50 """
51 )
52 ]
53
54 EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="lzma.uncompressed")
55
56 DOC = HandlerDoc(
57 name="LZMA",
58 description="LZMA is a compression format based on the Lempel-Ziv-Markov chain algorithm, offering high compression ratios and efficient decompression. It is commonly used in standalone `.lzma` files and embedded in other formats like 7z.",
59 handler_type=HandlerType.COMPRESSION,
60 vendor=None,
61 references=[
62 Reference(
63 title="LZMA File Format Documentation",
64 url="https://tukaani.org/xz/lzma.txt",
65 ),
66 Reference(
67 title="LZMA Wikipedia",
68 url="https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Markov_chain_algorithm",
69 ),
70 ],
71 limitations=[],
72 )
73
74 def is_valid_stream(self, dictionary_size: int, uncompressed_size: int) -> bool:
75 # dictionary size is non-zero (section 1.1.2 of format definition)
76 # dictionary size is a power of two (section 1.1.2 of format definition)
77 if dictionary_size == 0 or (dictionary_size & (dictionary_size - 1)) != 0:
78 return False
79 # uncompressed size is either unknown (0xFFFFFFFFFFFFFFFF) or
80 # smaller than 256GB (section 1.1.3 of format definition)
81 if not ( # noqa: SIM103
82 uncompressed_size == 0xFFFFFFFFFFFFFFFF
83 or uncompressed_size < MAX_UNCOMPRESSED_SIZE
84 ):
85 return False
86 return True
87
88 def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
89 read_size = 0
90 file.seek(start_offset + 1)
91 dictionary_size = convert_int32(file.read(4), Endian.LITTLE)
92 uncompressed_size = convert_int64(file.read(8), Endian.LITTLE)
93
94 if not self.is_valid_stream(dictionary_size, uncompressed_size):
95 raise InvalidInputFormat
96
97 file.seek(start_offset, io.SEEK_SET)
98 decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_ALONE)
99
100 try:
101 while read_size < uncompressed_size and not decompressor.eof:
102 data = file.read(DEFAULT_BUFSIZE)
103 if not data:
104 if read_size < (uncompressed_size * MIN_READ_RATIO):
105 raise InvalidInputFormat("Very early truncated LZMA stream")
106
107 logger.debug(
108 "LZMA stream is truncated.",
109 read_size=read_size,
110 uncompressed_size=uncompressed_size,
111 )
112 break
113 read_size += len(decompressor.decompress(data))
114
115 except lzma.LZMAError as exc:
116 raise InvalidInputFormat from exc
117
118 end_offset = file.tell() - len(decompressor.unused_data)
119 compressed_size = end_offset - start_offset
120
121 if (read_size < compressed_size) or (compressed_size < MIN_COMPRESSED_SIZE):
122 raise InvalidInputFormat
123
124 return ValidChunk(
125 start_offset=start_offset,
126 end_offset=end_offset,
127 )