Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pydantic/_internal/_generics.py: 64%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

228 statements  

1from __future__ import annotations 

2 

3import operator 

4import sys 

5import types 

6import typing 

7from collections import ChainMap 

8from collections.abc import Iterator, Mapping 

9from contextlib import contextmanager 

10from contextvars import ContextVar 

11from functools import reduce 

12from itertools import zip_longest 

13from types import prepare_class 

14from typing import TYPE_CHECKING, Annotated, Any, TypedDict, TypeVar, cast 

15from weakref import WeakValueDictionary 

16 

17import typing_extensions 

18from typing_inspection import typing_objects 

19from typing_inspection.introspection import is_union_origin 

20 

21from . import _typing_extra 

22from ._core_utils import get_type_ref 

23from ._forward_ref import PydanticRecursiveRef 

24from ._utils import all_identical, is_model_class 

25 

26if TYPE_CHECKING: 

27 from ..main import BaseModel 

28 

# Type alias for the keys of the generic types cache; both `_early_cache_key` and `_late_cache_key`
# are annotated as returning this 3-tuple shape.
GenericTypesCacheKey = tuple[Any, Any, tuple[Any, ...]]

30 

31# Note: We want to remove LimitedDict, but to do this, we'd need to improve the handling of generics caching. 

32# Right now, to handle recursive generics, some types must remain cached for brief periods without references. 

33# By chaining the WeakValuesDict with a LimitedDict, we have a way to retain caching for all types with references, 

34# while also retaining a limited number of types even without references. This is generally enough to build 

35# specific recursive generic models without losing required items out of the cache. 

36 

KT = TypeVar('KT')
VT = TypeVar('VT')
_LIMITED_DICT_SIZE = 100


class LimitedDict(dict[KT, VT]):
    """A dict that evicts its oldest entries once it grows past `size_limit`.

    Eviction only happens on `__setitem__`. When the limit is exceeded, the overflow plus an
    extra tenth of `size_limit` is dropped, starting from the earliest-inserted keys.
    """

    def __init__(self, size_limit: int = _LIMITED_DICT_SIZE) -> None:
        self.size_limit = size_limit
        super().__init__()

    def __setitem__(self, key: KT, value: VT, /) -> None:
        super().__setitem__(key, value)
        overflow = len(self) - self.size_limit
        if overflow <= 0:
            return
        # Drop a little more than strictly required (10% of the limit) so that
        # not every subsequent insertion triggers another eviction pass.
        evict_count = overflow + self.size_limit // 10
        # Snapshot the keys first: the dict cannot be mutated while iterating over it.
        for stale_key in list(self.keys())[:evict_count]:
            del self[stale_key]

54 

55 

# Weak dictionaries allow the dynamically created parametrized versions of generic models to get collected
# once they are no longer referenced by the caller.
GenericTypesCache = WeakValueDictionary[GenericTypesCacheKey, 'type[BaseModel]']

59 

if TYPE_CHECKING:

    class DeepChainMap(ChainMap[KT, VT]):  # type: ignore
        ...

else:

    class DeepChainMap(ChainMap):
        """ChainMap variant whose writes and deletes apply to every underlying mapping.

        Adapted from the recipe at
        https://docs.python.org/3/library/collections.html#collections.ChainMap,
        lightly modified for this use case.
        """

        def clear(self) -> None:
            """Empty every mapping in the chain."""
            for scope in self.maps:
                scope.clear()

        def __setitem__(self, key: KT, value: VT) -> None:
            """Store `value` under `key` in every mapping of the chain."""
            for scope in self.maps:
                scope[key] = value

        def __delitem__(self, key: KT) -> None:
            """Remove `key` from every mapping holding it; raise `KeyError` if none does."""
            removed = False
            for scope in self.maps:
                if key not in scope:
                    continue
                del scope[key]
                removed = True
            if not removed:
                raise KeyError(key)

90 

91 

92# Despite the fact that LimitedDict _seems_ no longer necessary, I'm very nervous to actually remove it 

93# and discover later on that we need to re-add all this infrastructure... 

94# _GENERIC_TYPES_CACHE = DeepChainMap(GenericTypesCache(), LimitedDict()) 

95 

