Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/uzip.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

66 statements  

1import lzma 

2import re 

3import zlib 

4from pathlib import Path 

5from typing import Callable, Optional 

6 

7import pyzstd 

8 

9from unblob.file_utils import ( 

10 Endian, 

11 FileSystem, 

12 InvalidInputFormat, 

13 StructParser, 

14 iterate_file, 

15) 

16from unblob.models import ( 

17 Extractor, 

18 ExtractResult, 

19 File, 

20 HandlerDoc, 

21 HandlerType, 

22 Reference, 

23 Regex, 

24 StructHandler, 

25 ValidChunk, 

26) 

27 

# [Ref] https://github.com/freebsd/freebsd-src/tree/master/sys/geom/uzip
#
# C description of the image header. The code in this module always parses it
# as big-endian. `toc` is a variable-length array sized by `block_count`; its
# entries are used as byte offsets of the compressed blocks.
# NOTE(review): upstream geom_uzip appears to store block_count + 1 offsets
# (a trailing end-of-data entry) - confirm that reading only block_count
# entries does not drop the final block.
C_DEFINITIONS = r"""
    typedef struct uzip_header{
        char magic[16];
        char format[112];
        uint32_t block_size;
        uint32_t block_count;
        uint64_t toc[block_count];
    } uzip_header_t;
"""

# Name of the struct inside C_DEFINITIONS that describes the header.
HEADER_STRUCT = "uzip_header_t"

# Header magics, one per supported compression algorithm. Each is exactly
# 16 characters long (the size of `magic[16]`); the explicit \x0a / \x20
# escapes keep the trailing newline or space visible in the source.
ZLIB_COMPRESSION = "#!/bin/sh\x0a#V2.0\x20"
LZMA_COMPRESSION = "#!/bin/sh\x0a#L3.0\x0a"
ZSTD_COMPRESSION = "#!/bin/sh\x0a#Z4.0\x20"

44 

45 

class Decompressor:
    """Uniform two-method wrapper around a streaming decompressor object.

    Subclasses set ``DECOMPRESSOR`` to a zero-argument callable. Whatever it
    returns must provide a ``decompress(data)`` method.
    """

    # Factory invoked once per instance to create the wrapped decompressor.
    DECOMPRESSOR: Callable

    def __init__(self):
        """Create the wrapped decompressor by calling ``DECOMPRESSOR``."""
        factory = self.DECOMPRESSOR
        self._decompressor = factory()

    def decompress(self, data: bytes) -> bytes:
        """Feed ``data`` to the wrapped object and return what it produces."""
        wrapped = self._decompressor
        return wrapped.decompress(data)

    def flush(self) -> bytes:
        """Return trailing output; the base implementation has none."""
        return b""

57 

58 

class LZMADecompressor(Decompressor):
    """Decompressor wrapper backed by ``lzma.LZMADecompressor``."""

    DECOMPRESSOR = lzma.LZMADecompressor

61 

62 

class ZLIBDecompressor(Decompressor):
    """Decompressor wrapper backed by ``zlib.decompressobj``."""

    DECOMPRESSOR = zlib.decompressobj

    def flush(self) -> bytes:
        """Return whatever the wrapped zlib object's ``flush()`` yields."""
        remaining = self._decompressor.flush()
        return remaining

68 

69 

class ZSTDDecompressor(Decompressor):
    """Decompressor wrapper backed by ``pyzstd.EndlessZstdDecompressor``."""

    DECOMPRESSOR = pyzstd.EndlessZstdDecompressor

72 

73 

# Dispatch table: encoded header magic -> decompressor wrapper class.
# Keys are bytes because the lookup is done with the parsed `magic` field.
DECOMPRESS_METHOD: dict[bytes, type[Decompressor]] = {
    magic.encode(): decompressor_cls
    for magic, decompressor_cls in (
        (ZLIB_COMPRESSION, ZLIBDecompressor),
        (LZMA_COMPRESSION, LZMADecompressor),
        (ZSTD_COMPRESSION, ZSTDDecompressor),
    )
}

