Coverage for /pythoncovmergedfiles/medio/medio/src/pydantic/pydantic/_internal/_generics.py: 34%

215 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-04-27 07:38 +0000

1from __future__ import annotations 

2 

3import sys 

4import types 

5import typing 

6from collections import ChainMap 

7from contextlib import contextmanager 

8from contextvars import ContextVar 

9from types import prepare_class 

10from typing import TYPE_CHECKING, Any, Iterator, List, Mapping, MutableMapping, Tuple, TypeVar 

11from weakref import WeakValueDictionary 

12 

13import typing_extensions 

14 

15from ._core_utils import get_type_ref 

16from ._forward_ref import PydanticForwardRef, PydanticRecursiveRef 

17from ._typing_extra import TypeVarType, typing_base 

18from ._utils import all_identical, is_basemodel 

19 

20if sys.version_info >= (3, 10): 

21 from typing import _UnionGenericAlias # type: ignore[attr-defined] 

22 

23if TYPE_CHECKING: 

24 from ..main import BaseModel 

25 

# Shape of the keys produced by `_early_cache_key` and `_late_cache_key` for the generic types cache.
GenericTypesCacheKey = Tuple[Any, Any, Tuple[Any, ...]]

# TODO: We want to remove LimitedDict, but to do this, we'll need to improve the handling of generics caching.
# Right now, to handle recursive generics, some types must remain cached for brief periods without references.
# By chaining the WeakValueDictionary with a LimitedDict, we have a way to retain caching for all types with
# references, while also retaining a limited number of types even without references. This is generally enough
# to build specific recursive generic models without losing required items out of the cache.

KT = TypeVar('KT')
VT = TypeVar('VT')

_LIMITED_DICT_SIZE = 100
if TYPE_CHECKING:

    class LimitedDict(dict, MutableMapping[KT, VT]):  # type: ignore[type-arg]
        def __init__(self, size_limit: int = _LIMITED_DICT_SIZE):
            ...

else:

    class LimitedDict(dict):
        """
        A dict for caching whose length is capped, so that memory usage cannot grow without bound.

        Dicts preserve insertion order and eviction always starts from the oldest entries, which makes
        this effectively a FIFO cache.
        """

        def __init__(self, size_limit: int = _LIMITED_DICT_SIZE):
            self.size_limit = size_limit
            super().__init__()

        def __setitem__(self, __key: Any, __value: Any) -> None:
            super().__setitem__(__key, __value)
            overflow = len(self) - self.size_limit
            if overflow > 0:
                # drop the overflow plus a further tenth of the limit, oldest entries first
                evict_count = overflow + self.size_limit // 10
                for stale_key in list(self)[:evict_count]:
                    del self[stale_key]

        def __class_getitem__(cls, *args: Any) -> Any:
            # subscripting simply hands back the class itself, to avoid errors with 3.7
            return cls

67 

68 

# Parametrized versions of generic models are created dynamically; keeping them in a weak dictionary
# lets them be collected once the caller no longer references them.
if sys.version_info < (3, 9):
    # Typing for weak dictionaries is only available from 3.9 onwards
    GenericTypesCache = WeakValueDictionary
else:
    GenericTypesCache = WeakValueDictionary[GenericTypesCacheKey, 'type[BaseModel]']

75 

if TYPE_CHECKING:

    class DeepChainMap(ChainMap[KT, VT]):  # type: ignore
        ...

else:

    class DeepChainMap(ChainMap):
        """
        A ChainMap whose writes, deletions and `clear` reach every underlying mapping.

        Adapted, with light modifications for this use case, from the recipe at
        https://docs.python.org/3/library/collections.html#collections.ChainMap
        """

        def clear(self) -> None:
            # empty every mapping in the chain
            for inner in self.maps:
                inner.clear()

        def __setitem__(self, key: KT, value: VT) -> None:
            # store the item in every mapping in the chain
            for inner in self.maps:
                inner[key] = value

        def __delitem__(self, key: KT) -> None:
            # delete the key from each mapping that holds it; complain only if none did
            removed = 0
            for inner in self.maps:
                if key in inner:
                    del inner[key]
                    removed += 1
            if removed == 0:
                raise KeyError(key)

107 

108 

# LimitedDict no longer _seems_ to be necessary, but removing it outright risks discovering later on
# that all of this infrastructure has to be re-added, so the chained variant is kept here for reference:
# _GENERIC_TYPES_CACHE = DeepChainMap(GenericTypesCache(), LimitedDict())

