1import lzma
2import re
3import zlib
4from pathlib import Path
5from typing import Callable, Optional
6
7import pyzstd
8
9from unblob.file_utils import (
10 Endian,
11 FileSystem,
12 InvalidInputFormat,
13 StructParser,
14 iterate_file,
15)
16from unblob.models import (
17 Extractor,
18 ExtractResult,
19 File,
20 HandlerDoc,
21 HandlerType,
22 Reference,
23 Regex,
24 StructHandler,
25 ValidChunk,
26)
27
# On-disk layout of the UZIP header, in the C-like syntax consumed by
# StructParser / StructHandler. This module parses it as big-endian.
# [Ref] https://github.com/freebsd/freebsd-src/tree/master/sys/geom/uzip
#
#   magic       16-byte signature; it is also the key used to pick a decompressor
#   format      112 bytes following the signature (not interpreted in this module)
#   block_size  validated by the handler: non-zero and a multiple of 512
#   block_count number of entries read into ``toc``
#   toc         64-bit offsets; consecutive entries delimit one compressed block
C_DEFINITIONS = r"""
 typedef struct uzip_header{
 char magic[16];
 char format[112];
 uint32_t block_size;
 uint32_t block_count;
 uint64_t toc[block_count];
 } uzip_header_t;
"""

# Name of the struct inside C_DEFINITIONS that describes the header.
HEADER_STRUCT = "uzip_header_t"

# Header signatures, one per supported compression method. Each is exactly
# 16 characters long so that it fills the ``magic`` field of the header.
ZLIB_COMPRESSION = "#!/bin/sh\x0a#V2.0\x20"
LZMA_COMPRESSION = "#!/bin/sh\x0a#L3.0\x0a"
ZSTD_COMPRESSION = "#!/bin/sh\x0a#Z4.0\x20"
44
45
class Decompressor:
    """Uniform wrapper around a streaming decompressor object.

    Subclasses set ``DECOMPRESSOR`` to a zero-argument callable returning an
    object that exposes ``decompress(data)``. Each wrapper instance creates
    and owns exactly one such object.
    """

    # Zero-argument factory for the wrapped decompressor; set by subclasses.
    DECOMPRESSOR: Callable

    def __init__(self):
        # Fetch the factory from the class, not the instance: a plain Python
        # function stored as a class attribute would otherwise be bound as a
        # method and invoked with ``self`` as an unexpected argument.
        self._decompressor = type(self).DECOMPRESSOR()

    def decompress(self, data: bytes) -> bytes:
        """Feed ``data`` to the wrapped decompressor and return its output."""
        return self._decompressor.decompress(data)

    def flush(self) -> bytes:
        """Return trailing output; the base implementation always returns ``b""``."""
        return b""
57
58
class LZMADecompressor(Decompressor):
    """Decompressor whose wrapped object is an ``lzma.LZMADecompressor``."""

    DECOMPRESSOR = lzma.LZMADecompressor
61
62
class ZLIBDecompressor(Decompressor):
    """Decompressor whose wrapped object comes from ``zlib.decompressobj``."""

    DECOMPRESSOR = zlib.decompressobj

    def flush(self) -> bytes:
        """Return whatever output the zlib object yields when flushed."""
        remaining = self._decompressor.flush()
        return remaining
68
69
class ZSTDDecompressor(Decompressor):
    """Decompressor whose wrapped object is a ``pyzstd.EndlessZstdDecompressor``."""

    DECOMPRESSOR = pyzstd.EndlessZstdDecompressor
