1import io
2import zlib
3from enum import IntEnum
4from typing import Optional
5
6from structlog import get_logger
7
8from unblob.extractors import Command
9
10from ...file_utils import Endian, convert_int32
11from ...models import (
12 File,
13 HandlerDoc,
14 HandlerType,
15 HexString,
16 Reference,
17 StructHandler,
18 ValidChunk,
19)
20
# Module-level structlog logger used for this handler's debug output.
logger = get_logger()

# Size in bytes of the leading magic; matches `char magic[9]` in the header
# structs and the nine-byte signature in LZOHandler.PATTERNS.
MAGIC_LENGTH = 9
# Size in bytes of one checksum word; the header checksum is a uint32 and the
# same width is used when skipping per-block checksums.
CHECKSUM_LENGTH = 4
25
26
27# Header flags defined in lzop (http://www.lzop.org/) source in src/conf.h
class HeaderFlags(IntEnum):
    """Bit masks tested against the ``flags`` field of the LZO header.

    Each member occupies a single bit, so a flag is checked with a bitwise
    AND (``header.flags & HeaderFlags.H_FILTER``).
    """

    # Per-block checksum selectors (Adler32 variants).
    ADLER32_D = 1 << 0
    ADLER32_C = 1 << 1

    STDIN = 1 << 2
    STDOUT = 1 << 3
    NAME_DEFAULT = 1 << 4
    DOSISH = 1 << 5
    H_EXTRA_FIELD = 1 << 6
    H_GMTDIFF = 1 << 7

    # Per-block checksum selectors (CRC32 variants).
    CRC32_D = 1 << 8
    CRC32_C = 1 << 9

    MULTIPART = 1 << 10
    # When set, the header carries the additional ``filter`` field.
    H_FILTER = 1 << 11
    # When set, the header checksum is a CRC32; otherwise it is an Adler32.
    H_CRC32 = 1 << 12
    H_PATH = 1 << 13
43
44
class LZOHandler(StructHandler):
    """Recognize lzop streams and work out where each one ends.

    A match on the nine-byte magic is followed by parsing the header,
    verifying its checksum, and walking the sequence of compressed blocks up
    to the zero-sized terminator. Extraction itself is delegated to the
    external ``lzop`` command.
    """

    NAME = "lzo"

    # Nine-byte file magic: b"\x89LZO\x00\r\n\x1a\n".
    PATTERNS = [HexString("89 4C 5A 4F 00 0D 0A 1A 0A")]

    # Two layouts are declared because the ``filter`` field only exists when
    # the H_FILTER flag is set, which shifts every field that follows it.
    C_DEFINITIONS = r"""
        typedef struct lzo_header_no_filter
        {
            char magic[9];
            uint16 version;
            uint16 libversion;
            uint16 reqversion;
            uint8 method;
            uint8 level;
            uint32 flags;
            //uint32 filter; // only if flags & F_H_FILTER
            uint32 mode;
            uint32 mtime;
            uint32 gmtdiff;
            uint8 filename_len;
            char filename[filename_len];
            uint32 header_checksum; // (CRC32 if flags & F_H_CRC32 else Adler32)
        } lzo_header_no_filter_t;

        typedef struct lzo_header_filter
        {
            char magic[9];
            uint16 version;
            uint16 libversion;
            uint16 reqversion;
            uint8 method;
            uint8 level;
            uint32 flags;
            uint32 filter; // only if flags & F_H_FILTER
            uint32 mode;
            uint32 mtime;
            uint32 gmtdiff;
            uint8 filename_len;
            char filename[filename_len];
            uint32 header_checksum; // (CRC32 if flags & F_H_CRC32 else Adler32)
        } lzo_header_filter_t;
    """
    # NOTE(review): no struct with this exact name is declared above;
    # calculate_chunk parses the two typedefs directly instead.
    HEADER_STRUCT = "lzo_header"

    EXTRACTOR = Command("lzop", "-d", "-f", "-f", "-N", "-p{outdir}", "{inpath}")

    DOC = HandlerDoc(
        name="LZO",
        description="LZO is a data compression format featuring a simple header structure and optional checksum verification. It is optimized for fast decompression and supports various compression levels and flags for additional metadata.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZO File Format Documentation",
                url="http://www.lzop.org/",
            ),
            Reference(
                title="LZO Wikipedia",
                url="https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Oberhumer",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Parse the LZO header at the current position and locate the stream end.

        Args:
            file: Input positioned at the start of the magic (the header is
                parsed from the current position; ``start_offset`` is used to
                rewind when the header has to be re-parsed).
            start_offset: Absolute offset of the magic within ``file``.

        Returns:
            A ``ValidChunk`` spanning the header, every block and the
            zero-sized terminator, or ``None`` when the compression level,
            the header checksum or a block's sizes are invalid.
        """
        header = self.cparser_be.lzo_header_no_filter_t(file)
        # maximum compression level is 9
        if header.level > 9:
            logger.debug("Invalid LZO header level", header=header, _verbosity=3)
            return None

        if header.flags & HeaderFlags.H_FILTER:
            # The filter field sits right after the flags, so everything
            # parsed past it with the no-filter layout is misaligned.
            file.seek(start_offset)
            header = self.cparser_be.lzo_header_filter_t(file)

        logger.debug("LZO header parsed", header=header, _verbosity=3)

        # Checksum excludes the magic and the checksum itself
        checksummed = header.dumps()[MAGIC_LENGTH:-CHECKSUM_LENGTH]
        if header.flags & HeaderFlags.H_CRC32:
            calculated_checksum = zlib.crc32(checksummed)
        else:
            calculated_checksum = zlib.adler32(checksummed)

        if header.header_checksum != calculated_checksum:
            logger.debug("Header checksum verification failed")
            return None

        # The flags are fixed for the whole stream, so size the per-block
        # checksum words once. lzop reads one word per flag that is set.
        data_checksums_size = CHECKSUM_LENGTH * sum(
            1
            for flag in (HeaderFlags.ADLER32_D, HeaderFlags.CRC32_D)
            if header.flags & flag
        )
        compressed_checksums_size = CHECKSUM_LENGTH * sum(
            1
            for flag in (HeaderFlags.ADLER32_C, HeaderFlags.CRC32_C)
            if header.flags & flag
        )

        # Each block is: uncompressed size, compressed size, checksums, data.
        # An uncompressed size of zero terminates the stream.
        uncompressed_size = convert_int32(file.read(4), endian=Endian.BIG)
        while uncompressed_size:
            compressed_size = convert_int32(file.read(4), endian=Endian.BIG)

            # lzop treats an empty block or one that grew during compression
            # as corruption; bail out rather than seek through garbage.
            if compressed_size == 0 or compressed_size > uncompressed_size:
                logger.debug(
                    "Invalid LZO block sizes",
                    compressed_size=compressed_size,
                    uncompressed_size=uncompressed_size,
                    _verbosity=3,
                )
                return None

            checksum_size = data_checksums_size
            # Blocks stored uncompressed (equal sizes) carry no separate
            # checksum of the compressed data; lzop reuses the data checksum.
            if compressed_size < uncompressed_size:
                checksum_size += compressed_checksums_size

            file.seek(checksum_size + compressed_size, io.SEEK_CUR)
            uncompressed_size = convert_int32(file.read(4), endian=Endian.BIG)

        end_offset = file.tell()

        return ValidChunk(start_offset=start_offset, end_offset=end_offset)