Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/uzip.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

66 statements  

1import lzma 

2import re 

3import zlib 

4from collections.abc import Callable 

5from pathlib import Path 

6 

7import pyzstd 

8 

9from unblob.file_utils import ( 

10 Endian, 

11 FileSystem, 

12 InvalidInputFormat, 

13 StructParser, 

14 iterate_file, 

15) 

16from unblob.models import ( 

17 Extractor, 

18 ExtractResult, 

19 File, 

20 HandlerDoc, 

21 HandlerType, 

22 Reference, 

23 Regex, 

24 StructHandler, 

25 ValidChunk, 

26) 

27 

# [Ref] https://github.com/freebsd/freebsd-src/tree/master/sys/geom/uzip
#
# Layout of the uzip header as handed to StructParser / StructHandler.
# Both the extractor and the handler in this module parse it big-endian.
#   magic       - 16 bytes; matched against the *_COMPRESSION strings below
#                 to pick the decompressor.
#   format      - 112 bytes; not read anywhere in this module.
#   block_size  - must be a non-zero multiple of 512 to pass is_valid_header.
#   block_count - number of TOC entries parsed into `toc`.
#   toc         - offsets into the file; each pair of consecutive entries
#                 delimits one compressed block.
# NOTE(review): only `block_count` offsets are read here. Confirm against the
# geom_uzip sources whether the on-disk TOC carries block_count + 1 entries;
# if it does, the final block is currently left out.
C_DEFINITIONS = r"""
    typedef struct uzip_header{
        char magic[16];
        char format[112];
        uint32_t block_size;
        uint32_t block_count;
        uint64_t toc[block_count];
    } uzip_header_t;
"""

# Name of the struct inside C_DEFINITIONS that describes the header.
HEADER_STRUCT = "uzip_header_t"

# The three recognised values of the 16-byte `magic` field. Each is exactly
# 16 characters long and differs only in the version tag after the shebang
# line; the tag selects the compression algorithm (see DECOMPRESS_METHOD).
ZLIB_COMPRESSION = "#!/bin/sh\x0a#V2.0\x20"
LZMA_COMPRESSION = "#!/bin/sh\x0a#L3.0\x0a"
ZSTD_COMPRESSION = "#!/bin/sh\x0a#Z4.0\x20"

44 

45 

class Decompressor:
    """Thin uniform wrapper around a streaming decompressor object.

    Subclasses assign ``DECOMPRESSOR`` to a callable that, invoked without
    arguments, returns an object exposing ``decompress(data)``. One wrapper
    instance owns exactly one such object.
    """

    DECOMPRESSOR: Callable

    def __init__(self):
        """Create the underlying decompressor by calling ``DECOMPRESSOR``."""
        make_backend = self.DECOMPRESSOR
        self._decompressor = make_backend()

    def decompress(self, data: bytes) -> bytes:
        """Feed ``data`` to the underlying object and return its output."""
        backend = self._decompressor
        return backend.decompress(data)

    def flush(self) -> bytes:
        """Return trailing output; the base implementation has none."""
        return b""

57 

58 

class LZMADecompressor(Decompressor):
    """Decompressor whose backend is a fresh ``lzma.LZMADecompressor``."""

    # Called with no arguments by Decompressor.__init__ to build the backend.
    DECOMPRESSOR = lzma.LZMADecompressor

61 

62 

class ZLIBDecompressor(Decompressor):
    """Decompressor whose backend is a ``zlib.decompressobj()`` object."""

    DECOMPRESSOR = zlib.decompressobj

    def flush(self) -> bytes:
        """Return whatever output the zlib object yields from its ``flush()``."""
        remainder = self._decompressor.flush()
        return remainder

68 

69 

class ZSTDDecompressor(Decompressor):
    """Decompressor whose backend is a ``pyzstd.EndlessZstdDecompressor``."""

    # Called with no arguments by Decompressor.__init__ to build the backend.
    DECOMPRESSOR = pyzstd.EndlessZstdDecompressor

72 

73 

