Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pydantic/_internal/_generics.py: 66%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

237 statements  

1from __future__ import annotations 

2 

3import sys 

4import types 

5import typing 

6from collections import ChainMap 

7from collections.abc import Iterator, Mapping 

8from contextlib import contextmanager 

9from contextvars import ContextVar 

10from itertools import zip_longest 

11from types import prepare_class 

12from typing import TYPE_CHECKING, Annotated, Any, TypeVar 

13from weakref import WeakValueDictionary 

14 

15import typing_extensions 

16from typing_inspection import typing_objects 

17from typing_inspection.introspection import is_union_origin 

18 

19from . import _typing_extra 

20from ._core_utils import get_type_ref 

21from ._forward_ref import PydanticRecursiveRef 

22from ._utils import all_identical, is_model_class 

23 

24if sys.version_info >= (3, 10): 

25 from typing import _UnionGenericAlias # type: ignore[attr-defined] 

26 

27if TYPE_CHECKING: 

28 from ..main import BaseModel 

29 

# Shape of the keys stored in the generic types cache; both `_early_cache_key` and
# `_late_cache_key` are annotated as returning this 3-tuple.
GenericTypesCacheKey = tuple[Any, Any, tuple[Any, ...]]

31 

# Note: We want to remove LimitedDict, but to do this, we'd need to improve the handling of generics caching.
# Right now, to handle recursive generics, some types must remain cached for brief periods without references.
# By chaining the WeakValueDictionary with a LimitedDict, we have a way to retain caching for all types with
# references, while also retaining a limited number of types even without references. This is generally enough
# to build specific recursive generic models without losing required items out of the cache.

37 

KT = TypeVar('KT')
VT = TypeVar('VT')
_LIMITED_DICT_SIZE = 100


class LimitedDict(dict[KT, VT]):
    """A `dict` that trims itself when item assignment pushes it past `size_limit`.

    The limit is enforced in `__setitem__` only. When the size exceeds the limit, the keys at
    the front of the iteration order (the earliest-inserted ones) are deleted.

    Args:
        size_limit: Maximum number of entries tolerated before trimming. Defaults to
            `_LIMITED_DICT_SIZE`.
    """

    def __init__(self, size_limit: int = _LIMITED_DICT_SIZE) -> None:
        self.size_limit = size_limit
        super().__init__()

    def __setitem__(self, key: KT, value: VT, /) -> None:
        super().__setitem__(key, value)
        if len(self) > self.size_limit:
            # Remove the overflow plus an extra tenth of the limit, so that the trimming
            # does not run again on the very next insertion.
            excess = len(self) - self.size_limit + self.size_limit // 10
            # Materialize the keys first: the dict cannot be mutated while iterating over it.
            to_remove = list(self)[:excess]
            for k in to_remove:
                del self[k]

55 

56 

# Weak dictionaries allow the dynamically created parametrized versions of generic models to get collected
# once they are no longer referenced by the caller.
GenericTypesCache = WeakValueDictionary[GenericTypesCacheKey, 'type[BaseModel]']

60 

if TYPE_CHECKING:

    class DeepChainMap(ChainMap[KT, VT]):  # type: ignore
        ...

else:

    class DeepChainMap(ChainMap):
        """Variant of ChainMap that allows direct updates to inner scopes.

        Taken from https://docs.python.org/3/library/collections.html#collections.ChainMap,
        with some light modifications for this use case.

        Unlike a plain `ChainMap`, writes, deletions and `clear()` are applied to *every*
        underlying mapping rather than only to the first one.
        """

        def clear(self) -> None:
            """Empty every underlying mapping."""
            for mapping in self.maps:
                mapping.clear()

        def __setitem__(self, key: KT, value: VT) -> None:
            """Store `value` under `key` in every underlying mapping."""
            for mapping in self.maps:
                mapping[key] = value

        def __delitem__(self, key: KT) -> None:
            """Delete `key` from every underlying mapping that contains it.

            Raises:
                KeyError: If no underlying mapping contains `key`.
            """
            hit = False
            for mapping in self.maps:
                if key in mapping:
                    del mapping[key]
                    hit = True
            if not hit:
                raise KeyError(key)

91 

92 

# Despite the fact that LimitedDict _seems_ no longer necessary, I'm very nervous to actually remove it
# and discover later on that we need to re-add all this infrastructure...
# _GENERIC_TYPES_CACHE = DeepChainMap(GenericTypesCache(), LimitedDict())

