Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/lzo.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

59 statements  

1import io 

2import zlib 

3from enum import IntEnum 

4 

5from structlog import get_logger 

6 

7from unblob.extractors import Command 

8 

9from ...file_utils import Endian, convert_int32 

10from ...models import ( 

11 File, 

12 HandlerDoc, 

13 HandlerType, 

14 HexString, 

15 Reference, 

16 StructHandler, 

17 ValidChunk, 

18) 

19 

# Size in bytes of the lzop magic at the start of every header.
MAGIC_LENGTH = 9
# Size in bytes of each checksum field (header and per-block).
CHECKSUM_LENGTH = 4

# Module-level structured logger shared by the handler below.
logger = get_logger()

24 

25 

class HeaderFlags(IntEnum):
    """Bit flags stored in the ``flags`` field of an lzop header.

    Defined in the lzop (http://www.lzop.org/) source in src/conf.h.
    Each member occupies a single bit, so members are tested against the
    header value with ``&``.
    """

    # Checksums stored alongside each data block (D = decompressed data,
    # C = compressed data).
    ADLER32_D = 1 << 0
    ADLER32_C = 1 << 1
    STDIN = 1 << 2
    STDOUT = 1 << 3
    NAME_DEFAULT = 1 << 4
    DOSISH = 1 << 5
    H_EXTRA_FIELD = 1 << 6
    H_GMTDIFF = 1 << 7
    CRC32_D = 1 << 8
    CRC32_C = 1 << 9
    MULTIPART = 1 << 10
    # The header carries an additional 32-bit ``filter`` field.
    H_FILTER = 1 << 11
    # The header checksum is a CRC32 instead of an Adler32.
    H_CRC32 = 1 << 12
    H_PATH = 1 << 13

42 

43 

class LZOHandler(StructHandler):
    """Handler that locates lzop (``.lzo``) streams and computes their extent.

    The header is parsed with one of two struct layouts, depending on
    whether the ``H_FILTER`` flag announces an extra ``filter`` field.
    After the header checksum is verified, the data blocks are walked
    until the zero-sized terminator block to find the end offset.
    """

    NAME = "lzo"

    PATTERNS = [HexString("89 4C 5A 4F 00 0D 0A 1A 0A")]

    C_DEFINITIONS = r"""
        typedef struct lzo_header_no_filter
        {
            char magic[9];
            uint16 version;
            uint16 libversion;
            uint16 reqversion;
            uint8 method;
            uint8 level;
            uint32 flags;
            //uint32 filter; // only if flags & F_H_FILTER
            uint32 mode;
            uint32 mtime;
            uint32 gmtdiff;
            uint8 filename_len;
            char filename[filename_len];
            uint32 header_checksum; // (CRC32 if flags & F_H_CRC32 else Adler32)
        } lzo_header_no_filter_t;

        typedef struct lzo_header_filter
        {
            char magic[9];
            uint16 version;
            uint16 libversion;
            uint16 reqversion;
            uint8 method;
            uint8 level;
            uint32 flags;
            uint32 filter; // only if flags & F_H_FILTER
            uint32 mode;
            uint32 mtime;
            uint32 gmtdiff;
            uint8 filename_len;
            char filename[filename_len];
            uint32 header_checksum; // (CRC32 if flags & F_H_CRC32 else Adler32)
        } lzo_header_filter_t;
    """
    HEADER_STRUCT = "lzo_header"

    EXTRACTOR = Command("lzop", "-d", "-f", "-f", "-N", "-p{outdir}", "{inpath}")

    DOC = HandlerDoc(
        name="LZO",
        description="LZO is a data compression format featuring a simple header structure and optional checksum verification. It is optimized for fast decompression and supports various compression levels and flags for additional metadata.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZO File Format Documentation",
                url="http://www.lzop.org/",
            ),
            Reference(
                title="LZO Wikipedia",
                url="https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Oberhumer",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Validate the lzop header at the current position and find the stream end.

        Args:
            file: Input positioned at the start of the lzop magic.
            start_offset: Offset of the magic within ``file``; used to rewind
                when the header has to be re-parsed with the filter layout.

        Returns:
            A ``ValidChunk`` spanning from ``start_offset`` to the byte after
            the zero-sized terminator block, or ``None`` when the compression
            level is out of range or the header checksum does not match.
        """
        header = self.cparser_be.lzo_header_no_filter_t(file)
        # maximum compression level is 9
        if header.level > 9:
            logger.debug("Invalid LZO header level", header=header, _verbosity=3)
            return None

        # ``flags`` sits before the optional ``filter`` field, so it is valid
        # in the first parse; everything after it has to be re-read with the
        # layout that includes the filter.
        if header.flags & HeaderFlags.H_FILTER:
            file.seek(start_offset)
            header = self.cparser_be.lzo_header_filter_t(file)

        logger.debug("LZO header parsed", header=header, _verbosity=3)

        # Checksum excludes the magic and the checksum itself
        checksummed = header.dumps()[MAGIC_LENGTH:-CHECKSUM_LENGTH]
        if header.flags & HeaderFlags.H_CRC32:
            calculated_checksum = zlib.crc32(checksummed)
        else:
            calculated_checksum = zlib.adler32(checksummed)

        if header.header_checksum != calculated_checksum:
            logger.debug("Header checksum verification failed")
            return None

        # The flags are fixed for the whole stream, so the per-block checksum
        # sizes are computed once. lzop stores one 32-bit field per flag that
        # is set, hence each flag is counted on its own.
        flags = header.flags
        decompressed_checksums = CHECKSUM_LENGTH * sum(
            1
            for flag in (HeaderFlags.ADLER32_D, HeaderFlags.CRC32_D)
            if flags & flag
        )
        compressed_checksums = CHECKSUM_LENGTH * sum(
            1
            for flag in (HeaderFlags.ADLER32_C, HeaderFlags.CRC32_C)
            if flags & flag
        )

        # Block layout: uncompressed size, compressed size, checksum(s), data.
        # A zero uncompressed size terminates the stream.
        uncompressed_size = convert_int32(file.read(4), endian=Endian.BIG)
        while uncompressed_size:
            compressed_size = convert_int32(file.read(4), endian=Endian.BIG)

            checksum_size = decompressed_checksums
            # lzop only writes the compressed-data checksum when the block
            # actually shrank; a block stored as-is carries none.
            if compressed_size < uncompressed_size:
                checksum_size += compressed_checksums

            file.seek(checksum_size + compressed_size, io.SEEK_CUR)
            uncompressed_size = convert_int32(file.read(4), endian=Endian.BIG)

        end_offset = file.tell()

        return ValidChunk(start_offset=start_offset, end_offset=end_offset)