1from __future__ import annotations
2
3import functools
4from io import BytesIO
5from typing import TYPE_CHECKING, Any, BinaryIO, Callable, ClassVar, TypeVar
6
7from dissect.cstruct.exceptions import ArraySizeError
8from dissect.cstruct.expression import Expression
9
10if TYPE_CHECKING:
11 from typing_extensions import Self
12
13 from dissect.cstruct.cstruct import cstruct
14
15
# Sentinel entry count meaning "keep reading entries until the stream is exhausted".
EOF = -0xE0F  # Negative counts are illegal anyway, so abuse that for our EOF sentinel
17
18
class MetaType(type):
    """Metaclass shared by all cstruct type classes.

    Supplies the class-level parsing and serialization entry points (``read``, ``reads``,
    ``write``, ``dumps``) together with the internal hooks (``_read``, ``_write`` and their
    array variants) that concrete types override.
    """

    cs: cstruct
    """The cstruct instance that owns this type class."""
    size: int | None
    """Size of the type in bytes, or ``None`` when the type is dynamically sized."""
    dynamic: bool
    """``True`` when the type is dynamically sized."""
    alignment: int | None
    """Alignment of the type in bytes; ``None`` is treated as 1-byte aligned."""

    # Placeholder only: Array is a subclass of BaseType, so the real class cannot be referenced yet.
    # The actual type is assigned at the bottom of the file.
    ArrayType: type[BaseArray] = "Array"
    """The array type used for this type class."""

    def __call__(cls, *args, **kwargs) -> Self:  # type: ignore
        """Construct a value, or parse one when called with a single stream or buffer.

        This is what enables the ``TypeClass(bytes | file-like object)`` parsing syntax. Every
        other call shape is forwarded to the regular constructor.
        """
        # TODO: add support for Type(cs) API to create new bounded type classes, similar to the old API?
        if len(args) != 1 or isinstance(args[0], cls):
            return type.__call__(cls, *args, **kwargs)

        source = args[0]
        if _is_readable_type(source):
            return cls._read(source)

        # Shortcut for char/bytes type: bytes of exactly the type's size are the value itself
        is_raw_value = issubclass(cls, bytes) and isinstance(source, bytes) and len(source) == cls.size
        if not is_raw_value and _is_buffer_type(source):
            return cls.reads(source)

        return type.__call__(cls, *args, **kwargs)

    def __getitem__(cls, num_entries: int | Expression | None) -> type[BaseArray]:
        """Build an array type of this type with the given number of entries."""
        owner = cls.cs
        return owner._make_array(cls, num_entries)

    def __bool__(cls) -> bool:
        """A type class is truthy regardless of what ``__len__`` reports."""
        return True

    def __len__(cls) -> int:
        """Return the size of the type in bytes.

        Raises:
            TypeError: If the type is dynamically sized.
        """
        # Python 3.9 compat thing for bound type vars
        if cls is BaseType:
            return 0

        byte_size = cls.size
        if byte_size is None:
            raise TypeError("Dynamic size")
        return byte_size

    def __default__(cls) -> Self:  # type: ignore
        """Return the default value of this type, built by calling the type without arguments."""
        return cls()

    def reads(cls, data: bytes | memoryview | bytearray) -> Self:  # type: ignore
        """Parse a value of this type from a bytes-like object.

        Args:
            data: Bytes-like object to parse.

        Returns:
            The parsed value of this type.
        """
        buffer = BytesIO(data)
        return cls._read(buffer)

    def read(cls, obj: BinaryIO | bytes | memoryview | bytearray) -> Self:  # type: ignore
        """Parse a value of this type from either a buffer or a stream.

        Args:
            obj: Data to parse. Can be a bytes-like object or a file-like object.

        Returns:
            The parsed value of this type.

        Raises:
            TypeError: If ``obj`` is neither bytes-like nor has a ``read`` attribute.
        """
        if _is_buffer_type(obj):
            return cls.reads(obj)

        if _is_readable_type(obj):
            return cls._read(obj)

        raise TypeError("Invalid object type")

    def write(cls, stream: BinaryIO, value: Any) -> int:
        """Serialize a value into a writable file-like object.

        Args:
            stream: File-like object that supports writing.
            value: Value to write.

        Returns:
            The amount of bytes written.
        """
        return cls._write(stream, value)

    def dumps(cls, value: Any) -> bytes:
        """Serialize a value into a byte string.

        Args:
            value: Value to dump.

        Returns:
            The raw bytes of this type.
        """
        buffer = BytesIO()
        cls._write(buffer, value)
        return buffer.getvalue()

    def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> Self:  # type: ignore
        """Internal hook for reading a single value.

        Must be implemented per type.

        Args:
            stream: The stream to read from.
            context: Optional reading context.
        """
        raise NotImplementedError

    def _read_array(cls, stream: BinaryIO, count: int, context: dict[str, Any] | None = None) -> list[Self]:  # type: ignore
        """Internal hook for reading a sequence of values.

        Type implementations can override this to read their type in an optimized way.

        Args:
            stream: The stream to read from.
            count: The amount of values to read, or ``EOF`` to read until the stream is exhausted.
            context: Optional reading context.
        """
        if count != EOF:
            return [cls._read(stream, context) for _ in range(count)]

        entries = []
        while not _is_eof(stream):
            entries.append(cls._read(stream, context))
        return entries

    def _read_0(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> list[Self]:  # type: ignore
        """Internal hook for reading null-terminated data.

        What counts as "null" is type specific, so this must be implemented per type.

        Args:
            stream: The stream to read from.
            context: Optional reading context.
        """
        raise NotImplementedError

    def _write(cls, stream: BinaryIO, data: Any) -> int:
        """Internal hook for writing a single value. Must be implemented per type."""
        raise NotImplementedError

    def _write_array(cls, stream: BinaryIO, array: list[Self]) -> int:  # type: ignore
        """Internal hook for writing a sequence of values.

        Type implementations can override this to write their type in an optimized way.

        Args:
            stream: The stream to write to.
            array: The values to write.

        Returns:
            The sum of the per-entry ``_write`` results.
        """
        written = 0
        for entry in array:
            written += cls._write(stream, entry)
        return written

    def _write_0(cls, stream: BinaryIO, array: list[Self]) -> int:  # type: ignore
        """Internal hook for writing a null-terminated sequence of values.

        The type's default value is appended as the terminator before writing.

        Args:
            stream: The stream to write to.
            array: The values to write, without the terminator.
        """
        terminated = list(array)
        terminated.append(cls.__default__())
        return cls._write_array(stream, terminated)
193
194
195class _overload:
196 """Descriptor to use on the ``write`` and ``dumps`` methods on cstruct types.
197
198 Allows for calling these methods on both the type and instance.
199
200 Example:
201 >>> int32.dumps(123)
202 b'\\x7b\\x00\\x00\\x00'
203 >>> int32(123).dumps()
204 b'\\x7b\\x00\\x00\\x00'
205 """
206
207 def __init__(self, func: Callable[..., Any]) -> None:
208 self.func = func
209
210 def __get__(self, instance: BaseType | None, owner: type[BaseType]) -> Callable[[], bytes]:
211 if instance is None:
212 return functools.partial(self.func, owner)
213 return functools.partial(self.func, instance.__class__, value=instance)
214
215
class BaseType(metaclass=MetaType):
    """Common base class of all cstruct type classes."""

    # Wrapped in ``_overload`` so both ``Type.dumps(value)`` and ``value.dumps()`` work
    dumps = _overload(MetaType.dumps)
    write = _overload(MetaType.write)

    def __len__(self) -> int:
        """Return the size in bytes of this value's type.

        Raises:
            TypeError: If the type is dynamically sized.
        """
        byte_size = self.__class__.size
        if byte_size is None:
            raise TypeError("Dynamic size")
        return byte_size
228
229
# Type variable for the element type of ``Array``; restricted to cstruct types.
T = TypeVar("T", bound=BaseType)
231
232
class BaseArray(BaseType):
    """Implements a fixed or dynamically sized array type.

    Example:
        When using the default C-style parser, the following syntax is supported:

            x[3]    -> 3    -> static length.
            x[]     -> None -> null-terminated.
            x[expr] -> expr -> dynamic length.
    """

    type: ClassVar[type[BaseType]]
    num_entries: ClassVar[int | Expression | None]
    null_terminated: ClassVar[bool]

    @classmethod
    def __default__(cls) -> BaseType:
        """Return the default value of this array type.

        Arrays with an integer ``num_entries`` are filled with that many default entries; all
        other arrays default to empty.
        """
        count = cls.num_entries if isinstance(cls.num_entries, int) else 0
        # Create one default per slot. ``[default] * count`` would put the *same* object in every
        # slot, so mutating one entry of a default array would silently change all of them.
        return type.__call__(cls, [cls.type.__default__() for _ in range(count)])

    @classmethod
    def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> list[BaseType]:
        """Read the array entries from ``stream``.

        Args:
            stream: The stream to read from.
            context: Optional reading context, also used to evaluate an expression based size.

        Returns:
            The list of entries read by the element type.

        Raises:
            TypeError: If ``num_entries`` is not an ``int``, ``None`` or an ``Expression``.
        """
        if cls.null_terminated:
            return cls.type._read_0(stream, context)

        if isinstance(cls.num_entries, int):
            # Negative sizes are clamped so they can never collide with the EOF sentinel
            num = max(0, cls.num_entries)
        elif cls.num_entries is None:
            num = EOF
        elif isinstance(cls.num_entries, Expression):
            try:
                num = max(0, cls.num_entries.evaluate(context))
            except Exception:
                # An expression that fails to evaluate is only acceptable when it is the literal "EOF"
                if cls.num_entries.expression != "EOF":
                    raise
                num = EOF
        else:
            # Previously fell through with ``num`` unbound, surfacing as an UnboundLocalError
            raise TypeError(f"Invalid array size type: {cls.num_entries!r}")

        return cls.type._read_array(stream, num, context)

    @classmethod
    def _write(cls, stream: BinaryIO, data: list[Any]) -> int:
        """Write the array entries to ``stream``.

        Args:
            stream: The stream to write to.
            data: The entries to write.

        Returns:
            The value returned by the element type's array writer.

        Raises:
            ArraySizeError: If the array is not dynamic and ``data`` does not have exactly ``num_entries`` entries.
        """
        if cls.null_terminated:
            return cls.type._write_0(stream, data)

        if not cls.dynamic and cls.num_entries != (actual_size := len(data)):
            raise ArraySizeError(f"Expected static array size {cls.num_entries}, got {actual_size} instead.")

        return cls.type._write_array(stream, data)
282
283
class Array(list[T], BaseArray):
    """List-backed array type; parsed entries are wrapped in an instance of the array class."""

    @classmethod
    def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> list[T]:
        entries = super()._read(stream, context)
        return cls(entries)
288
289
290def _is_readable_type(value: object) -> bool:
291 return hasattr(value, "read")
292
293
294def _is_buffer_type(value: object) -> bool:
295 return isinstance(value, (bytes, memoryview, bytearray))
296
297
298def _is_eof(stream: BinaryIO) -> bool:
299 """Check if the stream has reached EOF."""
300 pos = stream.tell()
301 stream.read(1)
302
303 if stream.tell() == pos:
304 return True
305
306 stream.seek(pos)
307 return False
308
309
# As mentioned in the MetaType class, the ``ArrayType`` placeholder is replaced with the actual Array type here,
# now that Array has been defined
MetaType.ArrayType = Array