# Holds the cache of parametrized generic models for the current context. It starts out as `None`;
# the cache accessors in this module create a `GenericTypesCache` on first use.
_GENERIC_TYPES_CACHE: ContextVar[GenericTypesCache | None] = ContextVar('_GENERIC_TYPES_CACHE', default=None)

98 

99 

class PydanticGenericMetadata(typing_extensions.TypedDict):
    """Generic parametrization info, read in this module from the `__pydantic_generic_metadata__` attribute."""

    origin: type[BaseModel] | None  # analogous to typing._GenericAlias.__origin__
    args: tuple[Any, ...]  # analogous to typing._GenericAlias.__args__
    parameters: tuple[TypeVar, ...]  # analogous to typing.Generic.__parameters__

104 

105 

def create_generic_submodel(
    model_name: str, origin: type[BaseModel], args: tuple[Any, ...], params: tuple[Any, ...]
) -> type[BaseModel]:
    """Dynamically create a submodel of a provided (generic) BaseModel.

    This is used when producing concrete parametrizations of generic models. This function
    only *creates* the new subclass; the schema/validators/serialization must be updated to
    reflect a concrete parametrization elsewhere.

    Args:
        model_name: The name of the newly created model.
        origin: The base class for the new model to inherit from.
        args: A tuple of generic metadata arguments.
        params: A tuple of generic metadata parameters.

    Returns:
        The created submodel.
    """
    namespace: dict[str, Any] = {'__module__': origin.__module__}
    bases = (origin,)
    # Let `prepare_class` work out the metaclass, the initial namespace and the class keywords for `bases`.
    meta, ns, kwds = prepare_class(model_name, bases)
    namespace.update(ns)
    created_model = meta(
        model_name,
        bases,
        namespace,
        __pydantic_generic_metadata__={
            'origin': origin,
            'args': args,
            'parameters': params,
        },
        __pydantic_reset_parent_namespace__=False,
        **kwds,
    )

    # depth=3 inspects the frame two levels above this function, i.e. the caller of whatever
    # called `create_generic_submodel` (presumably the code that subscripted the model -- TODO confirm).
    _, called_globally = _get_caller_frame_info(depth=3)
    if called_globally:  # create global reference and therefore allow pickling
        object_by_reference = None
        reference_name = model_name
        reference_module_globals = sys.modules[created_model.__module__].__dict__
        # Register the model under `model_name`; if that name is already bound to another object,
        # keep appending underscores until a free name (or an existing binding to this model) is found.
        while object_by_reference is not created_model:
            object_by_reference = reference_module_globals.setdefault(reference_name, created_model)
            reference_name += '_'

    return created_model

151 

152 

def _get_caller_frame_info(depth: int = 2) -> tuple[str | None, bool]:
    """Used inside a function to check whether it was called globally.

    Args:
        depth: How many frames above this function to inspect (passed to `sys._getframe`).

    Returns:
        A `(module_name, called_globally)` tuple: the `__name__` found in the inspected frame's
        globals, and whether that frame's locals *are* its globals (i.e. it runs at module level).
        `(None, False)` is returned when `sys._getframe` is not available.

    Raises:
        RuntimeError: If the call stack is not deep enough for `depth`.
    """
    try:
        previous_caller_frame = sys._getframe(depth)
    except ValueError as e:
        raise RuntimeError('This function must be used inside another function') from e
    except AttributeError:  # sys module does not have _getframe function, so there's nothing we can do about it
        return None, False
    frame_globals = previous_caller_frame.f_globals
    return frame_globals.get('__name__'), previous_caller_frame.f_locals is frame_globals

173 

174 

# The runtime class of `dict.values()` views, used for `isinstance` checks.
DictValues: type[Any] = type({}.values())

176 

177 

def iter_contained_typevars(v: Any) -> Iterator[TypeVar]:
    """Recursively iterate through all subtypes and type args of `v` and yield any typevars that are found.

    This is inspired as an alternative to directly accessing the `__parameters__` attribute of a GenericAlias,
    since __parameters__ of (nested) generic BaseModel subclasses won't show up in that list.

    Args:
        v: A type, a generic alias, a model class, or a list / `dict.values()` view of such objects.

    Yields:
        Every `TypeVar` found, in traversal order (duplicates are not removed).
    """
    if isinstance(v, TypeVar):
        yield v
    elif is_model_class(v):
        # Models carry their own type parameters in the generic metadata.
        yield from v.__pydantic_generic_metadata__['parameters']
    elif isinstance(v, (DictValues, list)):
        for var in v:
            yield from iter_contained_typevars(var)
    else:
        args = get_args(v)
        for arg in args:
            yield from iter_contained_typevars(arg)

