1from __future__ import annotations
2
3import sys
4import types
5import typing
6from collections import ChainMap
7from collections.abc import Iterator, Mapping
8from contextlib import contextmanager
9from contextvars import ContextVar
10from itertools import zip_longest
11from types import prepare_class
12from typing import TYPE_CHECKING, Annotated, Any, TypeVar
13from weakref import WeakValueDictionary
14
15import typing_extensions
16from typing_inspection import typing_objects
17from typing_inspection.introspection import is_union_origin
18
19from . import _typing_extra
20from ._core_utils import get_type_ref
21from ._forward_ref import PydanticRecursiveRef
22from ._utils import all_identical, is_model_class
23
24if sys.version_info >= (3, 10):
25 from typing import _UnionGenericAlias # type: ignore[attr-defined]
26
27if TYPE_CHECKING:
28 from ..main import BaseModel
29
# Type of the keys used in the generic types cache: a 3-tuple. The two concrete key layouts
# are built by `_early_cache_key` and `_late_cache_key` (both defined later in this module).
GenericTypesCacheKey = tuple[Any, Any, tuple[Any, ...]]
31
32# Note: We want to remove LimitedDict, but to do this, we'd need to improve the handling of generics caching.
# Right now, to handle recursive generics, some types must remain cached for brief periods without references.
# By chaining the WeakValueDictionary with a LimitedDict, we have a way to retain caching for all types with references,
35# while also retaining a limited number of types even without references. This is generally enough to build
36# specific recursive generic models without losing required items out of the cache.
37
# Key and value type parameters used by the generic mapping helpers in this module
# (`LimitedDict` and `DeepChainMap`).
KT = TypeVar('KT')
VT = TypeVar('VT')
# Default value of `LimitedDict.size_limit`: the entry count above which eviction kicks in.
_LIMITED_DICT_SIZE = 100
41
42
class LimitedDict(dict[KT, VT]):
    """A `dict` that trims itself once it holds more than `size_limit` entries.

    Trimming happens on item assignment only, and always removes the entries that
    come first in the dict's iteration order (i.e. the earliest-inserted keys).
    """

    def __init__(self, size_limit: int = _LIMITED_DICT_SIZE) -> None:
        """Create an empty dict that starts evicting above `size_limit` entries."""
        super().__init__()
        self.size_limit = size_limit

    def __setitem__(self, key: KT, value: VT, /) -> None:
        """Store `value` under `key`, then evict leading entries if the limit is exceeded."""
        super().__setitem__(key, value)

        overflow = len(self) - self.size_limit
        if overflow <= 0:
            return

        # Drop the overflow plus an extra tenth of the limit, so that the next few
        # insertions do not each trigger another eviction pass.
        evict_count = overflow + self.size_limit // 10
        for stale_key in list(self)[:evict_count]:
            del self[stale_key]
55
56
# Weak dictionaries allow the dynamically created parametrized versions of generic models to get collected
# once they are no longer referenced by the caller.
# The cache maps a `GenericTypesCacheKey` to the parametrized model class stored under that key.
GenericTypesCache = WeakValueDictionary[GenericTypesCacheKey, 'type[BaseModel]']
60
if TYPE_CHECKING:

    class DeepChainMap(ChainMap[KT, VT]):  # type: ignore
        ...

else:

    class DeepChainMap(ChainMap):
        """A `ChainMap` whose mutations reach every underlying mapping, not just the first.

        Based on the `DeepChainMap` recipe from
        https://docs.python.org/3/library/collections.html#collections.ChainMap,
        lightly adapted for this module: writes and deletions are applied to *all*
        mappings in the chain.
        """

        def clear(self) -> None:
            """Empty every mapping in the chain."""
            for scope in self.maps:
                scope.clear()

        def __setitem__(self, key: KT, value: VT) -> None:
            """Store `value` under `key` in every mapping in the chain."""
            for scope in self.maps:
                scope[key] = value

        def __delitem__(self, key: KT) -> None:
            """Remove `key` from every mapping that holds it.

            Raises:
                KeyError: If none of the mappings contains `key`.
            """
            removed = 0
            for scope in self.maps:
                if key in scope:
                    del scope[key]
                    removed += 1
            if removed == 0:
                raise KeyError(key)
