Coverage for /pythoncovmergedfiles/medio/medio/src/pydantic/pydantic/_internal/_generics.py: 34%
215 statements
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-27 07:38 +0000
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-27 07:38 +0000
1from __future__ import annotations
3import sys
4import types
5import typing
6from collections import ChainMap
7from contextlib import contextmanager
8from contextvars import ContextVar
9from types import prepare_class
10from typing import TYPE_CHECKING, Any, Iterator, List, Mapping, MutableMapping, Tuple, TypeVar
11from weakref import WeakValueDictionary
13import typing_extensions
15from ._core_utils import get_type_ref
16from ._forward_ref import PydanticForwardRef, PydanticRecursiveRef
17from ._typing_extra import TypeVarType, typing_base
18from ._utils import all_identical, is_basemodel
20if sys.version_info >= (3, 10):
21 from typing import _UnionGenericAlias # type: ignore[attr-defined]
23if TYPE_CHECKING:
24 from ..main import BaseModel
# Type of the keys stored in the generic types cache; it is the declared return type of both
# `_early_cache_key` and `_late_cache_key`.
GenericTypesCacheKey = Tuple[Any, Any, Tuple[Any, ...]]
# TODO: We want to remove LimitedDict, but to do this, we'll need to improve the handling of generics caching.
# Right now, to handle recursive generics, some types must remain cached for brief periods without references.
# By chaining the WeakValueDictionary with a LimitedDict, we have a way to retain caching for all types with
# references, while also retaining a limited number of types even without references. This is generally enough
# to build specific recursive generic models without losing required items out of the cache.
KT = TypeVar('KT')
VT = TypeVar('VT')
_LIMITED_DICT_SIZE = 100

if TYPE_CHECKING:

    class LimitedDict(dict, MutableMapping[KT, VT]):  # type: ignore[type-arg]
        def __init__(self, size_limit: int = _LIMITED_DICT_SIZE):
            ...

else:

    class LimitedDict(dict):
        """
        A dict that drops its oldest entries once it holds more than `size_limit` items, bounding memory usage
        when it is used as a cache.

        Dicts keep insertion order and entries are always dropped from the front, so this is effectively a
        FIFO cache.
        """

        def __init__(self, size_limit: int = _LIMITED_DICT_SIZE):
            super().__init__()
            self.size_limit = size_limit

        def __setitem__(self, __key: Any, __value: Any) -> None:
            super().__setitem__(__key, __value)
            overflow = len(self) - self.size_limit
            if overflow <= 0:
                return
            # Drop the overflow plus an extra tenth of the limit (integer division), oldest keys first.
            evict_count = overflow + self.size_limit // 10
            for stale_key in list(self)[:evict_count]:
                del self[stale_key]

        def __class_getitem__(cls, *args: Any) -> Any:
            # to avoid errors with 3.7
            return cls
# weak dictionaries allow the dynamically created parametrized versions of generic models to get collected
# once they are no longer referenced by the caller.
if sys.version_info >= (3, 9):  # Typing for weak dictionaries available at 3.9
    # Subscripted form: keys are `GenericTypesCacheKey` tuples, values are BaseModel subclasses.
    GenericTypesCache = WeakValueDictionary[GenericTypesCacheKey, 'type[BaseModel]']
else:
    # Before 3.9 the unsubscripted class is used instead.
    GenericTypesCache = WeakValueDictionary
if TYPE_CHECKING:

    class DeepChainMap(ChainMap[KT, VT]):  # type: ignore
        ...

else:

    class DeepChainMap(ChainMap):
        """
        A ChainMap whose writes, deletions and `clear` are applied to every mapping in the chain rather than
        only the first one.

        Adapted, with light modifications for this use case, from
        https://docs.python.org/3/library/collections.html#collections.ChainMap.
        """

        def clear(self) -> None:
            # Empty each underlying mapping in turn.
            for scope in self.maps:
                scope.clear()

        def __setitem__(self, key: KT, value: VT) -> None:
            # Write the entry into each underlying mapping.
            for scope in self.maps:
                scope[key] = value

        def __delitem__(self, key: KT) -> None:
            # Remove the key from each mapping that holds it; it is an error if none does.
            removed = False
            for scope in self.maps:
                if key not in scope:
                    continue
                del scope[key]
                removed = True
            if not removed:
                raise KeyError(key)
# Despite the fact that LimitedDict _seems_ no longer necessary, I'm very nervous to actually remove it
# and discover later on that we need to re-add all this infrastructure...
# _GENERIC_TYPES_CACHE = DeepChainMap(GenericTypesCache(), LimitedDict())