195 

196 

def get_args(v: Any) -> Any:
    """Return the type arguments of `v`.

    If `v` has a non-empty `__pydantic_generic_metadata__` attribute, its `'args'` entry is returned;
    otherwise this falls back to `typing_extensions.get_args`.
    """
    pydantic_generic_metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    if pydantic_generic_metadata:
        return pydantic_generic_metadata.get('args')
    return typing_extensions.get_args(v)

202 

203 

def get_origin(v: Any) -> Any:
    """Return the generic origin of `v`.

    If `v` has a non-empty `__pydantic_generic_metadata__` attribute, its `'origin'` entry is returned;
    otherwise this falls back to `typing_extensions.get_origin`.
    """
    pydantic_generic_metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    if pydantic_generic_metadata:
        return pydantic_generic_metadata.get('origin')
    return typing_extensions.get_origin(v)

209 

210 

def get_standard_typevars_map(cls: Any) -> dict[TypeVar, Any] | None:
    """Package a generic type's typevars and parametrization (if present) into a dictionary compatible with the
    `replace_types` function. Specifically, this works with standard typing generics and typing._GenericAlias.

    Returns:
        A mapping from the origin's `__parameters__` to `cls.__args__`, or `None` when `cls` has no
        origin or the origin has no `__parameters__` attribute.
    """
    origin = get_origin(cls)
    if origin is None:
        return None
    if not hasattr(origin, '__parameters__'):
        return None

    # In this case, we know that cls is a _GenericAlias, and origin is the generic type
    # So it is safe to access cls.__args__ and origin.__parameters__
    args: tuple[Any, ...] = cls.__args__  # type: ignore
    parameters: tuple[TypeVar, ...] = origin.__parameters__
    return dict(zip(parameters, args))

226 

227 

def get_model_typevars_map(cls: type[BaseModel]) -> dict[TypeVar, Any]:
    """Package a generic BaseModel's typevars and concrete parametrization (if present) into a dictionary compatible
    with the `replace_types` function.

    Since BaseModel.__class_getitem__ does not produce a typing._GenericAlias, and the BaseModel generic info is
    stored in the __pydantic_generic_metadata__ attribute, we need special handling here.

    Returns:
        A mapping pairing the typevars found on the model's origin with the model's `args`,
        or an empty dict when the model has no `args`.
    """
    # TODO: This could be unified with `get_standard_typevars_map` if we stored the generic metadata
    #   in the __origin__, __args__, and __parameters__ attributes of the model.
    generic_metadata = cls.__pydantic_generic_metadata__
    origin = generic_metadata['origin']
    args = generic_metadata['args']
    if not args:
        # No need to go into `iter_contained_typevars`:
        return {}
    return dict(zip(iter_contained_typevars(origin), args))

244 

245 

