1from __future__ import annotations
2
3from typing import TYPE_CHECKING, Any, BinaryIO
4
5from dissect.cstruct.types.base import EOF, BaseArray, BaseType
6
7if TYPE_CHECKING:
8 from typing_extensions import Self
9
10
class CharArray(bytes, BaseArray):
    """Character array type for reading and writing byte strings.

    Values are ``bytes`` instances. They are constructed through
    ``type.__call__`` instead of ``cls(...)``, which skips any ``__call__``
    override on the metaclass (NOTE(review): presumably the metaclass from
    ``dissect.cstruct.types.base`` gives ``cls(...)`` a different meaning --
    confirm there).
    """

    @classmethod
    def __default__(cls) -> Self:
        """Return the default value for this array type.

        Dynamic and null-terminated arrays default to an empty byte string;
        every other array defaults to ``cls.num_entries`` NUL bytes.
        """
        size = 0 if cls.dynamic or cls.null_terminated else cls.num_entries
        return type.__call__(cls, b"\x00" * size)

    @classmethod
    def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> Self:
        """Read a value via the parent ``_read`` and wrap it in this type.

        Args:
            stream: Binary stream to read from.
            context: Optional reading context, forwarded unchanged to the parent.
        """
        return type.__call__(cls, super()._read(stream, context))

    @classmethod
    def _write(cls, stream: BinaryIO, data: bytes | list[int] | str) -> int:
        """Write ``data`` to ``stream``, appending a NUL byte if null-terminated.

        Args:
            stream: Binary stream to write to.
            data: The value to write. A list of integers is converted with
                ``bytes(...)``, a ``str`` is encoded as latin-1, and anything
                else is handed to ``stream.write`` as-is.

        Returns:
            Whatever ``stream.write`` returns for the final payload.
        """
        # An empty list has no first element to inspect; treat it as an empty
        # byte string so a list object is never passed on to ``stream.write``.
        if isinstance(data, list) and (not data or isinstance(data[0], int)):
            data = bytes(data)

        elif isinstance(data, str):
            data = data.encode("latin-1")

        if cls.null_terminated:
            return stream.write(data + b"\x00")
        return stream.write(data)
33
34
class Char(bytes, BaseType):
    """Character type for reading and writing bytes.

    Values are ``bytes`` instances, constructed through ``type.__call__``
    rather than ``cls(...)``. Arrays of this type use ``CharArray``.
    """

    ArrayType = CharArray

    @classmethod
    def __default__(cls) -> Self:
        """Return the default value: a single NUL byte."""
        return type.__call__(cls, b"\x00")

    @classmethod
    def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> Self:
        """Read exactly one byte from ``stream``."""
        return cls._read_array(stream, 1, context)

    @classmethod
    def _read_array(cls, stream: BinaryIO, count: int, context: dict[str, Any] | None = None) -> Self:
        """Read ``count`` bytes from ``stream``.

        Args:
            stream: Binary stream to read from.
            count: Number of bytes to read. ``0`` yields an empty value without
                touching the stream; ``EOF`` reads everything that is left.
            context: Optional reading context (not used here).

        Raises:
            EOFError: If fewer than ``count`` bytes could be read.
        """
        if count == 0:
            return type.__call__(cls, b"")

        if count == EOF:
            # No length check in this mode: whatever remains is the value.
            return type.__call__(cls, stream.read(-1))

        raw = stream.read(count)
        if len(raw) != count:
            raise EOFError(f"Read {len(raw)} bytes, but expected {count}")

        return type.__call__(cls, raw)

    @classmethod
    def _read_0(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> Self:
        """Read bytes up to a NUL terminator, which is consumed but not returned.

        Args:
            stream: Binary stream to read from, one byte at a time.
            context: Optional reading context (not used here).

        Raises:
            EOFError: If the stream ends before a NUL byte is found.
        """
        collected = bytearray()
        # ``iter`` with a sentinel stops as soon as a single NUL byte is read.
        for char in iter(lambda: stream.read(1), b"\x00"):
            if char == b"":
                raise EOFError("Read 0 bytes, but expected 1")
            collected += char

        return type.__call__(cls, bytes(collected))

    @classmethod
    def _write(cls, stream: BinaryIO, data: bytes | int | str) -> int:
        """Write ``data`` to ``stream`` and return the result of ``stream.write``.

        Args:
            stream: Binary stream to write to.
            data: The value to write. An ``int`` is turned into a character
                with ``chr`` and then, like a ``str``, encoded as latin-1;
                anything else is written as-is.
        """
        if isinstance(data, int):
            payload = chr(data).encode("latin-1")
        elif isinstance(data, str):
            payload = data.encode("latin-1")
        else:
            payload = data

        return stream.write(payload)