1from __future__ import annotations
2
3from functools import lru_cache
4from struct import Struct
5from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TypeVar
6
7from dissect.cstruct.types.base import EOF, BaseType
8
9if TYPE_CHECKING:
10 from typing_extensions import Self
11
12
13@lru_cache(1024)
14def _struct(endian: str, packchar: str) -> Struct:
15 return Struct(f"{endian}{packchar}")
16
17
# Type parameter used by ``Packed``; constrained to ``int`` or ``float``.
T = TypeVar("T", int, float)
19
20
class Packed(BaseType, Generic[T]):
    """Packed type for Python struct (un)packing.

    Values are converted with ``struct`` formats built from ``cls.cs.endian``
    and the type's ``packchar``; one value occupies ``cls.size`` bytes.
    """

    # ``struct`` format character for one value; declared here without a value.
    packchar: str

    @classmethod
    def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> Self:
        """Read a single value from ``stream``.

        Args:
            stream: Binary stream to read from.
            context: Passed through to ``_read_array``.

        Returns:
            The value that was read.

        Raises:
            EOFError: If fewer than ``cls.size`` bytes could be read.
        """
        return cls._read_array(stream, 1, context)[0]

    @classmethod
    def _read_array(cls, stream: BinaryIO, count: int, context: dict[str, Any] | None = None) -> list[Self]:
        """Read ``count`` values from ``stream`` using one unpack call.

        Args:
            stream: Binary stream to read from.
            count: Number of values to read, or ``EOF`` to read everything that
                is left in the stream.
            context: Not used by this implementation.

        Returns:
            A list with one instance per unpacked value.

        Raises:
            EOFError: If the stream yields fewer bytes than requested, or, when
                reading until ``EOF``, if the remaining data is not a whole
                number of values.
        """
        size = cls.size

        if count == EOF:
            data = stream.read()
            count, trailing = divmod(len(data), size)
            # A partial trailing value would otherwise surface as a raw
            # struct.error from unpack(); report it like every other short read.
            if trailing:
                raise EOFError(f"Read {len(data)} bytes, but expected a multiple of {size}")
        else:
            length = size * count
            data = stream.read(length)
            if len(data) != length:
                raise EOFError(f"Read {len(data)} bytes, but expected {length}")

        # Only compile (and cache) the format once the data is known to be complete.
        fmt = _struct(cls.cs.endian, f"{count}{cls.packchar}")

        return [cls.__new__(cls, value) for value in fmt.unpack(data)]

    @classmethod
    def _read_0(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> list[Self]:
        """Read values from ``stream`` until a value equal to zero is read.

        The terminating zero is consumed from the stream but is not part of the
        returned list.

        Args:
            stream: Binary stream to read from.
            context: Not used by this implementation.

        Returns:
            The values read before the terminator, in stream order.

        Raises:
            EOFError: If the stream ends before a zero value is read.
        """
        result = []

        fmt = _struct(cls.cs.endian, cls.packchar)
        while True:
            data = stream.read(cls.size)

            if len(data) != cls.size:
                raise EOFError(f"Read {len(data)} bytes, but expected {cls.size}")

            if (value := fmt.unpack(data)[0]) == 0:
                break

            result.append(cls.__new__(cls, value))

        return result

    @classmethod
    def _write(cls, stream: BinaryIO, data: Packed[T]) -> int:
        """Pack a single value and write it to ``stream``.

        Args:
            stream: Binary stream to write to.
            data: The value to pack.

        Returns:
            The return value of ``stream.write``.
        """
        return stream.write(_struct(cls.cs.endian, cls.packchar).pack(data))

    @classmethod
    def _write_array(cls, stream: BinaryIO, data: list[Packed[T]]) -> int:
        """Pack all values in ``data`` with one format and write them to ``stream``.

        Args:
            stream: Binary stream to write to.
            data: The values to pack, in output order.

        Returns:
            The return value of ``stream.write``.
        """
        return stream.write(_struct(cls.cs.endian, f"{len(data)}{cls.packchar}").pack(*data))