def replace_types(type_: Any, type_map: Mapping[TypeVar, Any] | None) -> Any:
    """Return type with all occurrences of `type_map` keys recursively replaced with their values.

    Args:
        type_: The class or generic alias.
        type_map: Mapping from `TypeVar` instance to concrete types.

    Returns:
        A new type representing the basic structure of `type_` with all
        `type_map` keys recursively replaced. When nothing needs replacing
        (including when `type_map` is empty or `None`), `type_` itself is returned.

    Example:
        ```python
        from typing import List, Union

        from pydantic._internal._generics import replace_types

        replace_types(tuple[str, Union[List[str], float]], {str: int})
        #> tuple[int, Union[List[int], float]]
        ```
    """
    if not type_map:
        return type_

    type_args = get_args(type_)
    origin_type = get_origin(type_)

    if typing_objects.is_annotated(origin_type):
        # Only the annotated type is substituted; the metadata is carried over untouched.
        annotated_type, *annotations = type_args
        annotated_type = replace_types(annotated_type, type_map)
        # TODO remove parentheses when we drop support for Python 3.10:
        return Annotated[(annotated_type, *annotations)]

    # Having type args is a good indicator that this is a typing special form
    # instance or a generic alias of some sort.
    if type_args:
        resolved_type_args = tuple(replace_types(arg, type_map) for arg in type_args)
        if all_identical(type_args, resolved_type_args):
            # If all arguments are the same, there is no need to modify the
            # type or create a new object at all
            return type_

        if (
            origin_type is not None
            and isinstance(type_, _typing_extra.typing_base)
            and not isinstance(origin_type, _typing_extra.typing_base)
            and getattr(type_, '_name', None) is not None
        ):
            # In python < 3.9 generic aliases don't exist so any of these like `list`,
            # `type` or `collections.abc.Callable` need to be translated.
            # See: https://www.python.org/dev/peps/pep-0585
            origin_type = getattr(typing, type_._name)
        assert origin_type is not None

        if is_union_origin(origin_type):
            if any(typing_objects.is_any(arg) for arg in resolved_type_args):
                # `Any | T` ~ `Any`:
                resolved_type_args = (Any,)
            # `Never | T` ~ `T`:
            resolved_type_args = tuple(
                arg
                for arg in resolved_type_args
                if not (typing_objects.is_noreturn(arg) or typing_objects.is_never(arg))
            )

        # PEP-604 syntax (Ex.: list | str) is represented with a types.UnionType object that does not have __getitem__.
        # We also cannot use isinstance() since we have to compare types.
        if sys.version_info >= (3, 10) and origin_type is types.UnionType:
            return _UnionGenericAlias(origin_type, resolved_type_args)
        # NotRequired[T] and Required[T] don't support tuple type resolved_type_args, hence the condition below
        return origin_type[resolved_type_args[0] if len(resolved_type_args) == 1 else resolved_type_args]

    # We handle pydantic generic models separately as they don't have the same
    # semantics as "typing" classes or generic aliases

    if not origin_type and is_model_class(type_):
        parameters = type_.__pydantic_generic_metadata__['parameters']
        if not parameters:
            return type_
        resolved_type_args = tuple(replace_types(t, type_map) for t in parameters)
        if all_identical(parameters, resolved_type_args):
            return type_
        return type_[resolved_type_args]

    # Handle special case for typehints that can have lists as arguments.
    # `typing.Callable[[int, str], int]` is an example for this.
    if isinstance(type_, list):
        resolved_list = [replace_types(element, type_map) for element in type_]
        if all_identical(type_, resolved_list):
            return type_
        return resolved_list

    # If all else fails, we try to resolve the type directly and otherwise just
    # return the input with no modifications.
    return type_map.get(type_, type_)

341 

342 

def map_generic_model_arguments(cls: type[BaseModel], args: tuple[Any, ...]) -> dict[TypeVar, Any]:
    """Return a mapping between the parameters of a generic model and the provided arguments during parameterization.

    Args:
        cls: The generic model; its parameters are read from `__pydantic_generic_metadata__`.
        args: The arguments provided for those parameters, in order.

    Returns:
        A mapping from each parameter to its argument, or to its (resolved) default when
        no argument was provided for it.

    Raises:
        TypeError: If the number of arguments does not match the parameters (i.e. if providing too few or too many arguments).

    Example:
        ```python {test="skip" lint="skip"}
        class Model[T, U, V = int](BaseModel): ...

        map_generic_model_arguments(Model, (str, bytes))
        #> {T: str, U: bytes, V: int}

        map_generic_model_arguments(Model, (str,))
        #> TypeError: Too few arguments for <class '__main__.Model'>; actual 1, expected at least 2

        map_generic_model_arguments(Model, (str, bytes, int, complex))
        #> TypeError: Too many arguments for <class '__main__.Model'>; actual 4, expected 3
        ```

    Note:
        This function is analogous to the private `typing._check_generic_specialization` function.
    """
    parameters = cls.__pydantic_generic_metadata__['parameters']
    expected_len = len(parameters)
    typevars_map: dict[TypeVar, Any] = {}

    # Unique filler, so that a legitimate `None` argument is never mistaken for a missing one.
    _missing = object()
    for parameter, argument in zip_longest(parameters, args, fillvalue=_missing):
        if parameter is _missing:
            raise TypeError(f'Too many arguments for {cls}; actual {len(args)}, expected {expected_len}')

        if argument is _missing:
            param = typing.cast(TypeVar, parameter)
            try:
                has_default = param.has_default()
            except AttributeError:
                # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13.
                has_default = False
            if has_default:
                # The default might refer to other type parameters. For an example, see:
                # https://typing.readthedocs.io/en/latest/spec/generics.html#type-parameters-as-parameters-to-generics
                typevars_map[param] = replace_types(param.__default__, typevars_map)
            else:
                # Parameters with a default are optional, so do not count them in the reported minimum.
                expected_len -= sum(hasattr(p, 'has_default') and p.has_default() for p in parameters)
                raise TypeError(f'Too few arguments for {cls}; actual {len(args)}, expected at least {expected_len}')
        else:
            param = typing.cast(TypeVar, parameter)
            typevars_map[param] = argument

    return typevars_map

