Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/msgspec/_utils.py: 14%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

181 statements  

1# type: ignore 

2import collections 

3import sys 

4import typing 

5from typing import _AnnotatedAlias # noqa: F401 

6 

7try: 

8 from typing_extensions import get_type_hints as _get_type_hints 

9except Exception: 

10 from typing import get_type_hints as _get_type_hints 

11 

12try: 

13 from typing_extensions import NotRequired, Required 

14except Exception: 

15 try: 

16 from typing import NotRequired, Required 

17 except Exception: 

18 Required = NotRequired = None 

19 

20 

21def get_type_hints(obj): 

22 return _get_type_hints(obj, include_extras=True) 

23 

24 

25# The `is_class` argument was new in 3.11, but was backported to 3.9 and 3.10. 

26# It's _likely_ to be available for 3.9/3.10, but may not be. Easiest way to 

27# check is to try it and see. This check can be removed when we drop support 

28# for Python 3.10. 

29try: 

30 typing.ForwardRef("Foo", is_class=True) 

31except TypeError: 

32 

33 def _forward_ref(value): 

34 return typing.ForwardRef(value, is_argument=False) 

35 

36else: 

37 

38 def _forward_ref(value): 

39 return typing.ForwardRef(value, is_argument=False, is_class=True) 

40 

41 

42# Python 3.13 adds a new mandatory type_params kwarg to _eval_type 

43if sys.version_info >= (3, 13): 

44 

45 def _eval_type(t, globalns, localns): 

46 return typing._eval_type(t, globalns, localns, ()) 

47elif sys.version_info < (3, 10): 

48 

49 def _eval_type(t, globalns, localns): 

50 try: 

51 return typing._eval_type(t, globalns, localns) 

52 except TypeError as e: 

53 try: 

54 from eval_type_backport import eval_type_backport 

55 except ImportError: 

56 raise TypeError( 

57 f"Unable to evaluate type annotation {t.__forward_arg__!r}. If you are making use " 

58 "of the new typing syntax (unions using `|` since Python 3.10 or builtins subscripting " 

59 "since Python 3.9), you should either replace the use of new syntax with the existing " 

60 "`typing` constructs or install the `eval_type_backport` package." 

61 ) from e 

62 

63 return eval_type_backport( 

64 t, 

65 globalns, 

66 localns, 

67 try_default=False, 

68 ) 

69else: 

70 _eval_type = typing._eval_type 

71 

72 

73if sys.version_info >= (3, 14): 

74 from annotationlib import get_annotations as _get_class_annotations 

75else: 

76 

77 def _get_class_annotations(cls): 

78 return cls.__dict__.get("__annotations__", {}) 

79 

80 

81def _apply_params(obj, mapping): 

82 if isinstance(obj, typing.TypeVar): 

83 return mapping.get(obj, obj) 

84 

85 try: 

86 parameters = tuple(obj.__parameters__) 

87 except Exception: 

88 # Not parameterized or __parameters__ is invalid, ignore 

89 return obj 

90 

91 if not parameters: 

92 # Not parametrized 

93 return obj 

94 

95 # Parametrized 

96 args = tuple(mapping.get(p, p) for p in parameters) 

97 return obj[args] 

98 

99 

100def _get_class_mro_and_typevar_mappings(obj): 

101 mapping = {} 

102 

103 if isinstance(obj, type): 

104 cls = obj 

105 else: 

106 cls = obj.__origin__ 

107 

108 def inner(c, scope): 

109 if isinstance(c, type): 

110 cls = c 

111 new_scope = {} 

112 else: 

113 cls = getattr(c, "__origin__", None) 

114 if cls in (None, object, typing.Generic) or cls in mapping: 

115 return 

116 params = cls.__parameters__ 

117 args = tuple(_apply_params(a, scope) for a in c.__args__) 

118 assert len(params) == len(args) 

119 mapping[cls] = new_scope = dict(zip(params, args)) 

120 

121 if issubclass(cls, typing.Generic): 

122 bases = getattr(cls, "__orig_bases__", cls.__bases__) 

123 for b in bases: 

124 inner(b, new_scope) 

125 

126 inner(obj, {}) 

127 return cls.__mro__, mapping 

128 

129 

130def get_class_annotations(obj): 

131 """Get the annotations for a class. 

132 

133 This is similar to ``typing.get_type_hints``, except: 

134 

135 - We maintain it 

136 - It leaves extras like ``Annotated``/``ClassVar`` alone 

137 - It resolves any parametrized generics in the class mro. The returned 

138 mapping may still include ``TypeVar`` values, but those should be treated 

139 as their unparametrized variants (i.e. equal to ``Any`` for the common case). 

140 

141 Note that this function doesn't check that Generic types are being used 

142 properly - invalid uses of `Generic` may slip through without complaint. 

143 

144 The assumption here is that the user is making use of a static analysis 

145 tool like ``mypy``/``pyright`` already, which would catch misuse of these 

146 APIs. 

147 """ 

148 hints = {} 

149 mro, typevar_mappings = _get_class_mro_and_typevar_mappings(obj) 

150 

151 for cls in mro: 

152 if cls in (typing.Generic, object): 

153 continue 

154 

155 mapping = typevar_mappings.get(cls) 

156 cls_locals = dict(vars(cls)) 

157 cls_globals = getattr(sys.modules.get(cls.__module__, None), "__dict__", {}) 

158 

159 ann = _get_class_annotations(cls) 

160 for name, value in ann.items(): 

161 if name in hints: 

162 continue 

163 if isinstance(value, str): 

164 value = _forward_ref(value) 

165 value = _eval_type(value, cls_locals, cls_globals) 

166 if mapping is not None: 

