Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dissect/cstruct/types/base.py: 72%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

135 statements  

1from __future__ import annotations 

2 

3import functools 

4from io import BytesIO 

5from typing import TYPE_CHECKING, Any, BinaryIO, Callable, ClassVar, TypeVar 

6 

7from dissect.cstruct.exceptions import ArraySizeError 

8from dissect.cstruct.expression import Expression 

9 

10if TYPE_CHECKING: 

11 from typing_extensions import Self 

12 

13 from dissect.cstruct.cstruct import cstruct 

14 

15 

16EOF = -0xE0F # Negative counts are illegal anyway, so abuse that for our EOF sentinel 

17 

18 

19class MetaType(type): 

20 """Base metaclass for cstruct type classes.""" 

21 

22 cs: cstruct 

23 """The cstruct instance this type class belongs to.""" 

24 size: int | None 

25 """The size of the type in bytes. Can be ``None`` for dynamic sized types.""" 

26 dynamic: bool 

27 """Whether or not the type is dynamically sized.""" 

28 alignment: int | None 

29 """The alignment of the type in bytes. A value of ``None`` will be treated as 1-byte aligned.""" 

30 

31 # This must be the actual type, but since Array is a subclass of BaseType, we correct this at the bottom of the file 

32 ArrayType: type[BaseArray] = "Array" 

33 """The array type for this type class.""" 

34 

35 def __call__(cls, *args, **kwargs) -> Self: # type: ignore 

36 """Adds support for ``TypeClass(bytes | file-like object)`` parsing syntax.""" 

37 # TODO: add support for Type(cs) API to create new bounded type classes, similar to the old API? 

38 if len(args) == 1 and not isinstance(args[0], cls): 

39 stream = args[0] 

40 

41 if _is_readable_type(stream): 

42 return cls._read(stream) 

43 

44 if issubclass(cls, bytes) and isinstance(stream, bytes) and len(stream) == cls.size: 

45 # Shortcut for char/bytes type 

46 return type.__call__(cls, *args, **kwargs) 

47 

48 if _is_buffer_type(stream): 

49 return cls.reads(stream) 

50 

51 return type.__call__(cls, *args, **kwargs) 

52 

53 def __getitem__(cls, num_entries: int | Expression | None) -> type[BaseArray]: 

54 """Create a new array with the given number of entries.""" 

55 return cls.cs._make_array(cls, num_entries) 

56 

57 def __bool__(cls) -> bool: 

58 """Type class is always truthy.""" 

59 return True 

60 

61 def __len__(cls) -> int: 

62 """Return the byte size of the type.""" 

63 # Python 3.9 compat thing for bound type vars 

64 if cls is BaseType: 

65 return 0 

66 

67 if cls.size is None: 

68 raise TypeError("Dynamic size") 

69 

70 return cls.size 

71 

72 def __default__(cls) -> Self: # type: ignore 

73 """Return the default value of this type.""" 

74 return cls() 

75 

76 def reads(cls, data: bytes | memoryview | bytearray) -> Self: # type: ignore 

77 """Parse the given data from a bytes-like object. 

78 

79 Args: 

80 data: Bytes-like object to parse. 

81 

82 Returns: 

83 The parsed value of this type. 

84 """ 

85 return cls._read(BytesIO(data)) 

86 

87 def read(cls, obj: BinaryIO | bytes | memoryview | bytearray) -> Self: # type: ignore 

88 """Parse the given data. 

89 

90 Args: 

91 obj: Data to parse. Can be a bytes-like object or a file-like object. 

92 

93 Returns: 

94 The parsed value of this type. 

95 """ 

96 if _is_buffer_type(obj): 

97 return cls.reads(obj) 

98 

99 if not _is_readable_type(obj): 

100 raise TypeError("Invalid object type") 

101 

102 return cls._read(obj) 

103 

104 def write(cls, stream: BinaryIO, value: Any) -> int: 

105 """Write a value to a writable file-like object. 

106 

107 Args: 

108 stream: File-like objects that supports writing. 

109 value: Value to write. 

110 

111 Returns: 

112 The amount of bytes written. 

113 """ 

114 return cls._write(stream, value) 

115 

116 def dumps(cls, value: Any) -> bytes: 

117 """Dump a value to a byte string. 

118 

119 Args: 

120 value: Value to dump. 

121 

122 Returns: 

123 The raw bytes of this type. 

124 """ 

125 out = BytesIO() 

126 cls._write(out, value) 

127 return out.getvalue() 

128 

129 def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> Self: # type: ignore 

130 """Internal function for reading value. 

131 

132 Must be implemented per type. 

133 

134 Args: 

135 stream: The stream to read from. 

136 context: Optional reading context. 

137 """ 

138 raise NotImplementedError 

139 

140 def _read_array(cls, stream: BinaryIO, count: int, context: dict[str, Any] | None = None) -> list[Self]: # type: ignore 

141 """Internal function for reading array values. 

142 

143 Allows type implementations to do optimized reading for their type. 

144 

145 Args: 

146 stream: The stream to read from. 

147 count: The amount of values to read. 

148 context: Optional reading context. 

149 """ 

150 if count == EOF: 

151 result = [] 

152 while not _is_eof(stream): 

153 result.append(cls._read(stream, context)) 