# Module-level cache of parametrized generic models. It is read by `get_cached_generic_type_early` /
# `get_cached_generic_type_late` and written by `set_cached_generic_type`.
_GENERIC_TYPES_CACHE = GenericTypesCache()

97 

98 

class PydanticGenericMetadata(TypedDict):
    """Shape of the `__pydantic_generic_metadata__` mapping attached to generic models."""

    origin: type[BaseModel] | None  # analogous to typing._GenericAlias.__origin__
    args: tuple[Any, ...]  # analogous to typing._GenericAlias.__args__
    parameters: tuple[TypeVar, ...]  # analogous to typing.Generic.__parameters__

103 

104 

def create_generic_submodel(
    model_name: str, origin: type[BaseModel], args: tuple[Any, ...], params: tuple[Any, ...]
) -> type[BaseModel]:
    """Dynamically create a submodel of a provided (generic) BaseModel.

    This is used when producing concrete parametrizations of generic models. This function
    only *creates* the new subclass; the schema/validators/serialization must be updated to
    reflect a concrete parametrization elsewhere.

    Args:
        model_name: The name of the newly created model.
        origin: The base class for the new model to inherit from.
        args: A tuple of generic metadata arguments.
        params: A tuple of generic metadata parameters.

    Returns:
        The created submodel.
    """
    namespace: dict[str, Any] = {'__module__': origin.__module__}
    bases = (origin,)
    # Resolve the metaclass and initial namespace the same way a `class` statement would.
    meta, ns, kwds = prepare_class(model_name, bases)
    namespace.update(ns)
    created_model = meta(
        model_name,
        bases,
        namespace,
        __pydantic_generic_metadata__={
            'origin': origin,
            'args': args,
            'parameters': params,
        },
        __pydantic_reset_parent_namespace__=False,
        **kwds,
    )

    # Only the "called globally" flag is needed here; the caller's module name is discarded.
    # NOTE: `depth=3` is relative to this call site, so this call must stay directly in this function.
    _, called_globally = _get_caller_frame_info(depth=3)
    if called_globally:  # create global reference and therefore allow pickling
        object_by_reference = None
        reference_name = model_name
        reference_module_globals = sys.modules[created_model.__module__].__dict__
        # `setdefault` returns whatever is already bound to the name. Keep appending underscores
        # until a free name is found (or the name already refers to this very model).
        while object_by_reference is not created_model:
            object_by_reference = reference_module_globals.setdefault(reference_name, created_model)
            reference_name += '_'

    return created_model

150 

151 

def _get_caller_frame_info(depth: int = 2) -> tuple[str | None, bool]:
    """Report the module name of a calling frame and whether that frame is at module level.

    Args:
        depth: How many frames up the stack to look.

    Returns:
        A `(module_name, called_globally)` tuple. `(None, False)` is returned when
        `sys._getframe` is not available.

    Raises:
        RuntimeError: If the stack is not deep enough for `depth`.
    """
    try:
        frame = sys._getframe(depth)
    except ValueError as e:
        raise RuntimeError('This function must be used inside another function') from e
    except AttributeError:
        # `sys` has no `_getframe` here, so the caller cannot be inspected.
        return None, False

    module_globals = frame.f_globals
    module_name = module_globals.get('__name__')
    # At module level, a frame's locals and globals are the very same dict.
    called_globally = frame.f_locals is module_globals
    return module_name, called_globally

172 

173 

# Runtime class of `dict.values()` views, obtained from an instance; used for `isinstance` checks.
DictValues: type[Any] = type({}.values())

175 

176 

def iter_contained_typevars(v: Any) -> Iterator[TypeVar]:
    """Yield every type variable found in `v`, searching its type arguments recursively.

    This is used instead of reading `__parameters__` from a GenericAlias, because the
    parameters of (nested) generic BaseModel subclasses do not show up in that attribute.
    """
    if isinstance(v, TypeVar):
        yield v
        return

    if is_model_class(v):
        # Models carry their own parameters in the pydantic generic metadata.
        yield from v.__pydantic_generic_metadata__['parameters']
        return

    # Lists and dict-values views are walked directly; anything else through its type arguments.
    children = v if isinstance(v, (DictValues, list)) else get_args(v)
    for child in children:
        yield from iter_contained_typevars(child)

194 