91
92
93# Despite the fact that LimitedDict _seems_ no longer necessary, I'm very nervous to actually remove it
94# and discover later on that we need to re-add all this infrastructure...
95# _GENERIC_TYPES_CACHE = DeepChainMap(GenericTypesCache(), LimitedDict())
96
# Context variable holding the cache of parametrized generic models. It defaults to `None`; the cache
# accessors in this module (e.g. `get_cached_generic_type_early`) create the `GenericTypesCache` on first use.
_GENERIC_TYPES_CACHE: ContextVar[GenericTypesCache | None] = ContextVar('_GENERIC_TYPES_CACHE', default=None)
98
99
class PydanticGenericMetadata(typing_extensions.TypedDict):
    """Shape of the `__pydantic_generic_metadata__` attribute read and written in this module.

    The three keys mirror the `__origin__`, `__args__` and `__parameters__` attributes
    that the `typing` module uses for its own generics.
    """

    origin: type[BaseModel] | None  # analogous to typing._GenericAlias.__origin__
    args: tuple[Any, ...]  # analogous to typing._GenericAlias.__args__
    parameters: tuple[TypeVar, ...]  # analogous to typing.Generic.__parameters__
104
105
def create_generic_submodel(
    model_name: str, origin: type[BaseModel], args: tuple[Any, ...], params: tuple[Any, ...]
) -> type[BaseModel]:
    """Build a new subclass of a (generic) BaseModel for one concrete parametrization.

    Only the class object is created here. Updating the schema, validators and
    serialization to match the parametrization is the responsibility of other code.

    Args:
        model_name: The name of the newly created model.
        origin: The base class for the new model to inherit from.
        args: A tuple of generic metadata arguments.
        params: A tuple of generic metadata parameters.

    Returns:
        The created submodel.
    """
    bases = (origin,)
    meta, prepared_namespace, extra_kwds = prepare_class(model_name, bases)

    # Start from the origin's module, then layer whatever the metaclass prepared on top.
    namespace: dict[str, Any] = {'__module__': origin.__module__}
    namespace.update(prepared_namespace)

    generic_metadata = {
        'origin': origin,
        'args': args,
        'parameters': params,
    }
    created_model = meta(
        model_name,
        bases,
        namespace,
        __pydantic_generic_metadata__=generic_metadata,
        __pydantic_reset_parent_namespace__=False,
        **extra_kwds,
    )

    # NOTE: this call must stay directly in this function's body: `depth=3` is counted from
    # inside `_get_caller_frame_info`, so it selects the frame of this function's caller's caller.
    _, called_globally = _get_caller_frame_info(depth=3)
    if not called_globally:
        return created_model

    # Create a global reference and therefore allow pickling: register the model in its module's
    # globals under the first free name among `model_name`, `model_name_`, `model_name__`, ...
    # `setdefault` returns the already-stored object when the name is taken, so the loop stops
    # as soon as the name maps to the model that was just created.
    module_globals = sys.modules[created_model.__module__].__dict__
    candidate_name = model_name
    while module_globals.setdefault(candidate_name, created_model) is not created_model:
        candidate_name += '_'

    return created_model
151
152
153def _get_caller_frame_info(depth: int = 2) -> tuple[str | None, bool]:
154 """Used inside a function to check whether it was called globally.
155
156 Args:
157 depth: The depth to get the frame.
158
159 Returns:
160 A tuple contains `module_name` and `called_globally`.
161
162 Raises:
163 RuntimeError: If the function is not called inside a function.
164 """
165 try:
166 previous_caller_frame = sys._getframe(depth)
167 except ValueError as e:
168 raise RuntimeError('This function must be used inside another function') from e
169 except AttributeError: # sys module does not have _getframe function, so there's nothing we can do about it
170 return None, False
171 frame_globals = previous_caller_frame.f_globals
172 return frame_globals.get('__name__'), previous_caller_frame.f_locals is frame_globals
173
174
# The runtime class of a `dict.values()` view, captured so that such views can be detected with
# `isinstance` (see `iter_contained_typevars`).
DictValues: type[Any] = {}.values().__class__
176
177
def iter_contained_typevars(v: Any) -> Iterator[TypeVar]:
    """Yield every `TypeVar` found in `v`, searching recursively through its type arguments.

    This exists as an alternative to reading the `__parameters__` attribute of a GenericAlias,
    because the parameters of (nested) generic BaseModel subclasses do not show up there.

    - A `TypeVar` yields itself.
    - A model class yields the `parameters` stored in its `__pydantic_generic_metadata__`.
    - A list or `dict.values()` view is searched element by element.
    - Anything else is searched through its type arguments (as given by `get_args`).
    """
    if isinstance(v, TypeVar):
        yield v
        return

    if is_model_class(v):
        yield from v.__pydantic_generic_metadata__['parameters']
        return

    # Containers are walked directly; every other object is walked through its type arguments.
    children = v if isinstance(v, (DictValues, list)) else get_args(v)
    for child in children:
        yield from iter_contained_typevars(child)