# Module-level cache instance read by the `get_cached_generic_type_*` helpers and written by
# `set_cached_generic_type`.
_GENERIC_TYPES_CACHE = GenericTypesCache()
class PydanticGenericMetadata(typing_extensions.TypedDict):
    """Shape of the dict that this module reads from a model's `__pydantic_generic_metadata__` attribute."""

    origin: type[BaseModel] | None  # analogous to typing._GenericAlias.__origin__
    args: tuple[Any, ...]  # analogous to typing._GenericAlias.__args__
    parameters: tuple[type[Any], ...]  # analogous to typing.Generic.__parameters__
def create_generic_submodel(
    model_name: str, origin: type[BaseModel], args: tuple[Any, ...], params: tuple[Any, ...]
) -> type[BaseModel]:
    """
    Build, at runtime, a new subclass of a provided (generic) BaseModel.

    This is the class-creation step used when producing concrete parametrizations of generic models. Only the
    subclass itself is created here; updating the schema/validators/serialization to reflect the concrete
    parametrization must happen elsewhere.

    :param model_name: name of the newly created model
    :param origin: base class for the new model to inherit from
    :param args: stored under the `args` key of the `__pydantic_generic_metadata__` class keyword
    :param params: stored under the `parameters` key of the `__pydantic_generic_metadata__` class keyword
    :return: the newly created subclass of `origin`
    """
    bases = (origin,)
    namespace: dict[str, Any] = {'__module__': origin.__module__}
    meta, prepared_namespace, class_kwargs = prepare_class(model_name, bases)
    namespace.update(prepared_namespace)

    generic_metadata = {
        'origin': origin,
        'args': args,
        'parameters': params,
    }
    created_model = meta(
        model_name,
        bases,
        namespace,
        __pydantic_generic_metadata__=generic_metadata,
        __pydantic_reset_parent_namespace__=False,
        **class_kwargs,
    )

    # NOTE: this must stay a direct call from this function body, since `depth` counts stack frames.
    _, called_globally = _get_caller_frame_info(depth=3)
    if not called_globally:
        return created_model

    # create global reference and therefore allow pickling: register the model in its module's globals under
    # `model_name`, appending underscores until a free name (or one already bound to this model) is found.
    module_globals = sys.modules[created_model.__module__].__dict__
    reference_name = model_name
    while module_globals.setdefault(reference_name, created_model) is not created_model:
        reference_name += '_'

    return created_model
164def _get_caller_frame_info(depth: int = 2) -> tuple[str | None, bool]:
165 """
166 Used inside a function to check whether it was called globally
168 :returns Tuple[module_name, called_globally]
169 """
170 try:
171 previous_caller_frame = sys._getframe(depth)
172 except ValueError as e:
173 raise RuntimeError('This function must be used inside another function') from e
174 except AttributeError: # sys module does not have _getframe function, so there's nothing we can do about it
175 return None, False
176 frame_globals = previous_caller_frame.f_globals
177 return frame_globals.get('__name__'), previous_caller_frame.f_locals is frame_globals
# Runtime class of the view object returned by `dict.values()`; used in an isinstance check in
# `iter_contained_typevars`.
DictValues: type[Any] = {}.values().__class__
def iter_contained_typevars(v: Any) -> Iterator[TypeVarType]:
    """
    Walk `v` recursively (its elements or its type args) and yield every TypeVar encountered.

    This exists as an alternative to reading the `__parameters__` attribute of a GenericAlias directly,
    because the parameters of (nested) generic BaseModel subclasses won't show up in that list.
    """
    if isinstance(v, TypeVar):
        yield v
        return

    if is_basemodel(v):
        # Models report their own typevars through their generic metadata.
        yield from v.__pydantic_generic_metadata__['parameters']
        return

    # Lists and dict-value views are walked element by element; anything else is walked through its type args.
    children = v if isinstance(v, (DictValues, list)) else get_args(v)
    for child in children:
        yield from iter_contained_typevars(child)
def get_args(v: Any) -> Any:
    """
    Return the type arguments of `v`.

    If `v` carries a truthy `__pydantic_generic_metadata__`, its `args` entry is returned; otherwise this
    defers to `typing_extensions.get_args`.
    """
    metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    return metadata.get('args') if metadata else typing_extensions.get_args(v)
def get_origin(v: Any) -> Any:
    """
    Return the generic origin of `v`.

    If `v` carries a truthy `__pydantic_generic_metadata__`, its `origin` entry is returned; otherwise this
    defers to `typing_extensions.get_origin`.
    """
    metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    return metadata.get('origin') if metadata else typing_extensions.get_origin(v)