394 

395 

# Type refs of the generic types currently being built in this context (`None` outside of any build).
_generic_recursion_cache: ContextVar[set[str] | None] = ContextVar('_generic_recursion_cache', default=None)


@contextmanager
def generic_recursion_self_type(
    origin: type[BaseModel], args: tuple[Any, ...]
) -> Iterator[PydanticRecursiveRef | None]:
    """This contextmanager should be placed around the recursive calls used to build a generic type,
    and accept as arguments the generic origin type and the type arguments being passed to it.

    If the same origin and arguments are observed twice, it implies that a self-reference placeholder
    can be used while building the core schema, and will produce a schema_ref that will be valid in the
    final parent schema.

    Yields:
        A `PydanticRecursiveRef` when the type ref for `origin`/`args` is already being built in the
        current context, `None` otherwise.
    """
    previously_seen_type_refs = _generic_recursion_cache.get()
    if previously_seen_type_refs is None:
        # Outermost call: create the set and remember the token so it can be reset on exit.
        previously_seen_type_refs = set()
        token = _generic_recursion_cache.set(previously_seen_type_refs)
    else:
        token = None

    try:
        type_ref = get_type_ref(origin, args_override=args)
        if type_ref in previously_seen_type_refs:
            self_type = PydanticRecursiveRef(type_ref=type_ref)
            yield self_type
        else:
            previously_seen_type_refs.add(type_ref)
            try:
                yield
            finally:
                # Also clean up when the `with` body raises: in a nested call the set is shared with
                # the outer calls, and a stale ref would later be mistaken for a recursion.
                previously_seen_type_refs.discard(type_ref)
    finally:
        if token is not None:
            _generic_recursion_cache.reset(token)

429 

430 

def recursively_defined_type_refs() -> set[str]:
    """Return a copy of the type refs recorded in `_generic_recursion_cache` for the current context.

    Returns:
        A new set on every call; it is empty when no refs are recorded.
    """
    visited = _generic_recursion_cache.get()
    if not visited:
        return set()  # not in a generic recursion, so there are no types

    return visited.copy()  # don't allow modifications

437 

438 

def get_cached_generic_type_early(parent: type[BaseModel], typevar_values: Any) -> type[BaseModel] | None:
    """The use of a two-stage cache lookup approach was necessary to have the highest performance possible for
    repeated calls to `__class_getitem__` on generic types (which may happen in tighter loops during runtime),
    while still ensuring that certain alternative parametrizations ultimately resolve to the same type.

    As a concrete example, this approach was necessary to make Model[List[T]][int] equal to Model[List[int]].
    The approach could be modified to not use two different cache keys at different points, but the
    _early_cache_key is optimized to be as quick to compute as possible (for repeated-access speed), and the
    _late_cache_key is optimized to be as "correct" as possible, so that two types that will ultimately be the
    same after resolving the type arguments will always produce cache hits.

    If we wanted to move to only using a single cache key per type, we would either need to always use the
    slower/more computationally intensive logic associated with _late_cache_key, or would need to accept
    that Model[List[T]][int] is a different type than Model[List[int]]. Because we rely on subclass relationships
    during validation, I think it is worthwhile to ensure that types that are functionally equivalent are actually
    equal.

    Returns:
        The cached type stored under the early cache key, or `None` on a cache miss.
    """
    generic_types_cache = _GENERIC_TYPES_CACHE.get()
    if generic_types_cache is None:
        # First use in this context: create the cache.
        generic_types_cache = GenericTypesCache()
        _GENERIC_TYPES_CACHE.set(generic_types_cache)
    return generic_types_cache.get(_early_cache_key(parent, typevar_values))

461 

462 

def get_cached_generic_type_late(
    parent: type[BaseModel], typevar_values: Any, origin: type[BaseModel], args: tuple[Any, ...]
) -> type[BaseModel] | None:
    """See the docstring of `get_cached_generic_type_early` for more information about the two-stage cache lookup.

    Returns:
        The cached type stored under the late cache key, or `None` on a cache miss. On a hit, the
        type is also stored again via `set_cached_generic_type` for `parent`/`typevar_values`.
    """
    generic_types_cache = _GENERIC_TYPES_CACHE.get()
    if (
        generic_types_cache is None
    ):  # pragma: no cover (early cache is guaranteed to run first and initialize the cache)
        generic_types_cache = GenericTypesCache()
        _GENERIC_TYPES_CACHE.set(generic_types_cache)
    cached = generic_types_cache.get(_late_cache_key(origin, args, typevar_values))
    if cached is not None:
        set_cached_generic_type(parent, typevar_values, cached, origin, args)
    return cached