195
196
def get_args(v: Any) -> Any:
    """Return the type arguments of `v`.

    When `v` carries a non-empty `__pydantic_generic_metadata__`, its `'args'` entry is
    returned; otherwise the lookup is delegated to `typing_extensions.get_args`.
    """
    metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    if not metadata:
        return typing_extensions.get_args(v)
    return metadata.get('args')
202
203
def get_origin(v: Any) -> Any:
    """Return the generic origin of `v`.

    When `v` carries a non-empty `__pydantic_generic_metadata__`, its `'origin'` entry is
    returned; otherwise the lookup is delegated to `typing_extensions.get_origin`.
    """
    metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    if not metadata:
        return typing_extensions.get_origin(v)
    return metadata.get('origin')
209
210
def get_standard_typevars_map(cls: Any) -> dict[TypeVar, Any] | None:
    """Build a `{typevar: argument}` mapping for a standard `typing` generic alias.

    The result is suitable for passing to `replace_types`. This handles standard typing
    generics and `typing._GenericAlias`.

    Returns:
        The mapping, or `None` when `cls` has no origin or its origin has no `__parameters__`.
    """
    origin = get_origin(cls)
    if origin is None or not hasattr(origin, '__parameters__'):
        return None

    # At this point cls is a _GenericAlias and origin is the generic type,
    # so reading cls.__args__ and origin.__parameters__ is safe.
    type_args: tuple[Any, ...] = cls.__args__  # type: ignore
    type_params: tuple[TypeVar, ...] = origin.__parameters__
    return {type_param: type_arg for type_param, type_arg in zip(type_params, type_args)}
226
227
def get_model_typevars_map(cls: type[BaseModel]) -> dict[TypeVar, Any]:
    """Build a `{typevar: argument}` mapping for a (possibly parametrized) generic BaseModel.

    The result is suitable for passing to `replace_types`. Models need their own handling
    because `BaseModel.__class_getitem__` does not produce a `typing._GenericAlias`; the
    generic information is kept in the `__pydantic_generic_metadata__` attribute instead.

    Returns:
        A mapping pairing the typevars found in the model's origin with the model's args,
        or an empty dict when the model has no args.
    """
    # TODO: This could be unified with `get_standard_typevars_map` if we stored the generic metadata
    # in the __origin__, __args__, and __parameters__ attributes of the model.
    metadata = cls.__pydantic_generic_metadata__
    origin, args = metadata['origin'], metadata['args']
    if args:
        return dict(zip(iter_contained_typevars(origin), args))
    # Without args there is nothing to map, so skip `iter_contained_typevars` entirely.
    return {}