# Dispatch table from the header's `magic` bytes to the wrapper class used to
# decompress every block of that image. The magic constants are str, so they
# are encoded to bytes here to serve as keys for the lookup by header.magic.
DECOMPRESS_METHOD: dict[bytes, type[Decompressor]] = {
    ZLIB_COMPRESSION.encode(): ZLIBDecompressor,
    LZMA_COMPRESSION.encode(): LZMADecompressor,
    ZSTD_COMPRESSION.encode(): ZSTDDecompressor,
}

79 

80 

class UZIPExtractor(Extractor):
    """Extractor that writes the decompressed content of a uzip image."""

    def extract(self, inpath: Path, outdir: Path):
        """Decompress the blocks of ``inpath`` into one file under ``outdir``.

        The header is parsed big-endian, the decompressor is chosen from the
        header magic, and each pair of consecutive TOC offsets is treated as
        the bounds of one independently compressed block. The output file is
        named after the stem of ``inpath``.

        Raises:
            InvalidInputFormat: the header magic is not a key of
                ``DECOMPRESS_METHOD``.

        Returns:
            An ``ExtractResult`` carrying the problems collected by the
            ``FileSystem`` used for writing.
        """
        with File.from_path(inpath) as infile:
            header = StructParser(C_DEFINITIONS).parse(
                HEADER_STRUCT, infile, Endian.BIG
            )
            fs = FileSystem(outdir)
            target = Path(inpath.stem)

            codec_cls = DECOMPRESS_METHOD.get(header.magic)
            if codec_cls is None:
                raise InvalidInputFormat("unsupported compression format")

            offsets = header.toc
            with fs.open(target, "wb+") as outfile:
                for begin, end in zip(offsets[:-1], offsets[1:], strict=False):
                    span = end - begin
                    if span == 0:
                        # Zero-length entry: nothing is written for it.
                        # NOTE(review): confirm whether the format expects a
                        # zero-filled block of block_size bytes here instead.
                        continue
                    # Every block gets its own decompressor instance.
                    codec = codec_cls()
                    for piece in iterate_file(infile, begin, span):
                        outfile.write(codec.decompress(piece))
                    outfile.write(codec.flush())
            return ExtractResult(reports=fs.problems)

106 

107 

class UZIPHandler(StructHandler):
    """Handler that locates uzip images by magic and sizes them from the TOC."""

    NAME = "uzip"
    # One literal pattern per supported compression magic.
    PATTERNS = [
        Regex(re.escape(magic))
        for magic in (ZLIB_COMPRESSION, LZMA_COMPRESSION, ZSTD_COMPRESSION)
    ]
    HEADER_STRUCT = HEADER_STRUCT
    C_DEFINITIONS = C_DEFINITIONS
    EXTRACTOR = UZIPExtractor()

    DOC = HandlerDoc(
        name="UZIP",
        description="FreeBSD UZIP is a block-based compressed disk image format. It uses a table of contents to index compressed blocks, supporting ZLIB, LZMA, and ZSTD compression algorithms.",
        handler_type=HandlerType.COMPRESSION,
        vendor="FreeBSD",
        references=[
            Reference(
                title="FreeBSD UZIP Documentation",
                url="https://github.com/freebsd/freebsd-src/tree/master/sys/geom/uzip",
            ),
        ],
        limitations=[],
    )

    def is_valid_header(self, header) -> bool:
        """Return True when the header has blocks and a usable block size.

        A usable block size is non-zero and a multiple of 512.
        """
        if not header.block_count > 0:
            return False
        block_size = header.block_size
        return block_size > 0 and block_size % 512 == 0

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Parse the header at the current position and compute the chunk span.

        The chunk ends at the last TOC offset, taken relative to
        ``start_offset``.

        Raises:
            InvalidInputFormat: the header fails ``is_valid_header``.
        """
        header = self.parse_header(file, Endian.BIG)

        if self.is_valid_header(header):
            last_toc_offset = header.toc[-1]
            return ValidChunk(
                start_offset=start_offset,
                end_offset=start_offset + last_toc_offset,
            )

        raise InvalidInputFormat("Invalid uzip header.")

152 )