195 

def get_args(v: Any) -> Any:
    """Return the type arguments of `v`.

    A non-empty `__pydantic_generic_metadata__` on `v` takes precedence;
    otherwise this defers to `typing_extensions.get_args`.
    """
    metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    if not metadata:
        return typing_extensions.get_args(v)
    return metadata.get('args')

201 

202 

def get_origin(v: Any) -> Any:
    """Return the generic origin of `v`.

    A non-empty `__pydantic_generic_metadata__` on `v` takes precedence;
    otherwise this defers to `typing_extensions.get_origin`.
    """
    metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    if not metadata:
        return typing_extensions.get_origin(v)
    return metadata.get('origin')

208 

209 

def get_standard_typevars_map(cls: Any) -> dict[TypeVar, Any] | None:
    """Build a `{TypeVar: argument}` mapping for a standard typing generic, usable with `replace_types`.

    Specifically, this works with standard typing generics and typing._GenericAlias.
    Returns `None` when `cls` has no origin, or when its origin has no `__parameters__`.
    """
    origin = get_origin(cls)
    if origin is None or not hasattr(origin, '__parameters__'):
        return None

    # At this point cls is a _GenericAlias and origin is the generic type,
    # so it is safe to access cls.__args__ and origin.__parameters__.
    alias_args: tuple[Any, ...] = cls.__args__  # type: ignore
    type_params: tuple[TypeVar, ...] = origin.__parameters__
    return dict(zip(type_params, alias_args))

225 

226 

def get_model_typevars_map(cls: type[BaseModel]) -> dict[TypeVar, Any]:
    """Build a `{TypeVar: argument}` mapping for a generic BaseModel, usable with `replace_types`.

    Since BaseModel.__class_getitem__ does not produce a typing._GenericAlias, and the BaseModel generic info is
    stored in the __pydantic_generic_metadata__ attribute, we need special handling here.
    """
    # TODO: This could be unified with `get_standard_typevars_map` if we stored the generic metadata
    # in the __origin__, __args__, and __parameters__ attributes of the model.
    metadata = cls.__pydantic_generic_metadata__
    origin = metadata['origin']
    args = metadata['args']
    if args:
        # Pair the type variables found on the origin with the concrete arguments, in order.
        return dict(zip(iter_contained_typevars(origin), args))
    # Without arguments there is nothing to map, so skip `iter_contained_typevars` entirely.
    return {}

243 

244 