244
245
def replace_types(type_: Any, type_map: Mapping[TypeVar, Any] | None) -> Any:
    """Return type with all occurrences of `type_map` keys recursively replaced with their values.

    The original `type_` object is returned unchanged when `type_map` is empty or `None`, and
    also whenever the substitution leaves every type argument identical.

    Args:
        type_: The class or generic alias.
        type_map: Mapping from `TypeVar` instance to concrete types.

    Returns:
        A new type representing the basic structure of `type_` with all
        `type_map` keys recursively replaced.

    Example:
        ```python
        from typing import List, Union

        from pydantic._internal._generics import replace_types

        replace_types(tuple[str, Union[List[str], float]], {str: int})
        #> tuple[int, Union[List[int], float]]
        ```
    """
    if not type_map:
        return type_

    type_args = get_args(type_)
    origin_type = get_origin(type_)

    # `Annotated[T, ...]`: only the annotated type (the first argument) is substituted;
    # the remaining metadata items are carried over as they are.
    if typing_objects.is_annotated(origin_type):
        annotated_type, *annotations = type_args
        annotated_type = replace_types(annotated_type, type_map)
        # TODO remove parentheses when we drop support for Python 3.10:
        return Annotated[(annotated_type, *annotations)]

    # Having type args is a good indicator that this is a typing special form
    # instance or a generic alias of some sort.
    if type_args:
        resolved_type_args = tuple(replace_types(arg, type_map) for arg in type_args)
        if all_identical(type_args, resolved_type_args):
            # If all arguments are the same, there is no need to modify the
            # type or create a new object at all
            return type_

        if (
            origin_type is not None
            and isinstance(type_, _typing_extra.typing_base)
            and not isinstance(origin_type, _typing_extra.typing_base)
            and getattr(type_, '_name', None) is not None
        ):
            # In python < 3.9 generic aliases don't exist so any of these like `list`,
            # `type` or `collections.abc.Callable` need to be translated.
            # See: https://www.python.org/dev/peps/pep-0585
            origin_type = getattr(typing, type_._name)
        assert origin_type is not None

        if is_union_origin(origin_type):
            if any(typing_objects.is_any(arg) for arg in resolved_type_args):
                # `Any | T` ~ `Any`:
                resolved_type_args = (Any,)
            # `Never | T` ~ `T`:
            resolved_type_args = tuple(
                arg
                for arg in resolved_type_args
                if not (typing_objects.is_noreturn(arg) or typing_objects.is_never(arg))
            )

        # PEP-604 syntax (Ex.: list | str) is represented with a types.UnionType object that does not have __getitem__.
        # We also cannot use isinstance() since we have to compare types.
        if sys.version_info >= (3, 10) and origin_type is types.UnionType:
            return _UnionGenericAlias(origin_type, resolved_type_args)
        # NotRequired[T] and Required[T] don't support tuple type resolved_type_args, hence the condition below
        return origin_type[resolved_type_args[0] if len(resolved_type_args) == 1 else resolved_type_args]

    # We handle pydantic generic models separately as they don't have the same
    # semantics as "typing" classes or generic aliases

    if not origin_type and is_model_class(type_):
        parameters = type_.__pydantic_generic_metadata__['parameters']
        if not parameters:
            return type_
        # Substitute the model's own parameters and re-parametrize the model only if something changed.
        resolved_type_args = tuple(replace_types(t, type_map) for t in parameters)
        if all_identical(parameters, resolved_type_args):
            return type_
        return type_[resolved_type_args]

    # Handle special case for typehints that can have lists as arguments.
    # `typing.Callable[[int, str], int]` is an example for this.
    if isinstance(type_, list):
        resolved_list = [replace_types(element, type_map) for element in type_]
        if all_identical(type_, resolved_list):
            return type_
        return resolved_list

    # If all else fails, we try to resolve the type directly and otherwise just
    # return the input with no modifications.
    return type_map.get(type_, type_)
341
342
def map_generic_model_arguments(cls: type[BaseModel], args: tuple[Any, ...]) -> dict[TypeVar, Any]:
    """Pair each type parameter of a generic model with the argument supplied for it.

    Parameters that receive no argument fall back to their default, if they have one.

    Raises:
        TypeError: If the number of arguments does not match the parameters (i.e. if providing too few or too many arguments).

    Example:
        ```python {test="skip" lint="skip"}
        class Model[T, U, V = int](BaseModel): ...

        map_generic_model_arguments(Model, (str, bytes))
        #> {T: str, U: bytes, V: int}

        map_generic_model_arguments(Model, (str,))
        #> TypeError: Too few arguments for <class '__main__.Model'>; actual 1, expected at least 2

        map_generic_model_arguments(Model, (str, bytes, int, complex))
        #> TypeError: Too many arguments for <class '__main__.Model'>; actual 4, expected 3
        ```

    Note:
        This function is analogous to the private `typing._check_generic_specialization` function.
    """
    parameters = cls.__pydantic_generic_metadata__['parameters']
    # Private filler so that a shorter side of the zip can be told apart from any real value.
    sentinel = object()
    mapping: dict[TypeVar, Any] = {}

    for raw_param, arg in zip_longest(parameters, args, fillvalue=sentinel):
        if raw_param is sentinel:
            raise TypeError(f'Too many arguments for {cls}; actual {len(args)}, expected {len(parameters)}')

        type_param = typing.cast(TypeVar, raw_param)
        if arg is not sentinel:
            mapping[type_param] = arg
            continue

        # No argument was supplied for this parameter: it must come with a default.
        try:
            defaulted = type_param.has_default()
        except AttributeError:
            # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13.
            defaulted = False

        if not defaulted:
            defaults_count = sum(hasattr(p, 'has_default') and p.has_default() for p in parameters)
            raise TypeError(
                f'Too few arguments for {cls}; actual {len(args)}, expected at least {len(parameters) - defaults_count}'
            )

        # The default might refer to other type parameters. For an example, see:
        # https://typing.readthedocs.io/en/latest/spec/generics.html#type-parameters-as-parameters-to-generics
        mapping[type_param] = replace_types(type_param.__default__, mapping)

    return mapping