def get_standard_typevars_map(cls: type[Any]) -> dict[TypeVarType, Any] | None:
    """
    Package a generic type's typevars and parametrization (if present) into a dictionary compatible with the
    `replace_types` function. Specifically, this works with standard typing generics and typing._GenericAlias.

    :param cls: the (possibly parametrized) type to inspect
    :return: a mapping from each of the origin's `__parameters__` to the matching entry of `cls.__args__`,
        or None when `cls` has no origin or its origin does not define `__parameters__`
    """
    origin = get_origin(cls)
    if origin is None:
        return None

    # Not every origin is a parametrizable generic class: the origin of `List[int]` is the builtin `list`,
    # which has no `__parameters__` attribute. There is nothing to map in that case, so report "no map"
    # instead of raising AttributeError.
    if not hasattr(origin, '__parameters__'):
        return None

    # In this case, we know that cls is a _GenericAlias, and origin is the generic type
    # So it is safe to access cls.__args__ and origin.__parameters__
    args: tuple[Any, ...] = cls.__args__  # type: ignore
    parameters: tuple[TypeVarType, ...] = origin.__parameters__  # type: ignore
    return dict(zip(parameters, args))
def get_model_typevars_map(cls: type[BaseModel]) -> dict[TypeVarType, Any] | None:
    """
    Build the typevar -> concrete type dictionary for a generic BaseModel (if it is parametrized), in the
    form expected by the `replace_types` function.

    BaseModel.__class_getitem__ does not produce a typing._GenericAlias; the generic info of a model lives in
    its __pydantic_generic_metadata__ attribute instead, hence the special handling here.
    """
    # TODO: This could be unified with `get_standard_typevars_map` if we stored the generic metadata
    #   in the __origin__, __args__, and __parameters__ attributes of the model.
    metadata = cls.__pydantic_generic_metadata__
    origin_typevars = iter_contained_typevars(metadata['origin'])
    # Pair the typevars found on the origin with this model's args, position by position.
    return dict(zip(origin_typevars, metadata['args']))
def replace_types(type_: Any, type_map: Mapping[Any, Any] | None) -> Any:
    """Return type with all occurrences of `type_map` keys recursively replaced with their values.

    :param type_: Any type, class or generic alias
    :param type_map: Mapping from `TypeVar` instance to concrete types.
    :return: New type representing the basic structure of `type_` with all
        `type_map` keys recursively replaced. `type_` itself is returned when `type_map` is
        empty/None or when nothing inside `type_` needed replacing.

    >>> replace_types(Tuple[str, Union[List[str], float]], {str: int})
    Tuple[int, Union[List[int], float]]

    """
    if not type_map:
        return type_

    type_args = get_args(type_)
    origin_type = get_origin(type_)

    if origin_type is typing_extensions.Annotated:
        # Only the first argument (the annotated type) is substituted; the remaining annotations are
        # re-attached unchanged, one at a time.
        annotated_type, *annotations = type_args
        annotated = replace_types(annotated_type, type_map)
        for annotation in annotations:
            annotated = typing_extensions.Annotated[annotated, annotation]
        return annotated

    # Having type args is a good indicator that this is a typing module
    # class instantiation or a generic alias of some sort.
    if type_args:
        resolved_type_args = tuple(replace_types(arg, type_map) for arg in type_args)
        if all_identical(type_args, resolved_type_args):
            # If all arguments are the same, there is no need to modify the
            # type or create a new object at all
            return type_
        if (
            origin_type is not None
            and isinstance(type_, typing_base)
            and not isinstance(origin_type, typing_base)
            and getattr(type_, '_name', None) is not None
        ):
            # In python < 3.9 generic aliases don't exist so any of these like `list`,
            # `type` or `collections.abc.Callable` need to be translated.
            # See: https://www.python.org/dev/peps/pep-0585
            origin_type = getattr(typing, type_._name)
        assert origin_type is not None
        # PEP-604 syntax (Ex.: list | str) is represented with a types.UnionType object that does not have __getitem__.
        # We also cannot use isinstance() since we have to compare types.
        if sys.version_info >= (3, 10) and origin_type is types.UnionType:  # noqa: E721
            return _UnionGenericAlias(origin_type, resolved_type_args)
        # Re-subscript the origin with the substituted arguments.
        return origin_type[resolved_type_args]

    # We handle pydantic generic models separately as they don't have the same
    # semantics as "typing" classes or generic aliases

    if not origin_type and is_basemodel(type_):
        parameters = type_.__pydantic_generic_metadata__['parameters']
        if not parameters:
            # No parameters recorded on the model: nothing to substitute.
            return type_
        resolved_type_args = tuple(replace_types(t, type_map) for t in parameters)
        if all_identical(parameters, resolved_type_args):
            return type_
        # Subscript the model with the substituted values of its own parameters.
        return type_[resolved_type_args]  # type: ignore[index]

    # Handle special case for typehints that can have lists as arguments.
    # `typing.Callable[[int, str], int]` is an example for this.
    if isinstance(type_, (List, list)):
        resolved_list = list(replace_types(element, type_map) for element in type_)
        if all_identical(type_, resolved_list):
            return type_
        return resolved_list

    if isinstance(type_, PydanticForwardRef):
        # queue the replacement as a deferred action
        return type_.replace_types(type_map)

    # If all else fails, we try to resolve the type directly and otherwise just
    # return the input with no modifications.
    return type_map.get(type_, type_)
