Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/msgspec/_utils.py: 14%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# type: ignore
2import collections
3import sys
4import typing
5from typing import _AnnotatedAlias # noqa: F401
7try:
8 from typing_extensions import get_type_hints as _get_type_hints
9except Exception:
10 from typing import get_type_hints as _get_type_hints
12try:
13 from typing_extensions import NotRequired, Required
14except Exception:
15 try:
16 from typing import NotRequired, Required
17 except Exception:
18 Required = NotRequired = None
21def get_type_hints(obj):
22 return _get_type_hints(obj, include_extras=True)
25# The `is_class` argument was new in 3.11, but was backported to 3.9 and 3.10.
26# It's _likely_ to be available for 3.9/3.10, but may not be. Easiest way to
27# check is to try it and see. This check can be removed when we drop support
28# for Python 3.10.
29try:
30 typing.ForwardRef("Foo", is_class=True)
31except TypeError:
33 def _forward_ref(value):
34 return typing.ForwardRef(value, is_argument=False)
36else:
38 def _forward_ref(value):
39 return typing.ForwardRef(value, is_argument=False, is_class=True)
42# Python 3.13 adds a new mandatory type_params kwarg to _eval_type
43if sys.version_info >= (3, 13):
45 def _eval_type(t, globalns, localns):
46 return typing._eval_type(t, globalns, localns, ())
47elif sys.version_info < (3, 10):
49 def _eval_type(t, globalns, localns):
50 try:
51 return typing._eval_type(t, globalns, localns)
52 except TypeError as e:
53 try:
54 from eval_type_backport import eval_type_backport
55 except ImportError:
56 raise TypeError(
57 f"Unable to evaluate type annotation {t.__forward_arg__!r}. If you are making use "
58 "of the new typing syntax (unions using `|` since Python 3.10 or builtins subscripting "
59 "since Python 3.9), you should either replace the use of new syntax with the existing "
60 "`typing` constructs or install the `eval_type_backport` package."
61 ) from e
63 return eval_type_backport(
64 t,
65 globalns,
66 localns,
67 try_default=False,
68 )
69else:
70 _eval_type = typing._eval_type
73if sys.version_info >= (3, 14):
74 from annotationlib import get_annotations as _get_class_annotations
75else:
77 def _get_class_annotations(cls):
78 return cls.__dict__.get("__annotations__", {})
81def _apply_params(obj, mapping):
82 if isinstance(obj, typing.TypeVar):
83 return mapping.get(obj, obj)
85 try:
86 parameters = tuple(obj.__parameters__)
87 except Exception:
88 # Not parameterized or __parameters__ is invalid, ignore
89 return obj
91 if not parameters:
92 # Not parametrized
93 return obj
95 # Parametrized
96 args = tuple(mapping.get(p, p) for p in parameters)
97 return obj[args]
100def _get_class_mro_and_typevar_mappings(obj):
101 mapping = {}
103 if isinstance(obj, type):
104 cls = obj
105 else:
106 cls = obj.__origin__
108 def inner(c, scope):
109 if isinstance(c, type):
110 cls = c
111 new_scope = {}
112 else:
113 cls = getattr(c, "__origin__", None)
114 if cls in (None, object, typing.Generic) or cls in mapping:
115 return
116 params = cls.__parameters__
117 args = tuple(_apply_params(a, scope) for a in c.__args__)
118 assert len(params) == len(args)
119 mapping[cls] = new_scope = dict(zip(params, args))
121 if issubclass(cls, typing.Generic):
122 bases = getattr(cls, "__orig_bases__", cls.__bases__)
123 for b in bases:
124 inner(b, new_scope)
126 inner(obj, {})
127 return cls.__mro__, mapping
130def get_class_annotations(obj):
131 """Get the annotations for a class.
133 This is similar to ``typing.get_type_hints``, except:
135 - We maintain it
136 - It leaves extras like ``Annotated``/``ClassVar`` alone
137 - It resolves any parametrized generics in the class mro. The returned
138 mapping may still include ``TypeVar`` values, but those should be treated
139 as their unparametrized variants (i.e. equal to ``Any`` for the common case).
141 Note that this function doesn't check that Generic types are being used
142 properly - invalid uses of `Generic` may slip through without complaint.
144 The assumption here is that the user is making use of a static analysis
145 tool like ``mypy``/``pyright`` already, which would catch misuse of these
146 APIs.
147 """
148 hints = {}
149 mro, typevar_mappings = _get_class_mro_and_typevar_mappings(obj)
151 for cls in mro:
152 if cls in (typing.Generic, object):
153 continue
155 mapping = typevar_mappings.get(cls)
156 cls_locals = dict(vars(cls))
157 cls_globals = getattr(sys.modules.get(cls.__module__, None), "__dict__", {})
159 ann = _get_class_annotations(cls)
160 for name, value in ann.items():
161 if name in hints:
162 continue
163 if isinstance(value, str):
164 value = _forward_ref(value)
165 value = _eval_type(value, cls_locals, cls_globals)
166 if mapping is not None:
167 value = _apply_params(value, mapping)
168 if value is None:
169 value = type(None)
170 hints[name] = value
171 return hints
174# A mapping from a type annotation (or annotation __origin__) to the concrete
175# python type that msgspec will use when decoding. THIS IS PRIVATE FOR A
176# REASON. DON'T MUCK WITH THIS.
177_CONCRETE_TYPES = {
178 list: list,
179 tuple: tuple,
180 set: set,
181 frozenset: frozenset,
182 dict: dict,
183 typing.List: list,
184 typing.Tuple: tuple,
185 typing.Set: set,
186 typing.FrozenSet: frozenset,
187 typing.Dict: dict,
188 typing.Collection: list,
189 typing.MutableSequence: list,
190 typing.Sequence: list,
191 typing.MutableMapping: dict,
192 typing.Mapping: dict,
193 typing.MutableSet: set,
194 typing.AbstractSet: set,
195 collections.abc.Collection: list,
196 collections.abc.MutableSequence: list,
197 collections.abc.Sequence: list,
198 collections.abc.MutableSet: set,
199 collections.abc.Set: set,
200 collections.abc.MutableMapping: dict,
201 collections.abc.Mapping: dict,
202}
205def get_typeddict_info(obj):
206 if isinstance(obj, type):
207 cls = obj
208 else:
209 cls = obj.__origin__
211 raw_hints = get_class_annotations(obj)
213 if hasattr(cls, "__required_keys__"):
214 required = set(cls.__required_keys__)
215 elif cls.__total__:
216 required = set(raw_hints)
217 else:
218 required = set()
220 # Both `typing.TypedDict` and `typing_extensions.TypedDict` have a bug
221 # where `Required`/`NotRequired` aren't properly detected at runtime when
222 # `__future__.annotations` is enabled, meaning the `__required_keys__`
223 # isn't correct. This code block works around this issue by amending the
224 # set of required keys as needed, while also stripping off any
225 # `Required`/`NotRequired` wrappers.
226 hints = {}
227 for k, v in raw_hints.items():
228 origin = getattr(v, "__origin__", False)
229 if origin is Required:
230 required.add(k)
231 hints[k] = v.__args__[0]
232 elif origin is NotRequired:
233 required.discard(k)
234 hints[k] = v.__args__[0]
235 else:
236 hints[k] = v
238 # This can happen if there is a bug in the TypedDict implementation;
239 # such a bug was present in Python 3.14.
240 if not all(k in hints for k in required):
241 raise RuntimeError(
242 f"Required set {required} contains keys that are no in hints: {hints.keys()}"
243 )
244 return hints, required
247def get_dataclass_info(obj):
248 if isinstance(obj, type):
249 cls = obj
250 else:
251 cls = obj.__origin__
252 hints = get_class_annotations(obj)
253 required = []
254 optional = []
255 defaults = []
257 if hasattr(cls, "__dataclass_fields__"):
258 from dataclasses import _FIELD, _FIELD_INITVAR, MISSING
260 for field in cls.__dataclass_fields__.values():
261 if field._field_type is not _FIELD:
262 if field._field_type is _FIELD_INITVAR:
263 raise TypeError(
264 "dataclasses with `InitVar` fields are not supported"
265 )
266 continue
267 name = field.name
268 typ = hints[name]
269 if field.default is not MISSING:
270 defaults.append(field.default)
271 optional.append((name, typ, False))
272 elif field.default_factory is not MISSING:
273 defaults.append(field.default_factory)
274 optional.append((name, typ, True))
275 else:
276 required.append((name, typ, False))
278 required.extend(optional)
280 pre_init = None
281 post_init = getattr(cls, "__post_init__", None)
282 else:
283 from attrs import NOTHING, Factory
285 fields_with_validators = []
287 for field in cls.__attrs_attrs__:
288 name = field.name
289 typ = hints[name]
290 default = field.default
291 if default is not NOTHING:
292 if isinstance(default, Factory):
293 if default.takes_self:
294 raise NotImplementedError(
295 "Support for default factories with `takes_self=True` "
296 "is not implemented. File a GitHub issue if you need "
297 "this feature!"
298 )
299 defaults.append(default.factory)
300 optional.append((name, typ, True))
301 else:
302 defaults.append(default)
303 optional.append((name, typ, False))
304 else:
305 required.append((name, typ, False))
307 if field.validator is not None:
308 fields_with_validators.append(field)
310 required.extend(optional)
312 pre_init = getattr(cls, "__attrs_pre_init__", None)
313 post_init = getattr(cls, "__attrs_post_init__", None)
315 if fields_with_validators:
316 post_init = _wrap_attrs_validators(fields_with_validators, post_init)
318 return cls, tuple(required), tuple(defaults), pre_init, post_init
321def _wrap_attrs_validators(fields, post_init):
322 def inner(obj):
323 for field in fields:
324 field.validator(obj, field, getattr(obj, field.name))
325 if post_init is not None:
326 post_init(obj)
328 return inner
331def rebuild(cls, kwargs):
332 """Used to unpickle Structs with keyword-only fields"""
333 return cls(**kwargs)