_GENERIC_TYPES_CACHE = GenericTypesCache()

114 

115 

class PydanticGenericMetadata(typing_extensions.TypedDict):
    """Generic bookkeeping read from a model's `__pydantic_generic_metadata__` attribute."""

    # analogous to typing._GenericAlias.__origin__
    origin: type[BaseModel] | None
    # analogous to typing._GenericAlias.__args__
    args: tuple[Any, ...]
    # analogous to typing.Generic.__parameters__
    parameters: tuple[type[Any], ...]

120 

121 

def create_generic_submodel(
    model_name: str, origin: type[BaseModel], args: tuple[Any, ...], params: tuple[Any, ...]
) -> type[BaseModel]:
    """
    Build, at runtime, a subclass of the given (generic) BaseModel.

    Concrete parametrizations of generic models are produced through this function. It only
    *creates* the subclass; updating the schema/validators/serialization to match the concrete
    parametrization has to happen elsewhere.

    :param model_name: name of the newly created model
    :param origin: base class for the new model to inherit from
    :param args: stored as `args` in the new model's `__pydantic_generic_metadata__`
    :param params: stored as `parameters` in the new model's `__pydantic_generic_metadata__`
    :returns: the newly created model class
    """
    namespace: dict[str, Any] = {'__module__': origin.__module__}
    bases = (origin,)
    metaclass, prepared_namespace, class_kwargs = prepare_class(model_name, bases)
    namespace.update(prepared_namespace)
    generic_metadata = {
        'origin': origin,
        'args': args,
        'parameters': params,
    }
    created_model = metaclass(
        model_name,
        bases,
        namespace,
        __pydantic_generic_metadata__=generic_metadata,
        __pydantic_reset_parent_namespace__=False,
        **class_kwargs,
    )

    # the frame lookup is made directly from this function because `depth` counts frames from the call site
    _, called_globally = _get_caller_frame_info(depth=3)
    if called_globally:  # create global reference and therefore allow pickling
        module_globals = sys.modules[created_model.__module__].__dict__
        candidate_name = model_name
        # keep appending underscores until a name is found under which the new model itself is registered
        while module_globals.setdefault(candidate_name, created_model) is not created_model:
            candidate_name += '_'

    return created_model

162 

163 

def _get_caller_frame_info(depth: int = 2) -> tuple[str | None, bool]:
    """
    Look `depth` frames up the call stack and report whether that frame is running at module level.

    :param depth: number of frames above this function's own frame to inspect
    :returns: Tuple[module_name, called_globally]; `(None, False)` when `sys` offers no `_getframe`
    :raises RuntimeError: if the call stack is not deep enough for the requested depth
    """
    try:
        caller_frame = sys._getframe(depth)
    except AttributeError:  # sys module does not have _getframe function, so there's nothing we can do about it
        return None, False
    except ValueError as e:
        raise RuntimeError('This function must be used inside another function') from e
    else:
        caller_globals = caller_frame.f_globals
        module_name = caller_globals.get('__name__')
        # at module level a frame's locals are the very same dict as its globals
        return module_name, caller_frame.f_locals is caller_globals

178 

179 

# Runtime class of a `dict.values()` view, used for isinstance checks below.
DictValues: type[Any] = type({}.values())


def iter_contained_typevars(v: Any) -> Iterator[TypeVarType]:
    """
    Walk `v`, its subtypes and its type args recursively, yielding each typevar encountered.

    This is meant as an alternative to reading the `__parameters__` attribute of a GenericAlias directly,
    since __parameters__ of (nested) generic BaseModel subclasses won't show up in that list.
    """
    if isinstance(v, TypeVar):
        yield v
        return
    if is_basemodel(v):
        # models carry their own parameters in their generic metadata
        yield from v.__pydantic_generic_metadata__['parameters']
        return

    # containers are walked element by element; anything else is walked through its type args
    children = v if isinstance(v, (DictValues, list)) else get_args(v)
    for child in children:
        yield from iter_contained_typevars(child)

201 

202 

def get_args(v: Any) -> Any:
    """
    Return the type arguments of `v`.

    When `v` has a non-empty `__pydantic_generic_metadata__`, its `args` entry is returned;
    otherwise the lookup is delegated to `typing_extensions.get_args`.
    """
    metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    if not metadata:
        return typing_extensions.get_args(v)
    return metadata.get('args')

208 

209 