def check_parameters_count(cls: type[BaseModel], parameters: tuple[Any, ...]) -> None:
    """
    Verify that `parameters` has exactly as many entries as `cls` declares generic parameters.

    :param cls: model whose `__pydantic_generic_metadata__['parameters']` gives the expected count
    :param parameters: the values supplied for parametrization
    :raises TypeError: if too many or too few parameters were supplied
    """
    actual = len(parameters)
    expected = len(cls.__pydantic_generic_metadata__['parameters'])
    if actual == expected:
        return

    if actual > expected:
        description = 'many'
    else:
        description = 'few'
    raise TypeError(f'Too {description} parameters for {cls}; actual {actual}, expected {expected}')
# Context variable holding the set of type refs recorded by `generic_recursion_self_type`;
# its default of None means no set has been installed in the current context.
_generic_recursion_cache: ContextVar[set[str] | None] = ContextVar('_generic_recursion_cache', default=None)
@contextmanager
def generic_recursion_self_type(
    origin: type[BaseModel], args: tuple[Any, ...]
) -> Iterator[PydanticForwardRef | PydanticRecursiveRef | None]:
    """
    Context manager to wrap around the recursive calls used to build a generic type; it takes the generic
    origin type and the type arguments being passed to it.

    Seeing the same origin and arguments a second time implies that a self-reference placeholder can be used
    while building the core schema, and that it will produce a schema_ref that is valid in the final parent
    schema. In that case a `PydanticRecursiveRef` is yielded; on first sight of a combination, None is yielded.
    """
    seen_type_refs = _generic_recursion_cache.get()
    reset_token = None
    if seen_type_refs is None:
        # Outermost entry: install a fresh set, and remember the token so it is uninstalled on exit.
        seen_type_refs = set()
        reset_token = _generic_recursion_cache.set(seen_type_refs)

    try:
        type_ref = get_type_ref(origin, args_override=args)
        if type_ref not in seen_type_refs:
            seen_type_refs.add(type_ref)
            yield None
        else:
            yield PydanticRecursiveRef(type_ref=type_ref)
    finally:
        if reset_token:
            _generic_recursion_cache.reset(reset_token)
def recursively_defined_type_refs() -> set[str]:
    """
    Return a copy of the type refs currently held in `_generic_recursion_cache`.

    An empty set is returned when nothing is recorded (not in a generic recursion, so there are no types).
    A copy is handed out so that callers cannot modify the tracked set.
    """
    visited = _generic_recursion_cache.get()
    return visited.copy() if visited else set()
def get_cached_generic_type_early(parent: type[BaseModel], typevar_values: Any) -> type[BaseModel] | None:
    """
    First stage of the two-stage cache lookup: look the type up under its `_early_cache_key`.

    Two stages are used so that repeated calls to `__class_getitem__` on generic types (which may happen in
    tighter loops during runtime) are as fast as possible, while certain alternative parametrizations still
    ultimately resolve to the same type.

    As a concrete example, this approach was necessary to make Model[List[T]][int] equal to Model[List[int]].
    The approach could be modified to not use two different cache keys at different points, but the
    _early_cache_key is optimized to be as quick to compute as possible (for repeated-access speed), and the
    _late_cache_key is optimized to be as "correct" as possible, so that two types that will ultimately be the
    same after resolving the type arguments will always produce cache hits.

    Moving to a single cache key per type would mean either always paying for the slower/more computationally
    intensive logic associated with _late_cache_key, or accepting that Model[List[T]][int] is a different type
    than Model[List[int]]. Because we rely on subclass relationships during validation, it is worthwhile to
    ensure that types that are functionally equivalent are actually equal.

    :return: the cached type, or None on a cache miss
    """
    early_key = _early_cache_key(parent, typevar_values)
    return _GENERIC_TYPES_CACHE.get(early_key)