def replace_types(type_: Any, type_map: Mapping[TypeVar, Any] | None) -> Any:
    """Return type with all occurrences of `type_map` keys recursively replaced with their values.

    Args:
        type_: The class or generic alias.
        type_map: Mapping from `TypeVar` instance to concrete types.

    Returns:
        A new type representing the basic structure of `type_` with all
        `type_map` keys recursively replaced. If nothing needs replacing,
        `type_` itself is returned unchanged.

    Example:
        ```python
        from typing import Union

        from pydantic._internal._generics import replace_types

        replace_types(tuple[str, Union[list[str], float]], {str: int})
        #> tuple[int, Union[list[int], float]]
        ```
    """
    if not type_map:
        return type_

    type_args = get_args(type_)
    origin_type = get_origin(type_)

    if typing_objects.is_annotated(origin_type):
        # Only the annotated type is substituted; the metadata is carried over untouched.
        annotated_type, *annotations = type_args
        annotated_type = replace_types(annotated_type, type_map)
        # TODO remove parentheses when we drop support for Python 3.10:
        return Annotated[(annotated_type, *annotations)]

    # Having type args is a good indicator that this is a typing special form
    # instance or a generic alias of some sort.
    if type_args:
        resolved_type_args = tuple(replace_types(arg, type_map) for arg in type_args)
        if all_identical(type_args, resolved_type_args):
            # If all arguments are the same, there is no need to modify the
            # type or create a new object at all
            return type_

        if (
            origin_type is not None
            and isinstance(type_, _typing_extra.typing_base)
            and not isinstance(origin_type, _typing_extra.typing_base)
            and getattr(type_, '_name', None) is not None
        ):
            # In python < 3.9 generic aliases don't exist so any of these like `list`,
            # `type` or `collections.abc.Callable` need to be translated.
            # See: https://www.python.org/dev/peps/pep-0585
            origin_type = getattr(typing, type_._name)
        assert origin_type is not None

        if is_union_origin(origin_type):
            if any(typing_objects.is_any(arg) for arg in resolved_type_args):
                # `Any | T` ~ `Any`:
                resolved_type_args = (Any,)
            # `Never | T` ~ `T`:
            non_never_args = tuple(
                arg
                for arg in resolved_type_args
                if not (typing_objects.is_noreturn(arg) or typing_objects.is_never(arg))
            )
            if not non_never_args:
                # Every member resolved to `Never`/`NoReturn`: the union collapses to that bottom
                # type. Without this guard, an empty argument tuple would reach the union
                # construction below, which cannot build a union of no types.
                return resolved_type_args[0]
            resolved_type_args = non_never_args

        # PEP-604 syntax (e.g. `list | str`) is represented with a types.UnionType object that does not
        # implement `__getitem__()`. In Python 3.14+, `typing.Union` and `types.UnionType` are the same,
        # and we instead rely on `typing.Union` as it implicitly converts string annotations to `ForwardRef`
        # instances (this is to avoid type errors as per https://github.com/python/cpython/pull/105366).
        # TODO remove type ignore comment when we drop support for Python 3.9 (https://github.com/microsoft/pyright/issues/11241):
        if (3, 10) <= sys.version_info < (3, 14) and origin_type is types.UnionType:  # pyright: ignore[reportAttributeAccessIssue]
            return reduce(operator.or_, resolved_type_args)
        # NotRequired[T] and Required[T] don't support tuple type resolved_type_args, hence the condition below
        return origin_type[resolved_type_args[0] if len(resolved_type_args) == 1 else resolved_type_args]

    # We handle pydantic generic models separately as they don't have the same
    # semantics as "typing" classes or generic aliases

    if not origin_type and is_model_class(type_):
        parameters = type_.__pydantic_generic_metadata__['parameters']
        if not parameters:
            return type_
        resolved_type_args = tuple(replace_types(t, type_map) for t in parameters)
        if all_identical(parameters, resolved_type_args):
            return type_
        return type_[resolved_type_args]

    # Handle special case for typehints that can have lists as arguments.
    # `typing.Callable[[int, str], int]` is an example for this.
    if isinstance(type_, list):
        resolved_list = [replace_types(element, type_map) for element in type_]
        if all_identical(type_, resolved_list):
            return type_
        return resolved_list

    # If all else fails, we try to resolve the type directly and otherwise just
    # return the input with no modifications.
    return type_map.get(type_, type_)

343 

344 

def map_generic_model_arguments(cls: type[BaseModel], args: tuple[Any, ...]) -> dict[TypeVar, Any]:
    """Return a mapping between the parameters of a generic model and the provided arguments during parameterization.

    Parameters that receive no argument fall back to their type-parameter default, when they have one.

    Raises:
        TypeError: If the number of arguments does not match the parameters (i.e. if providing too few or too many arguments).

    Example:
        ```python {test="skip" lint="skip"}
        class Model[T, U, V = int](BaseModel): ...

        map_generic_model_arguments(Model, (str, bytes))
        #> {T: str, U: bytes, V: int}

        map_generic_model_arguments(Model, (str,))
        #> TypeError: Too few arguments for <class '__main__.Model'>; actual 1, expected at least 2

        map_generic_model_arguments(Model, (str, bytes, int, complex))
        #> TypeError: Too many arguments for <class '__main__.Model'>; actual 4, expected 3
        ```

    Note:
        This function is analogous to the private `typing._check_generic_specialization` function.
    """
    parameters = cls.__pydantic_generic_metadata__['parameters']
    expected_len = len(parameters)
    typevars_map: dict[TypeVar, Any] = {}

    # Unique filler, so that a legitimate `None` argument is never mistaken for "absent".
    absent = object()
    for parameter, argument in zip_longest(parameters, args, fillvalue=absent):
        if parameter is absent:
            raise TypeError(f'Too many arguments for {cls}; actual {len(args)}, expected {expected_len}')

        param = cast(TypeVar, parameter)
        if argument is not absent:
            typevars_map[param] = argument
            continue

        # No argument was supplied for this parameter: fall back to its default, if any.
        try:
            has_default = param.has_default()  # pyright: ignore[reportAttributeAccessIssue]
        except AttributeError:
            # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13.
            has_default = False

        if not has_default:
            # Parameters with a default are optional, so they do not count towards the minimum.
            expected_len -= sum(hasattr(p, 'has_default') and p.has_default() for p in parameters)  # pyright: ignore[reportAttributeAccessIssue]
            raise TypeError(f'Too few arguments for {cls}; actual {len(args)}, expected at least {expected_len}')

        # The default might refer to other type parameters. For an example, see:
        # https://typing.python.org/en/latest/spec/generics.html#type-parameters-as-parameters-to-generics
        typevars_map[param] = replace_types(param.__default__, typevars_map)  # pyright: ignore[reportAttributeAccessIssue]

    return typevars_map

