1from __future__ import annotations
2
3from typing import TYPE_CHECKING, Any, BinaryIO
4
5from dissect.cstruct.types.base import BaseType
6
7if TYPE_CHECKING:
8 from typing_extensions import Self
9
10
class LEB128(int, BaseType):
    """Little Endian Base 128 integer: a variable-length encoding carrying seven value bits per byte.

    Every encoded byte holds seven bits of the value in its low bits. The high bit (``0x80``) is set on
    all bytes except the last one. The ``signed`` class attribute selects signed or unsigned handling.

    See https://en.wikipedia.org/wiki/LEB128 for more information and an explanation of the algorithm.
    """

    # Consulted through ``cls`` by ``_read`` and ``_write``; no default value is assigned in this class.
    signed: bool

    @classmethod
    def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> Self:
        """Decode a single LEB128 value from ``stream``.

        Args:
            stream: Binary stream that is consumed one byte at a time.
            context: Not used by this method.

        Returns:
            The decoded value, created with ``cls.__new__``.

        Raises:
            EOFError: If the stream is exhausted before a byte without the continuation bit is read.
        """
        value = 0
        bit_offset = 0
        more = True
        while more:
            raw = stream.read(1)
            if raw == b"":
                raise EOFError("EOF reached, while final LEB128 byte was not yet read")

            octet = ord(raw)
            # The low seven bits are payload, placed above everything decoded so far.
            value |= (octet & 0x7F) << bit_offset
            bit_offset += 7
            # A set high bit means at least one more byte follows.
            more = (octet & 0x80) != 0

        # Sign-extend when bit 6 of the final byte is set in signed mode.
        if cls.signed and (octet & 0x40) != 0:
            value |= -1 << bit_offset

        return cls.__new__(cls, value)

    @classmethod
    def _read_0(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> list[Self]:
        """Decode consecutive values from ``stream`` until a value equal to ``0`` is read.

        The terminating zero is consumed from the stream but is not part of the returned list.
        """
        # ``iter`` with a sentinel keeps calling the reader and stops once a result compares equal to 0.
        return list(iter(lambda: cls._read(stream, context), 0))

    @classmethod
    def _write(cls, stream: BinaryIO, data: int) -> int:
        """Encode ``data`` as LEB128 and write it to ``stream`` in a single ``write`` call.

        Args:
            stream: Binary stream that receives the encoded bytes.
            data: Integer to encode.

        Returns:
            The number of bytes that make up the encoding.

        Raises:
            ValueError: If ``data`` is negative while ``cls.signed`` is false.
        """
        # only write negative numbers when in signed mode
        if data < 0 and not cls.signed:
            raise ValueError("Attempt to encode a negative integer using unsigned LEB128 encoding")

        is_signed = cls.signed
        encoded = bytearray()
        remaining = data
        finished = False
        while not finished:
            # Take the low-order seven bits and drop them from the remaining value.
            septet = remaining & 0x7F
            remaining >>= 7
            sign_bit_set = (septet & 0x40) != 0

            # Signed and unsigned encoding differ only in when to stop. A remainder of -1 with the
            # sign bit set ends the encoding; a remainder of 0 ends it unless, in signed mode, the
            # sign bit of this byte is set.
            finished = (remaining == -1 and sign_bit_set) or (
                remaining == 0 and not (is_signed and sign_bit_set)
            )

            # Every byte except the last one gets its high-order bit set.
            encoded.append(septet if finished else 0x80 | septet)

        stream.write(encoded)
        return len(encoded)