def get_origin(v: Any) -> Any:
    """
    Return the generic origin of `v`.

    When `v` has a non-empty `__pydantic_generic_metadata__`, its `origin` entry is returned;
    otherwise the lookup is delegated to `typing_extensions.get_origin`.
    """
    metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    if not metadata:
        return typing_extensions.get_origin(v)
    return metadata.get('origin')

215 

216 

def get_standard_typevars_map(cls: type[Any]) -> dict[TypeVarType, Any] | None:
    """
    Build the typevar -> argument dictionary that `replace_types` expects from a generic type and its
    parametrization (if present). This works with standard typing generics and typing._GenericAlias.

    :returns: `None` when `get_origin(cls)` is `None`; otherwise the origin's `__parameters__`
        paired with `cls.__args__`
    """
    generic_origin = get_origin(cls)
    if generic_origin is None:
        return None

    # With a non-None origin, cls is a _GenericAlias and the origin is the generic type,
    # which makes cls.__args__ and origin.__parameters__ safe to access.
    type_arguments: tuple[Any, ...] = cls.__args__  # type: ignore
    type_parameters: tuple[TypeVarType, ...] = generic_origin.__parameters__  # type: ignore
    return dict(zip(type_parameters, type_arguments))

231 

232 

def get_model_typevars_map(cls: type[BaseModel]) -> dict[TypeVarType, Any] | None:
    """
    Build the typevar -> argument dictionary that `replace_types` expects from a generic BaseModel and its
    concrete parametrization (if present).

    Special handling is required because BaseModel.__class_getitem__ does not produce a typing._GenericAlias;
    the generic info of a BaseModel lives in its __pydantic_generic_metadata__ attribute instead.
    """
    # TODO: This could be unified with `get_standard_typevars_map` if we stored the generic metadata
    #   in the __origin__, __args__, and __parameters__ attributes of the model.
    metadata = cls.__pydantic_generic_metadata__
    origin_typevars = iter_contained_typevars(metadata['origin'])
    return dict(zip(origin_typevars, metadata['args']))

247 

248 

def replace_types(type_: Any, type_map: Mapping[Any, Any] | None) -> Any:
    """Substitute every occurrence of a `type_map` key inside `type_` with its mapped value, recursively.

    :param type_: Any type, class or generic alias
    :param type_map: Mapping from `TypeVar` instance to concrete types.
    :return: New type representing the basic structure of `type_` with all
        `type_map` keys recursively replaced.

    >>> replace_types(Tuple[str, Union[List[str], float]], {str: int})
    Tuple[int, Union[List[int], float]]

    """
    if not type_map:
        return type_

    args = get_args(type_)
    origin = get_origin(type_)

    if origin is typing_extensions.Annotated:
        # substitute inside the annotated type, then re-attach each annotation in order
        inner_type, *metadata = args
        rebuilt = replace_types(inner_type, type_map)
        for item in metadata:
            rebuilt = typing_extensions.Annotated[rebuilt, item]
        return rebuilt

    # The presence of type args strongly suggests a typing module class
    # instantiation or a generic alias of some sort.
    if args:
        new_args = tuple(replace_types(arg, type_map) for arg in args)
        if all_identical(args, new_args):
            # Nothing changed, so the type can be returned as is without
            # creating a new object at all
            return type_
        needs_typing_origin = (
            origin is not None
            and isinstance(type_, typing_base)
            and not isinstance(origin, typing_base)
            and getattr(type_, '_name', None) is not None
        )
        if needs_typing_origin:
            # In python < 3.9 generic aliases don't exist so any of these like `list`,
            # `type` or `collections.abc.Callable` need to be translated.
            # See: https://www.python.org/dev/peps/pep-0585
            origin = getattr(typing, type_._name)
        assert origin is not None
        # PEP-604 syntax (Ex.: list | str) is represented with a types.UnionType object that does not have __getitem__.
        # We also cannot use isinstance() since we have to compare types.
        if sys.version_info >= (3, 10) and origin is types.UnionType:  # noqa: E721
            return _UnionGenericAlias(origin, new_args)
        return origin[new_args]

    # Pydantic generic models get their own branch, since their semantics differ
    # from those of "typing" classes or generic aliases

    if not origin and is_basemodel(type_):
        model_parameters = type_.__pydantic_generic_metadata__['parameters']
        if not model_parameters:
            return type_
        new_parameters = tuple(replace_types(parameter, type_map) for parameter in model_parameters)
        if all_identical(model_parameters, new_parameters):
            return type_
        return type_[new_parameters]  # type: ignore[index]

    # Special case: typehints that can carry lists as arguments,
    # for example `typing.Callable[[int, str], int]`.
    if isinstance(type_, (List, list)):
        new_elements = [replace_types(element, type_map) for element in type_]
        if all_identical(type_, new_elements):
            return type_
        return new_elements

    if isinstance(type_, PydanticForwardRef):
        # queue the replacement as a deferred action
        return type_.replace_types(type_map)

    # Last resort: look the type up directly, falling back to
    # the unmodified input.
    return type_map.get(type_, type_)