396 

397 

# Context-local set of type refs currently being built; `None` means no generic recursion is in progress.
# It is populated by `generic_recursion_self_type` and read by `recursively_defined_type_refs`.
_generic_recursion_cache: ContextVar[set[str] | None] = ContextVar('_generic_recursion_cache', default=None)

399 

400 

@contextmanager
def generic_recursion_self_type(
    origin: type[BaseModel], args: tuple[Any, ...]
) -> Iterator[PydanticRecursiveRef | None]:
    """This contextmanager should be placed around the recursive calls used to build a generic type,
    and accept as arguments the generic origin type and the type arguments being passed to it.

    If the same origin and arguments are observed twice, it implies that a self-reference placeholder
    can be used while building the core schema, and will produce a schema_ref that will be valid in the
    final parent schema.

    Yields:
        A `PydanticRecursiveRef` when this origin/args pair is already being built further up the
        stack, `None` otherwise.
    """
    previously_seen_type_refs = _generic_recursion_cache.get()
    if previously_seen_type_refs is None:
        # Outermost call: install a fresh set, and remember the token so it can be reset on exit.
        previously_seen_type_refs = set()
        token = _generic_recursion_cache.set(previously_seen_type_refs)
    else:
        token = None

    try:
        type_ref = get_type_ref(origin, args_override=args)
        if type_ref in previously_seen_type_refs:
            yield PydanticRecursiveRef(type_ref=type_ref)
        else:
            previously_seen_type_refs.add(type_ref)
            try:
                yield
            finally:
                # Always forget the ref, even when the body raises. Otherwise an error that is caught
                # further up would leave a stale entry, and a later build of the same type within
                # this context would wrongly be treated as recursive.
                previously_seen_type_refs.discard(type_ref)
    finally:
        if token is not None:
            _generic_recursion_cache.reset(token)

431 

432 

def recursively_defined_type_refs() -> set[str]:
    """Return a snapshot of the type refs currently tracked by the generic recursion cache.

    An empty set is returned when no generic recursion is in progress. The result is always
    a new set, so callers cannot modify the tracked state through it.
    """
    visited = _generic_recursion_cache.get()
    return visited.copy() if visited else set()

439 

440 

def get_cached_generic_type_early(parent: type[BaseModel], typevar_values: Any) -> type[BaseModel] | None:
    """Look up a parametrized model in the cache using the cheap "early" key.

    The use of a two-stage cache lookup approach was necessary to have the highest performance possible for
    repeated calls to `__class_getitem__` on generic types (which may happen in tighter loops during runtime),
    while still ensuring that certain alternative parametrizations ultimately resolve to the same type.

    As a concrete example, this approach was necessary to make Model[List[T]][int] equal to Model[List[int]].
    The approach could be modified to not use two different cache keys at different points, but the
    _early_cache_key is optimized to be as quick to compute as possible (for repeated-access speed), and the
    _late_cache_key is optimized to be as "correct" as possible, so that two types that will ultimately be the
    same after resolving the type arguments will always produce cache hits.

    If we wanted to move to only using a single cache key per type, we would either need to always use the
    slower/more computationally intensive logic associated with _late_cache_key, or would need to accept
    that Model[List[T]][int] is a different type than Model[List[int]]. Because we rely on subclass relationships
    during validation, I think it is worthwhile to ensure that types that are functionally equivalent are actually
    equal.
    """
    early_key = _early_cache_key(parent, typevar_values)
    return _GENERIC_TYPES_CACHE.get(early_key)