154 return result 

155 

156 return [cls._read(stream, context) for _ in range(count)] 

157 

158 def _read_0(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> list[Self]: 

159 """Internal function for reading null-terminated data. 

160 

161 "Null" is type specific, so must be implemented per type. 

162 

163 Args: 

164 stream: The stream to read from. 

165 context: Optional reading context. 

166 """ 

167 raise NotImplementedError 

168 

169 def _write(cls, stream: BinaryIO, data: Any) -> int: 

170 raise NotImplementedError 

171 

172 def _write_array(cls, stream: BinaryIO, array: list[Self]) -> int: # type: ignore 

173 """Internal function for writing arrays. 

174 

175 Allows type implementations to do optimized writing for their type. 

176 

177 Args: 

178 stream: The stream to read from. 

179 array: The array to write. 

180 """ 

181 return sum(cls._write(stream, entry) for entry in array) 

182 

183 def _write_0(cls, stream: BinaryIO, array: list[Self]) -> int: # type: ignore 

184 """Internal function for writing null-terminated arrays. 

185 

186 Allows type implementations to do optimized writing for their type. 

187 

188 Args: 

189 stream: The stream to read from. 

190 array: The array to write. 

191 """ 

192 return cls._write_array(stream, [*array, cls.__default__()]) 

193 

194 

195class _overload: 

196 """Descriptor to use on the ``write`` and ``dumps`` methods on cstruct types. 

197 

198 Allows for calling these methods on both the type and instance. 

199 

200 Example: 

201 >>> int32.dumps(123) 

202 b'\\x7b\\x00\\x00\\x00' 

203 >>> int32(123).dumps() 

204 b'\\x7b\\x00\\x00\\x00' 

205 """ 

206 

207 def __init__(self, func: Callable[..., Any]) -> None: 

208 self.func = func 

209 

210 def __get__(self, instance: BaseType | None, owner: type[BaseType]) -> Callable[[], bytes]: 

211 if instance is None: 

212 return functools.partial(self.func, owner) 

213 return functools.partial(self.func, instance.__class__, value=instance) 

214 

215 

216class BaseType(metaclass=MetaType): 

217 """Base class for cstruct type classes.""" 

218 

219 dumps = _overload(MetaType.dumps) 

220 write = _overload(MetaType.write) 

221 

222 def __len__(self) -> int: 

223 """Return the byte size of the type.""" 

224 if self.__class__.size is None: 

225 raise TypeError("Dynamic size") 

226 

227 return self.__class__.size 

228 

229 

230T = TypeVar("T", bound=BaseType) 

231 

232 

233class BaseArray(BaseType): 

234 """Implements a fixed or dynamically sized array type. 

235 

236 Example: 

237 When using the default C-style parser, the following syntax is supported: 

238 

239 x[3] -> 3 -> static length. 

240 x[] -> None -> null-terminated. 

241 x[expr] -> expr -> dynamic length. 

242 """ 

243 

244 type: ClassVar[type[BaseType]] 

245 num_entries: ClassVar[int | Expression | None] 

246 null_terminated: ClassVar[bool] 

247 

248 @classmethod 

249 def __default__(cls) -> BaseType: 

250 return type.__call__( 

251 cls, [cls.type.__default__()] * (cls.num_entries if isinstance(cls.num_entries, int) else 0) 

252 ) 

253 

254 @classmethod 

255 def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> list[BaseType]: 

256 if cls.null_terminated: 

257 return cls.type._read_0(stream, context) 

258 

259 if isinstance(cls.num_entries, int): 

260 num = max(0, cls.num_entries) 

261 elif cls.num_entries is None: 

262 num = EOF 

263 elif isinstance(cls.num_entries, Expression): 

264 try: 

265 num = max(0, cls.num_entries.evaluate(context)) 

266 except Exception: 

267 if cls.num_entries.expression != "EOF": 

268 raise 

269 num = EOF 

270 

271 return cls.type._read_array(stream, num, context) 

272 

273 @classmethod 

274 def _write(cls, stream: BinaryIO, data: list[Any]) -> int: 

275 if cls.null_terminated: 

276 return cls.type._write_0(stream, data) 

277 

278 if not cls.dynamic and cls.num_entries != (actual_size := len(data)): 

279 raise ArraySizeError(f"Expected static array size {cls.num_entries}, got {actual_size} instead.") 

280 

281 return cls.type._write_array(stream, data) 

282 

283 

284class Array(list[T], BaseArray): 

285 @classmethod 

286 def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> list[T]: 

287 return cls(super()._read(stream, context)) 

288 

289 

290def _is_readable_type(value: object) -> bool: 

291 return hasattr(value, "read") 

292 

293 

294def _is_buffer_type(value: object) -> bool: 

295 return isinstance(value, (bytes, memoryview, bytearray)) 

296 

297 

298def _is_eof(stream: BinaryIO) -> bool: 

299 """Check if the stream has reached EOF.""" 

300 pos = stream.tell() 

301 stream.read(1) 

302 

303 if stream.tell() == pos: 

304 return True 

305 

306 stream.seek(pos) 

307 return False 

308 

309 

310# As mentioned in the BaseType class, we correctly set the type here 

311MetaType.ArrayType = Array