Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/_ucl.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

77 statements  

# Decoded match-offset value that signals the end of the compressed stream
# (all 32 bits set).
END_OF_STREAM_MARKER = 0xFFFF_FFFF

2 

3 

class BitReader:
    """Sequential reader that hands out single bits and whole bytes.

    Bits are served most-significant-bit first from a one-byte buffer that is
    refilled from ``data`` on demand. Whole-byte reads bypass that buffer and
    consume directly from the current byte position, so bit reads and byte
    reads can be interleaved on the same stream.
    """

    def __init__(self, data: bytes, start: int = 0):
        """Create a reader over ``data`` beginning at byte offset ``start``.

        Raises:
            ValueError: if ``start`` is negative. A negative index would
                otherwise be interpreted by Python as counting from the end
                of ``data`` and silently read the wrong bytes.
        """
        if start < 0:
            raise ValueError("Start index must not be negative")
        self.data = data
        self.index = start  # index in the data (byte-wise)
        self.bb = 0  # bit buffer
        self.bc = 0  # number of bits remaining in the buffer

    def get_bit(self) -> int:
        """Return the next bit from the data stream (0 or 1).

        When the bit buffer is empty, the next byte is loaded into it first.

        Raises:
            ValueError: if the buffer is empty and no bytes are left.
        """
        if self.bc == 0:
            if self.index >= len(self.data):
                raise ValueError("Unexpected end of data while reading bit")
            self.bb = self.data[self.index]
            self.index += 1
            self.bc = 8
        # Consume from the top of the buffered byte downwards (MSB first).
        self.bc -= 1
        return (self.bb >> self.bc) & 1

    def read_byte(self) -> int:
        """Read and return the next full byte from the data stream.

        Bits still pending in the bit buffer are neither consumed nor
        discarded; the byte is taken from the current byte position.

        Raises:
            ValueError: if no bytes are left.
        """
        if self.index >= len(self.data):
            raise ValueError("Unexpected end of data while reading byte")
        b = self.data[self.index]
        self.index += 1
        return b

29 

30 

class UCLDecompressor:
    """Decompressor for UCL streams in NRV2B mode.

    The instance keeps the working state of a single ``decompress`` call
    (bit reader, output buffer, current and previous match offset, current
    match length); every call to ``decompress`` starts from a fresh reader
    and an empty output buffer.
    """

    def __init__(self):
        self._reader: BitReader = BitReader(b"")
        self._output: bytearray = bytearray()
        self._last_match_offset: int = -1
        self._match_offset: int = -1
        self._match_length: int = -1

    def _process_literal_run(self) -> None:
        """Process a run of literal bytes while the next bit is 1."""
        while self._reader.get_bit():
            self._output.append(self._reader.read_byte())

    def _decode_match_offset(self) -> int:
        """Decode the match offset prefix from the bit stream.

        Bits come in pairs: a data bit that is shifted into the value,
        followed by a flag bit where 1 terminates the sequence.
        """
        match_offset = 1
        while True:
            match_offset = (match_offset << 1) + self._reader.get_bit()
            if self._reader.get_bit() == 1:
                break
        return match_offset

    def _decode_match_length(self) -> int:
        """Decode the match length value from the bit stream.

        Reads ``self._match_offset``, so the offset of the current match must
        already be decoded when this is called.
        """
        # Get a two-bit base for the match length
        match_length = (self._reader.get_bit() << 1) + self._reader.get_bit()

        if match_length == 0:
            match_length += 1
            # Read extra (data bit, flag bit) pairs until a terminating 1
            # flag is encountered
            while True:
                match_length = (match_length << 1) + self._reader.get_bit()
                if self._reader.get_bit() == 1:
                    break
            match_length += 2

        # If the match offset is large, add an extra byte to the length.
        if self._match_offset > 0xD00:
            match_length += 1

        return match_length

    def _copy_match_data(self) -> None:
        """Append the current match to the output.

        Copies ``self._match_length + 1`` bytes starting ``self._match_offset``
        bytes before the end of the output. Source and destination may
        overlap, in which case the trailing ``_match_offset`` bytes repeat
        periodically; this is produced with slice arithmetic instead of a
        per-byte loop.

        Raises:
            ValueError: if the offset is not positive or reaches before the
                start of the output.
        """
        output = self._output
        offset = self._match_offset
        if offset <= 0 or offset > len(output):
            raise ValueError("Invalid match offset")

        # One byte is always copied, plus match_length further bytes.
        count = max(self._match_length, 0) + 1

        # Byte i of the match equals window[i % offset]: a byte-wise
        # overlapping copy just repeats the last `offset` bytes.
        window = output[len(output) - offset :]
        repeats, remainder = divmod(count, offset)
        output.extend(window * repeats + window[:remainder])

    def decompress(self, compressed: bytes) -> bytes:
        """Decompress ``compressed`` using UCL NRV2B mode.

        Returns:
            The decompressed data.

        Raises:
            ValueError: if the input ends before the end-of-stream marker is
                seen, or a match refers to data outside the output produced
                so far.
        """
        self._reader = BitReader(compressed, start=0)
        self._output = bytearray()
        self._last_match_offset = 1

        while True:
            self._process_literal_run()
            self._match_offset = self._decode_match_offset()

            if self._match_offset == 2:
                # Prefix value 2 means "reuse the previous match offset".
                self._match_offset = self._last_match_offset
            else:
                # Read an extra byte to complete the offset.
                self._match_offset = (
                    self._match_offset - 3
                ) * 256 + self._reader.read_byte()
                if self._match_offset == END_OF_STREAM_MARKER:
                    break
                self._match_offset += 1
                self._last_match_offset = self._match_offset

            self._match_length = self._decode_match_length()
            self._copy_match_data()

        return bytes(self._output)