394
395
# Context variable holding the set of type refs seen by the currently active
# `generic_recursion_self_type` blocks; it is `None` when no such block is active.
_generic_recursion_cache: ContextVar[set[str] | None] = ContextVar('_generic_recursion_cache', default=None)
397
398
@contextmanager
def generic_recursion_self_type(
    origin: type[BaseModel], args: tuple[Any, ...]
) -> Iterator[PydanticRecursiveRef | None]:
    """This contextmanager should be placed around the recursive calls used to build a generic type,
    and accept as arguments the generic origin type and the type arguments being passed to it.

    If the same origin and arguments are observed twice, it implies that a self-reference placeholder
    can be used while building the core schema, and will produce a schema_ref that will be valid in the
    final parent schema.

    Args:
        origin: The generic origin type being parametrized.
        args: The type arguments being passed to `origin`.

    Yields:
        A `PydanticRecursiveRef` if this origin/args combination is already being built by an
        enclosing block, otherwise `None`.
    """
    previously_seen_type_refs = _generic_recursion_cache.get()
    if previously_seen_type_refs is None:
        # Outermost block: install a fresh set and remember the token so it can be reset on exit.
        previously_seen_type_refs = set()
        token = _generic_recursion_cache.set(previously_seen_type_refs)
    else:
        token = None

    try:
        type_ref = get_type_ref(origin, args_override=args)
        if type_ref in previously_seen_type_refs:
            yield PydanticRecursiveRef(type_ref=type_ref)
        else:
            previously_seen_type_refs.add(type_ref)
            try:
                yield
            finally:
                # Always forget the ref, even if the body raised. Otherwise, when an enclosing
                # block catches the exception and carries on, the stale entry would make a later
                # build of the same type look like a recursive reference.
                previously_seen_type_refs.discard(type_ref)
    finally:
        if token is not None:
            _generic_recursion_cache.reset(token)
429
430
def recursively_defined_type_refs() -> set[str]:
    """Return a snapshot of the type refs recorded by the active generic recursion blocks.

    The result is always a new set, so callers cannot modify the tracked state. It is empty
    when no generic recursion is in progress.
    """
    return set(_generic_recursion_cache.get() or ())
437
438
def get_cached_generic_type_early(parent: type[BaseModel], typevar_values: Any) -> type[BaseModel] | None:
    """Look up a parametrized model using the cheap "early" cache key.

    Cache lookups happen in two stages. The two-stage approach was necessary to get the highest
    possible performance for repeated calls to `__class_getitem__` on generic types (which may
    happen in tighter loops during runtime), while still ensuring that certain alternative
    parametrizations ultimately resolve to the same type.

    As a concrete example, this approach was necessary to make Model[List[T]][int] equal to
    Model[List[int]]. The `_early_cache_key` is optimized to be as quick to compute as possible
    (for repeated-access speed), and the `_late_cache_key` is optimized to be as "correct" as
    possible, so that two types that will ultimately be the same after resolving the type
    arguments will always produce cache hits.

    Moving to a single cache key per type would mean either always paying for the slower logic
    associated with `_late_cache_key`, or accepting that Model[List[T]][int] is a different type
    than Model[List[int]]. Because subclass relationships are relied upon during validation, it
    is worthwhile to ensure that types that are functionally equivalent are actually equal.

    Returns:
        The cached model for this early key, or `None` if there is none.
    """
    cache = _GENERIC_TYPES_CACHE.get()
    if cache is None:
        # First use: create the cache and publish it through the context variable.
        cache = GenericTypesCache()
        _GENERIC_TYPES_CACHE.set(cache)

    early_key = _early_cache_key(parent, typevar_values)
    return cache.get(early_key)