72
73
# Maps the raw bytes of the header ``magic`` field to the wrapper class used
# to decompress the blocks of that image.
DECOMPRESS_METHOD: dict[bytes, type[Decompressor]] = {
    signature.encode(): wrapper_cls
    for signature, wrapper_cls in (
        (ZLIB_COMPRESSION, ZLIBDecompressor),
        (LZMA_COMPRESSION, LZMADecompressor),
        (ZSTD_COMPRESSION, ZSTDDecompressor),
    )
}
79
80
class UZIPExtractor(Extractor):
    """Rebuild the uncompressed image stored in a UZIP file."""

    def extract(self, inpath: Path, outdir: Path):
        """Decompress the TOC-indexed blocks of ``inpath`` into ``outdir``.

        The result is a single file named after the stem of ``inpath``.
        Each pair of consecutive TOC offsets delimits one compressed block,
        so ``n`` TOC entries yield at most ``n - 1`` blocks. Pairs with equal
        offsets are skipped, and every block is decoded with a fresh
        decompressor instance.

        Raises:
            InvalidInputFormat: the header magic has no entry in
                ``DECOMPRESS_METHOD``.

        Returns:
            An ``ExtractResult`` whose reports are the problems recorded by
            the ``FileSystem`` wrapper around ``outdir``.
        """
        with File.from_path(inpath) as source:
            header = StructParser(C_DEFINITIONS).parse(
                HEADER_STRUCT, source, Endian.BIG
            )
            fs = FileSystem(outdir)
            target = Path(inpath.stem)

            try:
                codec_cls = DECOMPRESS_METHOD[header.magic]
            except LookupError:
                raise InvalidInputFormat("unsupported compression format") from None

            block_starts = header.toc
            with fs.open(target, "wb+") as sink:
                # Pair every offset with its successor; zip stops at the
                # shorter sequence, so the last offset only ever acts as an end.
                for begin, end in zip(block_starts, block_starts[1:]):
                    span = end - begin
                    if span == 0:
                        continue
                    codec = codec_cls()
                    for piece in iterate_file(source, begin, span):
                        sink.write(codec.decompress(piece))
                    # Emit whatever the decompressor still holds for this block.
                    sink.write(codec.flush())
            return ExtractResult(reports=fs.problems)
104
105
class UZIPHandler(StructHandler):
    """Handler that recognises UZIP images by their 16-byte signature."""

    NAME = "uzip"
    # One escaped pattern per supported compression signature.
    PATTERNS = [
        Regex(re.escape(signature))
        for signature in (ZLIB_COMPRESSION, LZMA_COMPRESSION, ZSTD_COMPRESSION)
    ]
    HEADER_STRUCT = HEADER_STRUCT
    C_DEFINITIONS = C_DEFINITIONS
    EXTRACTOR = UZIPExtractor()

    DOC = HandlerDoc(
        name="UZIP",
        description="FreeBSD UZIP is a block-based compressed disk image format. It uses a table of contents to index compressed blocks, supporting ZLIB, LZMA, and ZSTD compression algorithms.",
        handler_type=HandlerType.COMPRESSION,
        vendor="FreeBSD",
        references=[
            Reference(
                title="FreeBSD UZIP Documentation",
                url="https://github.com/freebsd/freebsd-src/tree/master/sys/geom/uzip",
            ),
        ],
        limitations=[],
    )

    def is_valid_header(self, header) -> bool:
        """Return True when the block count and block size look plausible.

        Both values must be positive and the block size must be a multiple
        of 512.
        """
        if not header.block_count > 0:
            return False
        if not header.block_size > 0:
            return False
        return header.block_size % 512 == 0

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Parse the header at the current position and size the chunk.

        Raises:
            InvalidInputFormat: the header fails ``is_valid_header``.

        Returns:
            A ``ValidChunk`` starting at ``start_offset`` and ending at the
            last TOC offset, which is taken relative to ``start_offset``.
        """
        header = self.parse_header(file, Endian.BIG)

        if self.is_valid_header(header):
            # The final TOC entry is used as the end of the image.
            # NOTE(review): the struct reads ``block_count`` TOC entries, which
            # delimits only ``block_count - 1`` blocks. Upstream geom_uzip
            # appears to store ``block_count + 1`` offsets - confirm that the
            # last block is not being left out of the chunk.
            image_length = header.toc[-1]
            return ValidChunk(
                start_offset=start_offset,
                end_offset=start_offset + image_length,
            )

        raise InvalidInputFormat("Invalid uzip header.")