167 value = _apply_params(value, mapping) 

168 if value is None: 

169 value = type(None) 

170 hints[name] = value 

171 return hints 

172 

173 

174# A mapping from a type annotation (or annotation __origin__) to the concrete 

175# python type that msgspec will use when decoding. THIS IS PRIVATE FOR A 

176# REASON. DON'T MUCK WITH THIS. 

177_CONCRETE_TYPES = { 

178 list: list, 

179 tuple: tuple, 

180 set: set, 

181 frozenset: frozenset, 

182 dict: dict, 

183 typing.List: list, 

184 typing.Tuple: tuple, 

185 typing.Set: set, 

186 typing.FrozenSet: frozenset, 

187 typing.Dict: dict, 

188 typing.Collection: list, 

189 typing.MutableSequence: list, 

190 typing.Sequence: list, 

191 typing.MutableMapping: dict, 

192 typing.Mapping: dict, 

193 typing.MutableSet: set, 

194 typing.AbstractSet: set, 

195 collections.abc.Collection: list, 

196 collections.abc.MutableSequence: list, 

197 collections.abc.Sequence: list, 

198 collections.abc.MutableSet: set, 

199 collections.abc.Set: set, 

200 collections.abc.MutableMapping: dict, 

201 collections.abc.Mapping: dict, 

202} 

203 

204 

205def get_typeddict_info(obj): 

206 if isinstance(obj, type): 

207 cls = obj 

208 else: 

209 cls = obj.__origin__ 

210 

211 raw_hints = get_class_annotations(obj) 

212 

213 if hasattr(cls, "__required_keys__"): 

214 required = set(cls.__required_keys__) 

215 elif cls.__total__: 

216 required = set(raw_hints) 

217 else: 

218 required = set() 

219 

220 # Both `typing.TypedDict` and `typing_extensions.TypedDict` have a bug 

221 # where `Required`/`NotRequired` aren't properly detected at runtime when 

222 # `__future__.annotations` is enabled, meaning the `__required_keys__` 

223 # isn't correct. This code block works around this issue by amending the 

224 # set of required keys as needed, while also stripping off any 

225 # `Required`/`NotRequired` wrappers. 

226 hints = {} 

227 for k, v in raw_hints.items(): 

228 origin = getattr(v, "__origin__", False) 

229 if origin is Required: 

230 required.add(k) 

231 hints[k] = v.__args__[0] 

232 elif origin is NotRequired: 

233 required.discard(k) 

234 hints[k] = v.__args__[0] 

235 else: 

236 hints[k] = v 

237 

238 # This can happen if there is a bug in the TypedDict implementation; 

239 # such a bug was present in Python 3.14. 

240 if not all(k in hints for k in required): 

241 raise RuntimeError( 

242 f"Required set {required} contains keys that are no in hints: {hints.keys()}" 

243 ) 

244 return hints, required 

245 

246 

247def get_dataclass_info(obj): 

248 if isinstance(obj, type): 

249 cls = obj 

250 else: 

251 cls = obj.__origin__ 

252 hints = get_class_annotations(obj) 

253 required = [] 

254 optional = [] 

255 defaults = [] 

256 

257 if hasattr(cls, "__dataclass_fields__"): 

258 from dataclasses import _FIELD, _FIELD_INITVAR, MISSING 

259 

260 for field in cls.__dataclass_fields__.values(): 

261 if field._field_type is not _FIELD: 

262 if field._field_type is _FIELD_INITVAR: 

263 raise TypeError( 

264 "dataclasses with `InitVar` fields are not supported" 

265 ) 

266 continue 

267 name = field.name 

268 typ = hints[name] 

269 if field.default is not MISSING: 

270 defaults.append(field.default) 

271 optional.append((name, typ, False)) 

272 elif field.default_factory is not MISSING: 

273 defaults.append(field.default_factory) 

274 optional.append((name, typ, True)) 

275 else: 

276 required.append((name, typ, False)) 

277 

278 required.extend(optional) 

279 

280 pre_init = None 

281 post_init = getattr(cls, "__post_init__", None) 

282 else: 

283 from attrs import NOTHING, Factory 

284 

285 fields_with_validators = [] 

286 

287 for field in cls.__attrs_attrs__: 

288 name = field.name 

289 typ = hints[name] 

290 default = field.default 

291 if default is not NOTHING: 

292 if isinstance(default, Factory): 

293 if default.takes_self: 

294 raise NotImplementedError( 

295 "Support for default factories with `takes_self=True` " 

296 "is not implemented. File a GitHub issue if you need " 

297 "this feature!" 

298 ) 

299 defaults.append(default.factory) 

300 optional.append((name, typ, True)) 

301 else: 

302 defaults.append(default) 

303 optional.append((name, typ, False)) 

304 else: 

305 required.append((name, typ, False)) 

306 

307 if field.validator is not None: 

308 fields_with_validators.append(field) 

309 

310 required.extend(optional) 

311 

312 pre_init = getattr(cls, "__attrs_pre_init__", None) 

313 post_init = getattr(cls, "__attrs_post_init__", None) 

314 

315 if fields_with_validators: 

316 post_init = _wrap_attrs_validators(fields_with_validators, post_init) 

317 

318 return cls, tuple(required), tuple(defaults), pre_init, post_init 

319 

320 

321def _wrap_attrs_validators(fields, post_init): 

322 def inner(obj): 

323 for field in fields: 

324 field.validator(obj, field, getattr(obj, field.name)) 

325 if post_init is not None: 

326 post_init(obj) 

327 

328 return inner 

329 

330 

331def rebuild(cls, kwargs): 

332 """Used to unpickle Structs with keyword-only fields""" 

333 return cls(**kwargs)