1import io
2import zlib
3from enum import IntEnum
4
5from structlog import get_logger
6
7from unblob.extractors import Command
8
9from ...file_utils import Endian, convert_int32
10from ...models import (
11 File,
12 HandlerDoc,
13 HandlerType,
14 HexString,
15 Reference,
16 StructHandler,
17 ValidChunk,
18)
19
# Module-level structlog logger used for debug tracing while parsing headers.
logger = get_logger()

# Size in bytes of the magic at the start of the header (`char magic[9]`).
MAGIC_LENGTH = 9
# Size in bytes of a 32-bit checksum field (header checksum and block checksums).
CHECKSUM_LENGTH = 4
24
25
26# Header flags defined in lzop (http://www.lzop.org/) source in src/conf.h
class HeaderFlags(IntEnum):
    """Bit masks for the 32-bit ``flags`` field of an lzop header.

    Names and values follow the header flags defined in the lzop
    (http://www.lzop.org/) source, in src/conf.h. Each member occupies a
    single bit, so membership is tested with ``flags & HeaderFlags.X``.
    """

    ADLER32_D = 1 << 0
    ADLER32_C = 1 << 1
    STDIN = 1 << 2
    STDOUT = 1 << 3
    NAME_DEFAULT = 1 << 4
    DOSISH = 1 << 5
    H_EXTRA_FIELD = 1 << 6
    H_GMTDIFF = 1 << 7
    CRC32_D = 1 << 8
    CRC32_C = 1 << 9
    MULTIPART = 1 << 10
    H_FILTER = 1 << 11
    H_CRC32 = 1 << 12
    H_PATH = 1 << 13
42
43
class LZOHandler(StructHandler):
    """Handler that recognises lzop (LZO) streams and computes their extent.

    A stream is matched on its 9-byte magic; ``calculate_chunk`` then
    validates the header and walks the block list to find where the
    stream ends. Extraction is delegated to the external ``lzop`` command.
    """

    NAME = "lzo"

    PATTERNS = [HexString("89 4C 5A 4F 00 0D 0A 1A 0A")]

    # Two header layouts are declared; they differ only by the optional
    # `filter` field, which is present when the H_FILTER flag is set.
    C_DEFINITIONS = r"""
        typedef struct lzo_header_no_filter
        {
            char magic[9];
            uint16 version;
            uint16 libversion;
            uint16 reqversion;
            uint8 method;
            uint8 level;
            uint32 flags;
            //uint32 filter; // only if flags & F_H_FILTER
            uint32 mode;
            uint32 mtime;
            uint32 gmtdiff;
            uint8 filename_len;
            char filename[filename_len];
            uint32 header_checksum; // (CRC32 if flags & F_H_CRC32 else Adler32)
        } lzo_header_no_filter_t;

        typedef struct lzo_header_filter
        {
            char magic[9];
            uint16 version;
            uint16 libversion;
            uint16 reqversion;
            uint8 method;
            uint8 level;
            uint32 flags;
            uint32 filter; // only if flags & F_H_FILTER
            uint32 mode;
            uint32 mtime;
            uint32 gmtdiff;
            uint8 filename_len;
            char filename[filename_len];
            uint32 header_checksum; // (CRC32 if flags & F_H_CRC32 else Adler32)
        } lzo_header_filter_t;
    """
    # NOTE(review): no struct named exactly "lzo_header" is declared above;
    # calculate_chunk parses the two concrete variants directly. Confirm that
    # nothing in the base class relies on this name before changing it.
    HEADER_STRUCT = "lzo_header"

    EXTRACTOR = Command("lzop", "-d", "-f", "-f", "-N", "-p{outdir}", "{inpath}")

    DOC = HandlerDoc(
        name="LZO",
        description="LZO is a data compression format featuring a simple header structure and optional checksum verification. It is optimized for fast decompression and supports various compression levels and flags for additional metadata.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZO File Format Documentation",
                url="http://www.lzop.org/",
            ),
            Reference(
                title="LZO Wikipedia",
                url="https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Oberhumer",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Validate the lzop header and locate the end of the stream.

        The header is parsed from the current position of ``file`` (expected
        to be ``start_offset``), its checksum is verified, and the blocks
        that follow are skipped one by one until the zero-length terminator.

        Args:
            file: Input positioned at the candidate magic.
            start_offset: Offset of the magic within ``file``.

        Returns:
            A ``ValidChunk`` spanning ``start_offset`` up to and including the
            terminating zero size field, or ``None`` when the compression
            level is out of range or the header checksum does not match.
        """
        header = self.cparser_be.lzo_header_no_filter_t(file)
        # maximum compression level is 9
        if header.level > 9:
            logger.debug("Invalid LZO header level", header=header, _verbosity=3)
            return None

        # `flags` precedes the optional filter field, so it is reliable even
        # when the first parse used the wrong layout; re-parse from the start
        # with the layout that contains the filter field.
        if header.flags & HeaderFlags.H_FILTER:
            file.seek(start_offset)
            header = self.cparser_be.lzo_header_filter_t(file)

        logger.debug("LZO header parsed", header=header, _verbosity=3)

        # Checksum excludes the magic and the checksum itself
        checksum_func = (
            zlib.crc32 if header.flags & HeaderFlags.H_CRC32 else zlib.adler32
        )
        calculated_checksum = checksum_func(
            header.dumps()[MAGIC_LENGTH:-CHECKSUM_LENGTH]
        )

        if header.header_checksum != calculated_checksum:
            logger.debug("Header checksum verification failed")
            return None

        # Per-block checksum sizes depend only on the header flags, so they
        # are computed once. The lzop decoder reads the Adler-32 and CRC-32
        # variants independently, hence each set flag contributes 4 bytes.
        uncompressed_checksum_size = CHECKSUM_LENGTH * sum(
            bool(header.flags & flag)
            for flag in (HeaderFlags.ADLER32_D, HeaderFlags.CRC32_D)
        )
        compressed_checksum_size = CHECKSUM_LENGTH * sum(
            bool(header.flags & flag)
            for flag in (HeaderFlags.ADLER32_C, HeaderFlags.CRC32_C)
        )

        # Each block starts with its uncompressed size; a size of 0 marks the
        # end of the stream.
        while uncompressed_size := convert_int32(file.read(4), endian=Endian.BIG):
            compressed_size = convert_int32(file.read(4), endian=Endian.BIG)

            skip_size = uncompressed_checksum_size + compressed_size
            # A block that did not shrink is stored verbatim, and lzop then
            # omits the checksum of the compressed data for that block.
            if compressed_size < uncompressed_size:
                skip_size += compressed_checksum_size

            file.seek(skip_size, io.SEEK_CUR)

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())