1import io
2import lzma
3from typing import Optional
4
5from structlog import get_logger
6
7from unblob.extractors import Command
8
9from ...file_utils import (
10 DEFAULT_BUFSIZE,
11 Endian,
12 InvalidInputFormat,
13 convert_int32,
14 convert_int64,
15)
16from ...models import (
17 File,
18 Handler,
19 HandlerDoc,
20 HandlerType,
21 HexString,
22 Reference,
23 ValidChunk,
24)
25
# Module-level structlog logger; the LZMA handler uses it to emit a debug
# event when it meets a truncated stream.
logger = get_logger()
27
# Upper bound (256 GiB) on the uncompressed size declared in an LZMA header;
# a known size at or above this is treated as an invalid stream.
MAX_UNCOMPRESSED_SIZE = 256 * 1024**3

# This is an arbitrary value: candidates whose compressed span is shorter
# than this many bytes are rejected.
MIN_COMPRESSED_SIZE = 256

# When the input ends before the stream does, at least this fraction of the
# declared uncompressed size must already have been decoded for the
# truncated stream to be accepted.
MIN_READ_RATIO = 0.1
35
36
class LZMAHandler(Handler):
    """Detect standalone LZMA streams and compute where they end.

    Candidates are located with ``PATTERNS`` (properties byte followed by
    the dictionary size), sanity-checked through the header fields, and then
    decoded with ``lzma.LZMADecompressor`` to find the end of the stream.
    """

    NAME = "lzma"

    PATTERNS = [
        HexString(
            """
            // pre-computed valid properties bytes
            (
                51 | 5A | 5B | 5C | 5D | 5E | 63 | 64 | 65 | 66 | 6C | 6D | 6E | 75 | 76 | 7E |
                87 | 88 | 89 | 8A | 8B | 90 | 91 | 92 | 93 | 99 | 9A | 9B | A2 | A3 | AB | B4 |
                B5 | B6 | B7 | B8 | BD | BE | BF | C0 | C6 | C7 | C8 | CF | D0 | D8
            )
            // dictionary size
            00 00 ( 00 | 01 | 04 | 08 | 10 | 20 | 40 | 80) ( 00 | 01 | 02 | 04 | 08 )
        """
        )
    ]

    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="lzma.uncompressed")

    DOC = HandlerDoc(
        name="LZMA",
        description="LZMA is a compression format based on the Lempel-Ziv-Markov chain algorithm, offering high compression ratios and efficient decompression. It is commonly used in standalone `.lzma` files and embedded in other formats like 7z.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZMA File Format Documentation",
                url="https://tukaani.org/xz/lzma.txt",
            ),
            Reference(
                title="LZMA Wikipedia",
                url="https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Markov_chain_algorithm",
            ),
        ],
        limitations=[],
    )

    def is_valid_stream(self, dictionary_size: int, uncompressed_size: int) -> bool:
        """Return whether the two header fields describe a plausible stream.

        The dictionary size must be a non-zero power of two (section 1.1.2
        of the format definition). The uncompressed size must either be the
        "unknown" marker ``0xFFFFFFFFFFFFFFFF`` or stay below
        ``MAX_UNCOMPRESSED_SIZE`` (section 1.1.3 of the format definition).
        """
        # A power of two has exactly one bit set, so clearing the lowest set
        # bit must leave zero; zero itself is excluded explicitly.
        has_single_bit = (dictionary_size & (dictionary_size - 1)) == 0
        if dictionary_size == 0 or not has_single_bit:
            return False

        size_is_unknown = uncompressed_size == 0xFFFFFFFFFFFFFFFF
        return size_is_unknown or uncompressed_size < MAX_UNCOMPRESSED_SIZE

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Validate the LZMA stream at ``start_offset`` and locate its end.

        The header fields are read first (skipping the leading properties
        byte), then the stream is decoded buffer by buffer from
        ``start_offset`` to count the decompressed bytes.

        Returns:
            A ``ValidChunk`` spanning ``start_offset`` up to the first byte
            the decompressor did not consume.

        Raises:
            InvalidInputFormat: if the header fields are implausible, the
                decompressor reports an error, the input ends before
                ``MIN_READ_RATIO`` of the declared size was decoded, the
                compressed span is shorter than ``MIN_COMPRESSED_SIZE``, or
                fewer bytes were decoded than the compressed span occupies.
        """
        # Header after the properties byte: 4-byte little-endian dictionary
        # size, then 8-byte little-endian uncompressed size.
        file.seek(start_offset + 1)
        dictionary_size = convert_int32(file.read(4), Endian.LITTLE)
        uncompressed_size = convert_int64(file.read(8), Endian.LITTLE)

        if not self.is_valid_stream(dictionary_size, uncompressed_size):
            raise InvalidInputFormat

        # Decoding restarts at the very first byte of the candidate stream.
        file.seek(start_offset, io.SEEK_SET)
        decoder = lzma.LZMADecompressor(format=lzma.FORMAT_ALONE)
        decoded_total = 0

        try:
            while not (decoded_total >= uncompressed_size or decoder.eof):
                buffer = file.read(DEFAULT_BUFSIZE)
                if buffer:
                    decoded_total += len(decoder.decompress(buffer))
                    continue

                # The input ran out before the stream was complete.
                if decoded_total < (uncompressed_size * MIN_READ_RATIO):
                    raise InvalidInputFormat("Very early truncated LZMA stream")

                logger.debug(
                    "LZMA stream is truncated.",
                    read_size=decoded_total,
                    uncompressed_size=uncompressed_size,
                )
                break
        except lzma.LZMAError as exc:
            raise InvalidInputFormat from exc

        # Bytes read past the end of the stream are reported as unused data
        # and must not be counted as part of the chunk.
        end_offset = file.tell() - len(decoder.unused_data)
        compressed_size = end_offset - start_offset

        too_short = compressed_size < MIN_COMPRESSED_SIZE
        if decoded_total < compressed_size or too_short:
            raise InvalidInputFormat

        return ValidChunk(start_offset=start_offset, end_offset=end_offset)