Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dissect/cstruct/types/leb128.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

45 statements  

1from __future__ import annotations 

2 

3from typing import TYPE_CHECKING, Any, BinaryIO 

4 

5from dissect.cstruct.types.base import BaseType 

6 

7if TYPE_CHECKING: 

8 from typing_extensions import Self 

9 

10 

class LEB128(int, BaseType):
    """Integer type stored as LEB128 (Little Endian Base 128).

    Each encoded byte carries seven payload bits, least significant group first; the high bit
    of a byte is set when another byte follows. Concrete types set ``signed`` to select the
    signed or unsigned variant of the encoding.

    See https://en.wikipedia.org/wiki/LEB128 for a description of the algorithm.
    """

    # True for the signed variant of the encoding, False for the unsigned one.
    signed: bool

    @classmethod
    def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> Self:
        """Decode a single LEB128 value from ``stream``.

        Args:
            stream: Binary stream to read from, one byte at a time.
            context: Optional reading context; not used by this method.

        Returns:
            A new instance of this type holding the decoded integer.

        Raises:
            EOFError: If the stream ends before a byte without the continuation bit is read.
        """
        value = 0
        bit_offset = 0
        more = True
        while more:
            raw = stream.read(1)
            if raw == b"":
                raise EOFError("EOF reached, while final LEB128 byte was not yet read")

            octet = ord(raw)
            # The low seven bits are payload, placed above the groups read so far.
            value |= (octet & 0x7F) << bit_offset
            bit_offset += 7
            # The high bit tells whether another byte follows.
            more = (octet & 0x80) != 0

        # In signed mode, bit 6 of the final byte is the sign bit: extend it upwards.
        if cls.signed and (octet & 0x40) != 0:
            value |= -1 << bit_offset

        return cls.__new__(cls, value)

    @classmethod
    def _read_0(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> list[Self]:
        """Decode consecutive LEB128 values from ``stream`` until a value equal to zero is read.

        Args:
            stream: Binary stream to read from.
            context: Optional reading context, passed through to ``_read``.

        Returns:
            The decoded values in stream order, without the terminating zero.
        """
        items = []
        while (item := cls._read(stream, context)) != 0:
            items.append(item)
        return items

    @classmethod
    def _write(cls, stream: BinaryIO, data: int) -> int:
        """Encode ``data`` as LEB128 and write it to ``stream``.

        Args:
            stream: Binary stream to write the encoded bytes to.
            data: Integer to encode.

        Returns:
            The number of bytes in the encoded value.

        Raises:
            ValueError: If ``data`` is negative and this type is unsigned.
        """
        # Negative values can only be represented by the signed variant.
        if data < 0 and not cls.signed:
            raise ValueError("Attempt to encode a negative integer using unsigned LEB128 encoding")

        encoded = bytearray()
        while True:
            # Peel off the seven least significant bits.
            septet = data & 0x7F
            data = data >> 7
            sign_bit_set = (septet & 0x40) != 0

            # Signed and unsigned encoding differ only in when to stop: the remaining value
            # must be pure sign fill that agrees with bit 6 of the byte just produced.
            if data == 0:
                is_last = not (cls.signed and sign_bit_set)
            elif data == -1:
                is_last = sign_bit_set
            else:
                is_last = False

            if is_last:
                encoded.append(septet)
                break

            # More bytes follow: flag this one with the continuation bit.
            encoded.append(0x80 | septet)

        stream.write(encoded)
        return len(encoded)