326 

327 

def check_parameters_count(cls: type[BaseModel], parameters: tuple[Any, ...]) -> None:
    """
    Verify that `parameters` has as many entries as `cls` declares generic parameters.

    :param cls: model whose `__pydantic_generic_metadata__['parameters']` gives the expected count
    :param parameters: the values supplied for parametrization
    :raises TypeError: if too many or too few parameters were supplied
    """
    actual = len(parameters)
    expected = len(cls.__pydantic_generic_metadata__['parameters'])
    if actual == expected:
        return
    description = 'few' if actual < expected else 'many'
    raise TypeError(f'Too {description} parameters for {cls}; actual {actual}, expected {expected}')

334 

335 

# Type refs observed so far while building a generic type; None outside of any such build.
_generic_recursion_cache: ContextVar[set[str] | None] = ContextVar('_generic_recursion_cache', default=None)


@contextmanager
def generic_recursion_self_type(
    origin: type[BaseModel], args: tuple[Any, ...]
) -> Iterator[PydanticForwardRef | PydanticRecursiveRef | None]:
    """
    Wrap the recursive calls used to build a generic type in this contextmanager, passing it the
    generic origin type and the type arguments being applied to it.

    Observing the same origin and arguments a second time implies that a self-reference placeholder
    can be used while building the core schema; that placeholder will produce a schema_ref that is
    valid in the final parent schema. In that case a `PydanticRecursiveRef` is yielded, otherwise `None`.
    """
    seen_type_refs = _generic_recursion_cache.get()
    token = None
    if seen_type_refs is None:
        # outermost entry: install a fresh set, and remember the token so it can be undone on exit
        seen_type_refs = set()
        token = _generic_recursion_cache.set(seen_type_refs)

    try:
        type_ref = get_type_ref(origin, args_override=args)
        if type_ref in seen_type_refs:
            yield PydanticRecursiveRef(type_ref=type_ref)
        else:
            seen_type_refs.add(type_ref)
            yield None
    finally:
        # only the entry that installed the set resets the context variable
        if token is not None:
            _generic_recursion_cache.reset(token)

369 

370 

def recursively_defined_type_refs() -> set[str]:
    """
    Return a copy of the type refs recorded in the generic recursion cache.

    The result is an empty set when not in a generic recursion (no types recorded), and it is always
    a new set, so callers cannot modify the cache through it.
    """
    recorded = _generic_recursion_cache.get()
    return set(recorded or ())

377 

378 

def get_cached_generic_type_early(parent: type[BaseModel], typevar_values: Any) -> type[BaseModel] | None:
    """
    Look up a previously created parametrization using the cheap, early cache key.

    Cache lookups happen in two stages. This was necessary to get the highest possible performance for
    repeated calls to `__class_getitem__` on generic types (which may happen in tighter loops during
    runtime), while still ensuring that certain alternative parametrizations ultimately resolve to the
    same type.

    As a concrete example, this approach was necessary to make Model[List[T]][int] equal to Model[List[int]].
    The approach could be modified to not use two different cache keys at different points, but the
    _early_cache_key is optimized to be as quick to compute as possible (for repeated-access speed), and the
    _late_cache_key is optimized to be as "correct" as possible, so that two types that will ultimately be
    the same after resolving the type arguments will always produce cache hits.

    Moving to a single cache key per type would mean either always using the slower/more computationally
    intensive logic associated with _late_cache_key, or accepting that Model[List[T]][int] is a different
    type than Model[List[int]]. Because we rely on subclass relationships during validation, it is
    worthwhile to ensure that types that are functionally equivalent are actually equal.
    """
    early_key = _early_cache_key(parent, typevar_values)
    return _GENERIC_TYPES_CACHE.get(early_key)

398 

399 

