Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/lzo.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

60 statements  

1import io 

2import zlib 

3from enum import IntEnum 

4from typing import Optional 

5 

6from structlog import get_logger 

7 

8from unblob.extractors import Command 

9 

10from ...file_utils import Endian, convert_int32 

11from ...models import ( 

12 File, 

13 HandlerDoc, 

14 HandlerType, 

15 HexString, 

16 Reference, 

17 StructHandler, 

18 ValidChunk, 

19) 

20 

logger = get_logger()

# Length in bytes of the lzop file magic (the `char magic[9]` header field);
# the header checksum does not cover these bytes.
MAGIC_LENGTH = 9
# Length in bytes of one 32-bit checksum field, used both for the header
# checksum and for each optional per-block checksum.
CHECKSUM_LENGTH = 4

25 

26 

# Header flags defined in lzop (http://www.lzop.org/) source in src/conf.h
class HeaderFlags(IntEnum):
    """Bit masks for the 32-bit ``flags`` field of an lzop header.

    Each member occupies a single bit, so a flag is tested with
    ``header.flags & HeaderFlags.<NAME>``.
    """

    ADLER32_D = 1 << 0
    ADLER32_C = 1 << 1
    STDIN = 1 << 2
    STDOUT = 1 << 3
    NAME_DEFAULT = 1 << 4
    DOSISH = 1 << 5
    H_EXTRA_FIELD = 1 << 6
    H_GMTDIFF = 1 << 7
    CRC32_D = 1 << 8
    CRC32_C = 1 << 9
    MULTIPART = 1 << 10
    H_FILTER = 1 << 11
    H_CRC32 = 1 << 12
    H_PATH = 1 << 13

43 

44 

class LZOHandler(StructHandler):
    """Handler that locates lzop (``.lzo``) streams and computes their extent.

    A match on the lzop magic is validated by parsing the header, checking
    its checksum, and then walking the block list until the zero-sized
    terminator block to find where the stream ends.
    """

    NAME = "lzo"

    PATTERNS = [HexString("89 4C 5A 4F 00 0D 0A 1A 0A")]

    C_DEFINITIONS = r"""
        typedef struct lzo_header_no_filter
        {
            char magic[9];
            uint16 version;
            uint16 libversion;
            uint16 reqversion;
            uint8 method;
            uint8 level;
            uint32 flags;
            //uint32 filter; // only if flags & F_H_FILTER
            uint32 mode;
            uint32 mtime;
            uint32 gmtdiff;
            uint8 filename_len;
            char filename[filename_len];
            uint32 header_checksum; // (CRC32 if flags & F_H_CRC32 else Adler32)
        } lzo_header_no_filter_t;

        typedef struct lzo_header_filter
        {
            char magic[9];
            uint16 version;
            uint16 libversion;
            uint16 reqversion;
            uint8 method;
            uint8 level;
            uint32 flags;
            uint32 filter; // only if flags & F_H_FILTER
            uint32 mode;
            uint32 mtime;
            uint32 gmtdiff;
            uint8 filename_len;
            char filename[filename_len];
            uint32 header_checksum; // (CRC32 if flags & F_H_CRC32 else Adler32)
        } lzo_header_filter_t;
    """
    # NOTE(review): C_DEFINITIONS declares no struct named "lzo_header";
    # calculate_chunk parses through self.cparser_be directly, so this value is
    # not used in this class. Confirm nothing else relies on it.
    HEADER_STRUCT = "lzo_header"

    EXTRACTOR = Command("lzop", "-d", "-f", "-f", "-N", "-p{outdir}", "{inpath}")

    DOC = HandlerDoc(
        name="LZO",
        description="LZO is a data compression format featuring a simple header structure and optional checksum verification. It is optimized for fast decompression and supports various compression levels and flags for additional metadata.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZO File Format Documentation",
                url="http://www.lzop.org/",
            ),
            Reference(
                title="LZO Wikipedia",
                url="https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Oberhumer",
            ),
        ],
        limitations=[],
    )

    @staticmethod
    def _block_checksum_size(
        flags: int, uncompressed_size: int, compressed_size: int
    ) -> int:
        """Return how many checksum bytes precede the payload of one block.

        Mirrors the lzop reader: one 32-bit field is read for every
        uncompressed-data checksum flag that is set, and one for every
        compressed-data checksum flag that is set, but the latter only when
        the block is actually compressed (``compressed_size`` smaller than
        ``uncompressed_size``). Blocks stored verbatim carry no separate
        compressed-data checksum.
        """
        size = 0
        for flag in (HeaderFlags.ADLER32_D, HeaderFlags.CRC32_D):
            if flags & flag:
                size += CHECKSUM_LENGTH

        if compressed_size < uncompressed_size:
            for flag in (HeaderFlags.ADLER32_C, HeaderFlags.CRC32_C):
                if flags & flag:
                    size += CHECKSUM_LENGTH

        return size

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Validate the lzop header at the current position and find the stream end.

        Args:
            file: Input file; the header is parsed from its current position,
                and it is re-read from ``start_offset`` when the filter flag
                is set.
            start_offset: Offset of the lzop magic within ``file``.

        Returns:
            A ``ValidChunk`` spanning from ``start_offset`` to just past the
            zero-sized terminator block, or ``None`` when the compression
            level is out of range or the header checksum does not match.
        """
        header = self.cparser_be.lzo_header_no_filter_t(file)
        # maximum compression level is 9
        if header.level > 9:
            logger.debug("Invalid LZO header level", header=header, _verbosity=3)
            return None

        # The optional filter field sits in the middle of the header, so the
        # header has to be parsed again with the layout that includes it.
        if header.flags & HeaderFlags.H_FILTER:
            file.seek(start_offset)
            header = self.cparser_be.lzo_header_filter_t(file)

        logger.debug("LZO header parsed", header=header, _verbosity=3)

        # Checksum excludes the magic and the checksum itself
        checksum_function = (
            zlib.crc32 if header.flags & HeaderFlags.H_CRC32 else zlib.adler32
        )
        calculated_checksum = checksum_function(
            header.dumps()[MAGIC_LENGTH:-CHECKSUM_LENGTH]
        )

        if header.header_checksum != calculated_checksum:
            logger.debug("Header checksum verification failed")
            return None

        # Each block starts with its uncompressed size; a size of zero marks
        # the end of the stream.
        uncompressed_size = convert_int32(file.read(4), endian=Endian.BIG)
        while uncompressed_size:
            compressed_size = convert_int32(file.read(4), endian=Endian.BIG)
            checksum_size = self._block_checksum_size(
                header.flags, uncompressed_size, compressed_size
            )

            # Skip the block's checksum fields and payload without reading them.
            file.seek(checksum_size + compressed_size, io.SEEK_CUR)
            uncompressed_size = convert_int32(file.read(4), endian=Endian.BIG)

        end_offset = file.tell()

        return ValidChunk(start_offset=start_offset, end_offset=end_offset)

157 

158 return ValidChunk(start_offset=start_offset, end_offset=end_offset)