# Largest unsigned 32-bit value (0xFFFFFFFF).  UCLDecompressor.decompress
# stops decoding when a reconstructed match offset equals this value.
END_OF_STREAM_MARKER = (1 << 32) - 1
2
3
class BitReader:
    """Sequential reader over a byte string offering bit and byte access.

    Single bits are served from a one-byte buffer (``bb``) that is refilled
    from ``data`` whenever it runs empty; bits come out most-significant
    first.  Whole-byte reads bypass that buffer and consume straight from
    ``data`` at the current byte ``index``, so both kinds of read advance
    the same cursor.
    """

    def __init__(self, data: bytes, start: int = 0):
        """Create a reader over *data*, beginning at byte offset *start*.

        Args:
            data: The bytes to read from.
            start: Zero-based byte offset of the first byte to consume.

        Raises:
            ValueError: If *start* is negative.  Python would otherwise treat
                it as an offset from the end of *data*, and the cursor would
                wrap around to the beginning after a few reads.
        """
        if start < 0:
            raise ValueError("start must be non-negative")
        self.data = data
        self.index = start  # index in the data (byte-wise)
        self.bb = 0  # bit buffer
        self.bc = 0  # number of bits remaining in the buffer

    def get_bit(self) -> int:
        """Return the next bit from the data stream (0 or 1).

        When the bit buffer is empty, the next byte of ``data`` is loaded
        into it and the byte index advances by one.

        Raises:
            ValueError: If a refill is needed but no bytes remain.
        """
        if self.bc == 0:
            if self.index >= len(self.data):
                raise ValueError("Unexpected end of data while reading bit")
            self.bb = self.data[self.index]
            self.index += 1
            self.bc = 8
        # Consume from the top of the buffered byte downwards.
        self.bc -= 1
        return (self.bb >> self.bc) & 1

    def read_byte(self) -> int:
        """Read and return the next full byte from the data stream.

        Bits still pending in the bit buffer are neither consumed nor
        discarded; the byte is taken directly from ``data`` at ``index``.

        Raises:
            ValueError: If no bytes remain.
        """
        if self.index >= len(self.data):
            raise ValueError("Unexpected end of data while reading byte")
        b = self.data[self.index]
        self.index += 1
        return b
29
30
class UCLDecompressor:
    """Decoder for UCL streams compressed in NRV2B mode.

    The stream interleaves control bits with raw bytes.  Decoding state
    (the reader, the growing output and the current/previous match
    parameters) is kept on the instance and reset by ``decompress``.
    """

    def __init__(self):
        self._reader: BitReader = BitReader(b"")
        self._output: bytearray = bytearray()
        self._last_match_offset: int = -1
        self._match_offset: int = -1
        self._match_length: int = -1

    def _process_literal_run(self) -> None:
        """Append literal bytes to the output for as long as the flag bit is set."""
        source = self._reader
        sink = self._output
        while True:
            if not source.get_bit():
                return
            sink.append(source.read_byte())

    def _decode_match_offset(self) -> int:
        """Read the variable-length offset prefix from the bit stream.

        The value starts from an implicit leading 1.  Bits arrive in pairs:
        a payload bit shifted into the value, then a stop bit that ends the
        code when it is 1.
        """
        next_bit = self._reader.get_bit
        value = 2 + next_bit()
        while next_bit() != 1:
            value = value * 2 + next_bit()
        return value

    def _decode_match_length(self) -> int:
        """Read the match length from the bit stream.

        A two-bit value of 1..3 is used directly.  A value of 0 introduces an
        extended code read as payload/stop bit pairs, to which 2 is added.
        One more is added when the current match offset exceeds 0xD00.
        """
        next_bit = self._reader.get_bit
        high = next_bit()
        low = next_bit()
        length = high * 2 + low

        if length == 0:
            # Extended form: implicit leading 1, then payload/stop bit pairs.
            length = 2 + next_bit()
            while next_bit() != 1:
                length = length * 2 + next_bit()
            length += 2

        # Far matches carry one extra byte of length.
        far_bonus = 1 if self._match_offset > 0xD00 else 0
        return length + far_bonus

    def _copy_match_data(self) -> None:
        """Append a back-reference taken from the already produced output.

        One byte is always copied, followed by ``_match_length`` more.  The
        copy proceeds byte by byte because the source range may run into the
        bytes being appended.

        Raises:
            ValueError: If the offset points before the start of the output.
        """
        out = self._output
        first = len(out) - self._match_offset
        if first < 0:
            raise ValueError("Invalid match offset")

        total = 1 + max(self._match_length, 0)
        for position in range(first, first + total):
            out.append(out[position])

    def decompress(self, compressed: bytes) -> bytes:
        """UCL decompression using NRV2B mode.

        Args:
            compressed: The compressed stream, read from its first byte.

        Returns:
            The decompressed data.

        Raises:
            ValueError: If the input ends prematurely or a match refers to
                data before the start of the output.
        """
        self._reader = BitReader(compressed, start=0)
        self._output = bytearray()
        self._last_match_offset = 1

        while True:
            self._process_literal_run()
            self._match_offset = self._decode_match_offset()

            if self._match_offset != 2:
                # The prefix supplies the high part; one raw byte completes it.
                low_byte = self._reader.read_byte()
                self._match_offset = (self._match_offset - 3) * 256 + low_byte
                if self._match_offset == END_OF_STREAM_MARKER:
                    return bytes(self._output)
                self._match_offset += 1
                self._last_match_offset = self._match_offset
            else:
                # A prefix of 2 means "reuse the previous match offset".
                self._match_offset = self._last_match_offset

            self._match_length = self._decode_match_length()
            self._copy_match_data()