def get_cached_generic_type_late(
    parent: type[BaseModel], typevar_values: Any, origin: type[BaseModel], args: tuple[Any, ...]
) -> type[BaseModel] | None:
    """
    Look up a previously created parametrization using the late cache key.

    On a hit, the type is cached again via `set_cached_generic_type` for the given parent and
    typevar values before being returned. The docstring of `get_cached_generic_type_early` has more
    information about the two-stage cache lookup.
    """
    late_key = _late_cache_key(origin, args, typevar_values)
    hit = _GENERIC_TYPES_CACHE.get(late_key)
    if hit is None:
        return None
    set_cached_generic_type(parent, typevar_values, hit, origin, args)
    return hit

410 

411 

def set_cached_generic_type(
    parent: type[BaseModel],
    typevar_values: tuple[Any, ...],
    type_: type[BaseModel],
    origin: type[BaseModel] | None = None,
    args: tuple[Any, ...] | None = None,
) -> None:
    """
    Store `type_` in the generic types cache under each key it may later be looked up with.

    The docstring of `get_cached_generic_type_early` has more information about why items are cached
    with two different keys.
    """
    cache = _GENERIC_TYPES_CACHE
    cache[_early_cache_key(parent, typevar_values)] = type_
    if len(typevar_values) == 1:
        # a single value is additionally registered on its own, outside of its 1-tuple
        cache[_early_cache_key(parent, typevar_values[0])] = type_
    if origin and args:
        cache[_late_cache_key(origin, args, typevar_values)] = type_

428 

429 

def _union_orderings_key(typevar_values: Any) -> Any:
    """
    This is intended to help differentiate between Union types with the same arguments in different order.

    Thanks to caching internal to the `typing` module, it is not possible to distinguish between
    List[Union[int, float]] and List[Union[float, int]] (and similarly for other "parent" origins besides List)
    because `typing` considers Union[int, float] to be equal to Union[float, int].

    However, you _can_ distinguish between (top-level) Union[int, float] vs. Union[float, int].
    Because we parse items as the first Union type that is successful, we get slightly more consistent behavior
    if we make an effort to distinguish the ordering of items in a union. It would be best if we could _always_
    get the exact-correct order of items in the union, but that would require a change to the `typing` module itself.
    (See https://github.com/python/cpython/issues/86483 for reference.)

    :param typevar_values: a single type argument, or a (possibly nested) tuple of type arguments
    :returns: for a tuple, a tuple holding the key of each element; for a top-level union
        (`typing.Union[...]` or, on Python 3.10+, PEP 604 `X | Y`), its arguments in order;
        otherwise an empty tuple
    """
    if isinstance(typevar_values, tuple):
        return tuple(_union_orderings_key(value) for value in typevar_values)

    origin = typing_extensions.get_origin(typevar_values)
    # PEP 604 unions (`int | float`) report `types.UnionType` rather than `typing.Union` as their origin,
    # and compare equal regardless of argument order, so they need the ordering key just as much.
    if origin is typing.Union or (sys.version_info >= (3, 10) and origin is types.UnionType):
        return get_args(typevar_values)
    return ()

453 

454 

def _early_cache_key(cls: type[BaseModel], typevar_values: Any) -> GenericTypesCacheKey:
    """
    Build the cache key used for lookups that should carry minimal computational overhead.

    This key is overly simplistic: two different cls/typevar_values inputs may ultimately result in
    the same type being created in BaseModel.__class_getitem__. To handle this, there is a fallback
    _late_cache_key that is checked later if the _early_cache_key lookup fails, and which should
    result in a cache hit _precisely_ when the inputs to __class_getitem__ would result in the same type.
    """
    union_orderings = _union_orderings_key(typevar_values)
    return cls, typevar_values, union_orderings

466 

467 

def _late_cache_key(origin: type[BaseModel], args: tuple[Any, ...], typevar_values: Any) -> GenericTypesCacheKey:
    """
    Build the cache key used later in the process of creating a new type, when more is known about
    the exact args that will be passed.

    If a different set of inputs to __class_getitem__ turns out to produce the same inputs to the
    generic type creation process, the cached type can still be returned, and the cache updated
    with the _early_cache_key as well.
    """
    # Putting the _union_orderings_key first rules out collisions with an _early_cache_key:
    # that function always produces a BaseModel subclass as the first item in the key,
    # whereas the first item produced here is always a tuple.
    union_orderings = _union_orderings_key(typevar_values)
    return union_orderings, origin, args