477 

478 

def set_cached_generic_type(
    parent: type[BaseModel],
    typevar_values: tuple[Any, ...],
    type_: type[BaseModel],
    origin: type[BaseModel] | None = None,
    args: tuple[Any, ...] | None = None,
) -> None:
    """See the docstring of `get_cached_generic_type_early` for more information about why items are cached with
    two different keys.

    Args:
        parent: The model used to build the early cache key.
        typevar_values: The type arguments used to build the cache keys.
        type_: The type to store.
        origin: Used for the late cache key; that key is only stored when both `origin` and `args` are truthy.
        args: Used for the late cache key, together with `origin`.
    """
    generic_types_cache = _GENERIC_TYPES_CACHE.get()
    if (
        generic_types_cache is None
    ):  # pragma: no cover (cache lookup is guaranteed to run first and initialize the cache)
        generic_types_cache = GenericTypesCache()
        _GENERIC_TYPES_CACHE.set(generic_types_cache)
    generic_types_cache[_early_cache_key(parent, typevar_values)] = type_
    if len(typevar_values) == 1:
        # Also store the type under the bare (non-tuple) form of a single argument.
        generic_types_cache[_early_cache_key(parent, typevar_values[0])] = type_
    if origin and args:
        generic_types_cache[_late_cache_key(origin, args, typevar_values)] = type_

500 

501 

def _union_orderings_key(typevar_values: Any) -> Any:
    """This is intended to help differentiate between Union types with the same arguments in different order.

    Thanks to caching internal to the `typing` module, it is not possible to distinguish between
    List[Union[int, float]] and List[Union[float, int]] (and similarly for other "parent" origins besides List)
    because `typing` considers Union[int, float] to be equal to Union[float, int].

    However, you _can_ distinguish between (top-level) Union[int, float] vs. Union[float, int].
    Because we parse items as the first Union type that is successful, we get slightly more consistent behavior
    if we make an effort to distinguish the ordering of items in a union. It would be best if we could _always_
    get the exact-correct order of items in the union, but that would require a change to the `typing` module itself.
    (See https://github.com/python/cpython/issues/86483 for reference.)

    Returns:
        For a tuple, a tuple with the key of each item; for a union, its arguments; otherwise `()`.
    """
    if isinstance(typevar_values, tuple):
        return tuple(_union_orderings_key(value) for value in typevar_values)
    elif typing_objects.is_union(typing_extensions.get_origin(typevar_values)):
        return get_args(typevar_values)
    else:
        return ()

524 

525 

def _early_cache_key(cls: type[BaseModel], typevar_values: Any) -> GenericTypesCacheKey:
    """This is intended for minimal computational overhead during lookups of cached types.

    Note that this is overly simplistic, and it's possible that two different cls/typevar_values
    inputs would ultimately result in the same type being created in BaseModel.__class_getitem__.
    To handle this, we have a fallback _late_cache_key that is checked later if the _early_cache_key
    lookup fails, and should result in a cache hit _precisely_ when the inputs to __class_getitem__
    would result in the same type.

    Returns:
        The tuple `(cls, typevar_values, _union_orderings_key(typevar_values))`.
    """
    return cls, typevar_values, _union_orderings_key(typevar_values)

536 

537 

def _late_cache_key(origin: type[BaseModel], args: tuple[Any, ...], typevar_values: Any) -> GenericTypesCacheKey:
    """This is intended for use later in the process of creating a new type, when we have more information
    about the exact args that will be passed. If it turns out that a different set of inputs to
    __class_getitem__ resulted in the same inputs to the generic type creation process, we can still
    return the cached type, and update the cache with the _early_cache_key as well.

    Returns:
        The tuple `(_union_orderings_key(typevar_values), origin, args)`.
    """
    # The _union_orderings_key is placed at the start here to ensure there cannot be a collision with an
    # _early_cache_key, as that function will always produce a BaseModel subclass as the first item in the key,
    # whereas this function will always produce a tuple as the first item in the key.
    return _union_orderings_key(typevar_values), origin, args