461
462
def get_cached_generic_type_late(
    parent: type[BaseModel], typevar_values: Any, origin: type[BaseModel], args: tuple[Any, ...]
) -> type[BaseModel] | None:
    """Look up a parametrized model using the "late" cache key.

    On a hit, the model is also re-registered through `set_cached_generic_type`, so that the
    early key for `parent`/`typevar_values` resolves to it from now on.
    See the docstring of `get_cached_generic_type_early` for more information about the
    two-stage cache lookup.

    Returns:
        The cached model for this late key, or `None` if there is none.
    """
    cache = _GENERIC_TYPES_CACHE.get()
    if cache is None:  # pragma: no cover (early cache is guaranteed to run first and initialize the cache)
        cache = GenericTypesCache()
        _GENERIC_TYPES_CACHE.set(cache)

    hit = cache.get(_late_cache_key(origin, args, typevar_values))
    if hit is None:
        return None

    set_cached_generic_type(parent, typevar_values, hit, origin, args)
    return hit
477
478
def set_cached_generic_type(
    parent: type[BaseModel],
    typevar_values: tuple[Any, ...],
    type_: type[BaseModel],
    origin: type[BaseModel] | None = None,
    args: tuple[Any, ...] | None = None,
) -> None:
    """Register `type_` in the generic types cache under every key that should resolve to it.

    The model is always stored under the early key for `(parent, typevar_values)`. When there is
    exactly one typevar value, it is also stored under the early key for that bare value. When
    both `origin` and `args` are given (and truthy), it is additionally stored under the late key.
    See the docstring of `get_cached_generic_type_early` for more information about why items
    are cached with two different keys.
    """
    cache = _GENERIC_TYPES_CACHE.get()
    if cache is None:  # pragma: no cover (cache lookup is guaranteed to run first and initialize the cache)
        cache = GenericTypesCache()
        _GENERIC_TYPES_CACHE.set(cache)

    cache[_early_cache_key(parent, typevar_values)] = type_

    # A single value may be looked up either as a 1-tuple or on its own; cover both spellings.
    if len(typevar_values) == 1:
        sole_value = typevar_values[0]
        cache[_early_cache_key(parent, sole_value)] = type_

    if not (origin and args):
        return
    cache[_late_cache_key(origin, args, typevar_values)] = type_
500
501
502def _union_orderings_key(typevar_values: Any) -> Any:
503 """This is intended to help differentiate between Union types with the same arguments in different order.
504
505 Thanks to caching internal to the `typing` module, it is not possible to distinguish between
506 List[Union[int, float]] and List[Union[float, int]] (and similarly for other "parent" origins besides List)
507 because `typing` considers Union[int, float] to be equal to Union[float, int].
508
509 However, you _can_ distinguish between (top-level) Union[int, float] vs. Union[float, int].
510 Because we parse items as the first Union type that is successful, we get slightly more consistent behavior
511 if we make an effort to distinguish the ordering of items in a union. It would be best if we could _always_
512 get the exact-correct order of items in the union, but that would require a change to the `typing` module itself.
513 (See https://github.com/python/cpython/issues/86483 for reference.)
514 """
515 if isinstance(typevar_values, tuple):
516 args_data = []
517 for value in typevar_values:
518 args_data.append(_union_orderings_key(value))
519 return tuple(args_data)
520 elif typing_objects.is_union(typing_extensions.get_origin(typevar_values)):
521 return get_args(typevar_values)
522 else:
523 return ()
524
525
def _early_cache_key(cls: type[BaseModel], typevar_values: Any) -> GenericTypesCacheKey:
    """Build the cache key used for the first, cheap lookup of a parametrized model.

    This key is meant to have minimal computational overhead. It is overly simplistic: two
    different cls/typevar_values inputs may ultimately result in the same type being created in
    BaseModel.__class_getitem__. To handle this, the `_late_cache_key` is checked later if the
    lookup with this key fails, and should result in a cache hit _precisely_ when the inputs to
    __class_getitem__ would result in the same type.
    """
    union_orderings = _union_orderings_key(typevar_values)
    return cls, typevar_values, union_orderings
536
537
def _late_cache_key(origin: type[BaseModel], args: tuple[Any, ...], typevar_values: Any) -> GenericTypesCacheKey:
    """Build the cache key used for the second, more precise lookup of a parametrized model.

    This key is used later in the process of creating a new type, when more is known about the
    exact args that will be passed. If a different set of inputs to __class_getitem__ resulted in
    the same inputs to the generic type creation process, the cached type can still be returned,
    and the cache updated with the `_early_cache_key` as well.
    """
    # The _union_orderings_key is placed at the start here to ensure there cannot be a collision with an
    # _early_cache_key, as that function will always produce a BaseModel subclass as the first item in the key,
    # whereas this function will always produce a tuple as the first item in the key.
    union_orderings = _union_orderings_key(typevar_values)
    return union_orderings, origin, args