79 

80 

class UZIPExtractor(Extractor):
    """Extractor that unpacks a UZIP image into a single decompressed file."""

    def extract(self, inpath: Path, outdir: Path):
        """Decompress every block of ``inpath`` into one file under ``outdir``.

        The header is parsed as big-endian and its magic selects the
        decompressor class. Consecutive TOC entries delimit one compressed
        block each; blocks are decompressed in TOC order and appended to an
        output file named after the stem of ``inpath``.

        Returns:
            An ``ExtractResult`` carrying the problems recorded by the
            ``FileSystem`` wrapper.

        Raises:
            InvalidInputFormat: the header magic has no entry in
                ``DECOMPRESS_METHOD``.
        """
        with File.from_path(inpath) as infile:
            header = StructParser(C_DEFINITIONS).parse(
                HEADER_STRUCT, infile, Endian.BIG
            )
            fs = FileSystem(outdir)
            outpath = Path(inpath.stem)

            decompressor_cls = DECOMPRESS_METHOD.get(header.magic)
            if decompressor_cls is None:
                raise InvalidInputFormat("unsupported compression format")

            toc = header.toc
            with fs.open(outpath, "wb+") as outfile:
                # zip() stops at the shorter operand, so this pairs every TOC
                # entry with its successor and leaves the last one unpaired.
                for block_start, block_end in zip(toc, toc[1:]):
                    block_len = block_end - block_start
                    if block_len == 0:
                        # Two identical offsets delimit an empty block.
                        continue
                    # Each block is decoded with a fresh decompressor.
                    block_decompressor = decompressor_cls()
                    for piece in iterate_file(infile, block_start, block_len):
                        outfile.write(block_decompressor.decompress(piece))
                    outfile.write(block_decompressor.flush())
            return ExtractResult(reports=fs.problems)

104 

105 

class UZIPHandler(StructHandler):
    """Handler that matches UZIP images by header magic and sizes the chunk."""

    NAME = "uzip"
    # One pattern per supported compression magic; re.escape keeps the magic
    # text from being interpreted as regex syntax.
    PATTERNS = [
        Regex(re.escape(magic))
        for magic in (ZLIB_COMPRESSION, LZMA_COMPRESSION, ZSTD_COMPRESSION)
    ]
    HEADER_STRUCT = HEADER_STRUCT
    C_DEFINITIONS = C_DEFINITIONS
    EXTRACTOR = UZIPExtractor()

    DOC = HandlerDoc(
        name="UZIP",
        description="FreeBSD UZIP is a block-based compressed disk image format. It uses a table of contents to index compressed blocks, supporting ZLIB, LZMA, and ZSTD compression algorithms.",
        handler_type=HandlerType.COMPRESSION,
        vendor="FreeBSD",
        references=[
            Reference(
                title="FreeBSD UZIP Documentation",
                url="https://github.com/freebsd/freebsd-src/tree/master/sys/geom/uzip",
            ),
        ],
        limitations=[],
    )

    def is_valid_header(self, header) -> bool:
        """Return True when the header's block fields look plausible.

        Requires at least one block and a positive block size that is a
        multiple of 512.
        """
        if header.block_count <= 0:
            return False
        block_size = header.block_size
        return block_size > 0 and block_size % 512 == 0

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Parse the header at the current position and compute the chunk span.

        Raises:
            InvalidInputFormat: the parsed header fails ``is_valid_header``.
        """
        header = self.parse_header(file, Endian.BIG)

        if self.is_valid_header(header):
            # The last TOC entry is taken as the image length, so the chunk
            # ends that many bytes after start_offset.
            return ValidChunk(
                start_offset=start_offset,
                end_offset=start_offset + header.toc[-1],
            )

        raise InvalidInputFormat("Invalid uzip header.")

150 )