459 

460 

def get_cached_generic_type_late(
    parent: type[BaseModel], typevar_values: Any, origin: type[BaseModel], args: tuple[Any, ...]
) -> type[BaseModel] | None:
    """Look up a parametrized model using the "late" key, re-registering it under all keys on a hit.

    See the docstring of `get_cached_generic_type_early` for more information about the two-stage cache lookup.
    """
    late_key = _late_cache_key(origin, args, typevar_values)
    cached = _GENERIC_TYPES_CACHE.get(late_key)
    if cached is None:
        return None
    # Also store the hit under the early key(s), so the next lookup succeeds on the fast path.
    set_cached_generic_type(parent, typevar_values, cached, origin, args)
    return cached

469 

470 

def set_cached_generic_type(
    parent: type[BaseModel],
    typevar_values: tuple[Any, ...],
    type_: type[BaseModel],
    origin: type[BaseModel] | None = None,
    args: tuple[Any, ...] | None = None,
) -> None:
    """Store `type_` in the generic types cache under every key it may later be looked up with.

    See the docstring of `get_cached_generic_type_early` for more information about why items are cached with
    two different keys.
    """
    cache = _GENERIC_TYPES_CACHE
    cache[_early_cache_key(parent, typevar_values)] = type_

    # A 1-tuple is additionally registered under its bare element.
    single_value = len(typevar_values) == 1
    if single_value:
        cache[_early_cache_key(parent, typevar_values[0])] = type_

    # The late key is only registered when both `origin` and `args` are provided (and truthy).
    if origin and args:
        cache[_late_cache_key(origin, args, typevar_values)] = type_

486 

487 

def _union_orderings_key(typevar_values: Any) -> Any:
    """This is intended to help differentiate between Union types with the same arguments in different order.

    Thanks to caching internal to the `typing` module, it is not possible to distinguish between
    List[Union[int, float]] and List[Union[float, int]] (and similarly for other "parent" origins besides List)
    because `typing` considers Union[int, float] to be equal to Union[float, int].

    However, you _can_ distinguish between (top-level) Union[int, float] vs. Union[float, int].
    Because we parse items as the first Union type that is successful, we get slightly more consistent behavior
    if we make an effort to distinguish the ordering of items in a union. It would be best if we could _always_
    get the exact-correct order of items in the union, but that would require a change to the `typing` module itself.
    (See https://github.com/python/cpython/issues/86483 for reference.)

    Tuples are handled element by element, a union yields its arguments in order, and anything
    else yields an empty tuple.
    """
    if isinstance(typevar_values, tuple):
        return tuple(map(_union_orderings_key, typevar_values))
    if typing_objects.is_union(typing_extensions.get_origin(typevar_values)):
        return get_args(typevar_values)
    return ()

507 

508 

def _early_cache_key(cls: type[BaseModel], typevar_values: Any) -> GenericTypesCacheKey:
    """This is intended for minimal computational overhead during lookups of cached types.

    Note that this is overly simplistic, and it's possible that two different cls/typevar_values
    inputs would ultimately result in the same type being created in BaseModel.__class_getitem__.
    To handle this, we have a fallback _late_cache_key that is checked later if the _early_cache_key
    lookup fails, and should result in a cache hit _precisely_ when the inputs to __class_getitem__
    would result in the same type.
    """
    union_ordering = _union_orderings_key(typevar_values)
    return cls, typevar_values, union_ordering

519 

520 

def _late_cache_key(origin: type[BaseModel], args: tuple[Any, ...], typevar_values: Any) -> GenericTypesCacheKey:
    """This is intended for use later in the process of creating a new type, when we have more information
    about the exact args that will be passed. If it turns out that a different set of inputs to
    __class_getitem__ resulted in the same inputs to the generic type creation process, we can still
    return the cached type, and update the cache with the _early_cache_key as well.
    """
    # The _union_orderings_key is placed at the start here to ensure there cannot be a collision with an
    # _early_cache_key, as that function will always produce a BaseModel subclass as the first item in the key,
    # whereas this function will always produce a tuple as the first item in the key.
    union_ordering = _union_orderings_key(typevar_values)
    return union_ordering, origin, args