def get_cached_generic_type_late(
    parent: type[BaseModel], typevar_values: Any, origin: type[BaseModel], args: tuple[Any, ...]
) -> type[BaseModel] | None:
    """
    Second stage of the two-stage cache lookup: look the type up under its `_late_cache_key`.

    On a hit, the type is re-registered through `set_cached_generic_type` before being returned.
    See the docstring of `get_cached_generic_type_early` for more information about the two-stage cache lookup.

    :return: the cached type, or None on a cache miss
    """
    late_key = _late_cache_key(origin, args, typevar_values)
    cached = _GENERIC_TYPES_CACHE.get(late_key)
    if cached is None:
        return None

    set_cached_generic_type(parent, typevar_values, cached, origin, args)
    return cached
def set_cached_generic_type(
    parent: type[BaseModel],
    typevar_values: tuple[Any, ...],
    type_: type[BaseModel],
    origin: type[BaseModel] | None = None,
    args: tuple[Any, ...] | None = None,
) -> None:
    """
    Store `type_` in the generic types cache under every key it may later be looked up with.

    See the docstring of `get_cached_generic_type_early` for more information about why items are cached with
    two different keys.
    """
    cache = _GENERIC_TYPES_CACHE

    cache[_early_cache_key(parent, typevar_values)] = type_

    if len(typevar_values) == 1:
        # Additionally register the single value on its own, not wrapped in a tuple.
        sole_value = typevar_values[0]
        cache[_early_cache_key(parent, sole_value)] = type_

    if not (origin and args):
        return
    cache[_late_cache_key(origin, args, typevar_values)] = type_
430def _union_orderings_key(typevar_values: Any) -> Any:
431 """
432 This is intended to help differentiate between Union types with the same arguments in different order.
434 Thanks to caching internal to the `typing` module, it is not possible to distinguish between
435 List[Union[int, float]] and List[Union[float, int]] (and similarly for other "parent" origins besides List)
436 because `typing` considers Union[int, float] to be equal to Union[float, int].
438 However, you _can_ distinguish between (top-level) Union[int, float] vs. Union[float, int].
439 Because we parse items as the first Union type that is successful, we get slightly more consistent behavior
440 if we make an effort to distinguish the ordering of items in a union. It would be best if we could _always_
441 get the exact-correct order of items in the union, but that would require a change to the `typing` module itself.
442 (See https://github.com/python/cpython/issues/86483 for reference.)
443 """
444 if isinstance(typevar_values, tuple):
445 args_data = []
446 for value in typevar_values:
447 args_data.append(_union_orderings_key(value))
448 return tuple(args_data)
449 elif typing_extensions.get_origin(typevar_values) is typing.Union:
450 return get_args(typevar_values)
451 else:
452 return ()
def _early_cache_key(cls: type[BaseModel], typevar_values: Any) -> GenericTypesCacheKey:
    """
    Build the cache key used for the first lookup; it is meant to add minimal computational overhead.

    Note that this is overly simplistic, and it's possible that two different cls/typevar_values
    inputs would ultimately result in the same type being created in BaseModel.__class_getitem__.
    To handle this, we have a fallback _late_cache_key that is checked later if the _early_cache_key
    lookup fails, and should result in a cache hit _precisely_ when the inputs to __class_getitem__
    would result in the same type.
    """
    union_orderings = _union_orderings_key(typevar_values)
    return (cls, typevar_values, union_orderings)
def _late_cache_key(origin: type[BaseModel], args: tuple[Any, ...], typevar_values: Any) -> GenericTypesCacheKey:
    """
    Build the cache key used later in the process of creating a new type, when more is known about the exact
    args that will be passed.

    If it turns out that a different set of inputs to __class_getitem__ resulted in the same inputs to the
    generic type creation process, we can still return the cached type, and update the cache with the
    _early_cache_key as well.
    """
    # The _union_orderings_key is placed at the start here to ensure there cannot be a collision with an
    # _early_cache_key, as that function will always produce a BaseModel subclass as the first item in the key,
    # whereas this function will always produce a tuple as the first item in the key.
    union_orderings = _union_orderings_key(typevar_values)
    return (union_orderings, origin, args)