Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dissect/cstruct/types/packed.py: 87%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

47 statements  

1from __future__ import annotations 

2 

3from functools import lru_cache 

4from struct import Struct 

5from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TypeVar 

6 

7from dissect.cstruct.types.base import EOF, BaseType 

8 

9if TYPE_CHECKING: 

10 from typing_extensions import Self 

11 

12 

@lru_cache(maxsize=1024)
def _struct(endian: str, packchar: str) -> Struct:
    """Return a compiled ``Struct`` for ``packchar`` prefixed by the ``endian`` character.

    Results are memoized per ``(endian, packchar)`` pair (at most 1024 entries), so repeated
    requests for the same format reuse one compiled ``Struct`` object.
    """
    layout = f"{endian}{packchar}"
    return Struct(layout)

16 

17 

# Type variable constrained to ``int`` or ``float``; ``Packed`` is generic over it.
T = TypeVar("T", int, float)

19 

20 

class Packed(BaseType, Generic[T]):
    """Packed type for Python struct (un)packing.

    Values are converted with the :mod:`struct` format character in ``packchar``, prefixed by the
    byte-order character ``cls.cs.endian``. ``cls.size`` is used as the number of bytes per value.
    """

    # ``struct`` format character for a single value; only declared here, not assigned.
    packchar: str

    @classmethod
    def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> Self:
        """Read a single value from ``stream``.

        Implemented as a one-element array read, so the same length check applies.

        Args:
            stream: Binary stream to read from.
            context: Passed through to ``_read_array``.

        Returns:
            The value that was read, as an instance of ``cls``.

        Raises:
            EOFError: If fewer than ``cls.size`` bytes could be read.
        """
        return cls._read_array(stream, 1, context)[0]

    @classmethod
    def _read_array(cls, stream: BinaryIO, count: int, context: dict[str, Any] | None = None) -> list[Self]:
        """Read ``count`` consecutive values from ``stream`` with a single unpack call.

        Args:
            stream: Binary stream to read from.
            count: Number of values to read, or ``EOF`` to read everything left in the stream.
            context: Not used by this implementation.

        Returns:
            A list with one ``cls`` instance per unpacked value.

        Raises:
            EOFError: If the stream returned a different number of bytes than requested.
        """
        if count == EOF:
            # Consume the rest of the stream and derive the number of whole values from it.
            data = stream.read()
            length = len(data)
            count = length // cls.size
        else:
            length = cls.size * count
            data = stream.read(length)

        fmt = _struct(cls.cs.endian, f"{count}{cls.packchar}")

        if len(data) != length:
            raise EOFError(f"Read {len(data)} bytes, but expected {length}")

        # NOTE(review): in EOF mode ``length`` is always ``len(data)``, so the check above cannot
        # fire. If the remaining data is not a whole multiple of ``cls.size``, ``fmt.unpack``
        # raises ``struct.error`` instead, because it requires a buffer of exactly ``fmt.size`` bytes.
        return [cls.__new__(cls, value) for value in fmt.unpack(data)]

    @classmethod
    def _read_0(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> list[Self]:
        """Read values from ``stream`` until a value equal to zero is encountered.

        The terminating zero value is consumed from the stream but not included in the result.

        Args:
            stream: Binary stream to read from.
            context: Not used by this implementation.

        Returns:
            A list of ``cls`` instances for every value read before the terminator.

        Raises:
            EOFError: If the stream ends before a zero value has been read.
        """
        result = []

        # Both the compiled format and the element size are loop-invariant.
        fmt = _struct(cls.cs.endian, cls.packchar)
        size = cls.size
        while True:
            data = stream.read(size)

            if len(data) != size:
                raise EOFError(f"Read {len(data)} bytes, but expected {size}")

            if (value := fmt.unpack(data)[0]) == 0:
                break

            result.append(cls.__new__(cls, value))

        return result

    @classmethod
    def _write(cls, stream: BinaryIO, data: Packed[T]) -> int:
        """Pack a single value and write it to ``stream``.

        Args:
            stream: Binary stream to write to.
            data: The value to pack.

        Returns:
            Whatever ``stream.write`` returns for the packed bytes.
        """
        return stream.write(_struct(cls.cs.endian, cls.packchar).pack(data))

    @classmethod
    def _write_array(cls, stream: BinaryIO, data: list[Packed[T]]) -> int:
        """Pack all values in ``data`` with a single format and write them to ``stream``.

        Args:
            stream: Binary stream to write to.
            data: The values to pack, in order.

        Returns:
            Whatever ``stream.write`` returns for the packed bytes.
        """
        return stream.write(_struct(cls.cs.endian, f"{len(data)}{cls.packchar}").pack(*data))