Coverage for /pythoncovmergedfiles/medio/medio/src/pydantic/pydantic/_internal/_generate_schema.py: 36%

663 statements  

coverage.py v7.2.3, created at 2023-04-27 07:38 +0000

1""" 

2Convert python types to pydantic-core schema. 

3""" 

4from __future__ import annotations as _annotations 

5 

6import collections.abc 

7import dataclasses 

8import inspect 

9import re 

10import sys 

11import typing 

12import warnings 

13from functools import partial 

14from inspect import Parameter, _ParameterKind, signature 

15from itertools import chain 

16from types import FunctionType, LambdaType, MethodType 

17from typing import TYPE_CHECKING, Any, Callable, ForwardRef, Iterable, Mapping, TypeVar, Union 

18 

19from annotated_types import BaseMetadata, GroupedMetadata 

20from pydantic_core import CoreSchema, SchemaError, SchemaValidator, core_schema 

21from typing_extensions import Annotated, Final, Literal, TypedDict, get_args, get_origin, is_typeddict 

22 

23from ..errors import PydanticSchemaGenerationError, PydanticUndefinedAnnotation, PydanticUserError 

24from ..fields import FieldInfo 

25from . import _discriminated_union, _typing_extra 

26from ._config import ConfigWrapper 

27from ._core_metadata import ( 

28 CoreMetadataHandler, 

29 build_metadata_dict, 

30) 

31from ._core_utils import ( 

32 consolidate_refs, 

33 define_expected_missing_refs, 

34 get_type_ref, 

35 is_list_like_schema_with_items_schema, 

36 remove_unnecessary_invalid_definitions, 

37) 

38from ._decorators import ( 

39 ComputedFieldInfo, 

40 Decorator, 

41 DecoratorInfos, 

42 FieldSerializerDecoratorInfo, 

43 FieldValidatorDecoratorInfo, 

44 ModelSerializerDecoratorInfo, 

45 ModelValidatorDecoratorInfo, 

46 RootValidatorDecoratorInfo, 

47 ValidatorDecoratorInfo, 

48 inspect_field_serializer, 

49 inspect_model_serializer, 

50 inspect_validator, 

51) 

52from ._fields import ( 

53 PydanticGeneralMetadata, 

54 PydanticMetadata, 

55 Undefined, 

56 collect_dataclass_fields, 

57 get_type_hints_infer_globalns, 

58) 

59from ._forward_ref import PydanticForwardRef, PydanticRecursiveRef 

60from ._generics import get_standard_typevars_map, recursively_defined_type_refs, replace_types 

61from ._json_schema_shared import ( 

62 CoreSchemaOrField, 

63 GetJsonSchemaFunction, 

64 GetJsonSchemaHandler, 

65 JsonSchemaValue, 

66 UnpackedRefJsonSchemaHandler, 

67 wrap_json_schema_fn_for_model_or_custom_type_with_ref_unpacking, 

68) 

69from ._typing_extra import is_finalvar 

70from ._utils import lenient_issubclass 

71 

72if TYPE_CHECKING: 

73 from ..decorators import FieldValidatorModes 

74 from ..main import BaseModel 

75 from ._dataclasses import StandardDataclass 

76 

77_SUPPORTS_TYPEDDICT = sys.version_info >= (3, 11) 

78 

79FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo] 

80FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo) 

81AnyFieldDecorator = Union[ 

82 Decorator[ValidatorDecoratorInfo], 

83 Decorator[FieldValidatorDecoratorInfo], 

84 Decorator[FieldSerializerDecoratorInfo], 

85] 

86 

87ModifyCoreSchemaWrapHandler = Callable[[Any], core_schema.CoreSchema] 

88GetCoreSchemaFunction = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema] 

89 

90 

91def check_validator_fields_against_field_name( 

92 info: FieldDecoratorInfo, 

93 field: str, 

94) -> bool: 

95 if isinstance(info, ValidatorDecoratorInfo): 

96 # V1 compat: accept `'*'` as a wildcard that matches all fields 

97 if info.fields == ('*',): 

98 return True 

99 for v_field_name in info.fields: 

100 if v_field_name == field: 

101 return True 

102 return False 

103 

104 

105def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None: 

106 fields = set(fields) 

107 for dec in decorators: 

108 if isinstance(dec.info, ValidatorDecoratorInfo) and dec.info.fields == ('*',): 

109 # V1 compat: accept `'*'` as a wildcard that matches all fields 

110 continue 

111 if dec.info.check_fields is False: 

112 continue 

113 for field in dec.info.fields: 

114 if field not in fields: 

115 raise PydanticUserError( 

116 f'Validators defined with incorrect fields: {dec.cls_ref}.{dec.cls_var_name}' 

117 " (use check_fields=False if you're inheriting from the model and intended this)", 

118 code='decorator-missing-field', 

119 ) 
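# Illustrative sketch (not part of the original module): how the V1-style `'*'` wildcard and
# `check_fields=False` behaviours checked above look from the user's side. The model and
# field names are made up for illustration.
#
#     from pydantic import BaseModel, validator
#
#     class Model(BaseModel):
#         a: int
#         b: int
#
#         @validator('*')  # V1-compat wildcard: runs for every field
#         def double(cls, v):
#             return v * 2
#
#         @validator('c', check_fields=False)  # 'c' may only exist on a subclass
#         def passthrough(cls, v):
#             return v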

120 

121 

122def filter_field_decorator_info_by_field( 

123 validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str 

124) -> list[Decorator[FieldDecoratorInfoType]]: 

125 return [dec for dec in validator_functions if check_validator_fields_against_field_name(dec.info, field)] 

126 

127 

128def apply_each_item_validators( 

129 schema: core_schema.CoreSchema, each_item_validators: list[Decorator[ValidatorDecoratorInfo]] 

130) -> core_schema.CoreSchema: 

131 # TODO: remove this V1 compatibility shim once it's deprecated 

132 # push down any `each_item=True` validators 

133 # note that this won't work for any Annotated types that get wrapped by a function validator 

134 # but that's okay because that didn't exist in V1 

135 if schema['type'] == 'nullable': 

136 schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators) 

137 return schema 

138 elif is_list_like_schema_with_items_schema(schema): 

139 inner_schema = schema.get('items_schema', None) 

140 if inner_schema is None: 

141 inner_schema = core_schema.any_schema() 

142 schema['items_schema'] = apply_validators(inner_schema, each_item_validators) 

143 elif schema['type'] == 'dict': 

144 # push down any `each_item=True` validators onto dict _values_ 

145 # this is super arbitrary but it's the V1 behavior 

146 inner_schema = schema.get('values_schema', None) 

147 if inner_schema is None: 

148 inner_schema = core_schema.any_schema() 

149 schema['values_schema'] = apply_validators(inner_schema, each_item_validators) 

150 elif each_item_validators: 

151 raise TypeError( 

152 f"`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema['type']}" 

153 ) 

154 return schema 
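# Illustrative sketch (not part of the original module): the V1 `each_item=True` behaviour
# being pushed down above, e.g. onto list items or dict values. Names are made up.
#
#     from pydantic import BaseModel, validator
#
#     class Model(BaseModel):
#         items: list[int]
#
#         @validator('items', each_item=True)  # applied to each element, not the list itself
#         def check_positive(cls, v):
#             assert v > 0, 'must be positive'
#             return v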

155 

156 

157def modify_model_json_schema( 

158 schema_or_field: CoreSchemaOrField, handler: GetJsonSchemaHandler, *, cls: Any 

159) -> JsonSchemaValue: 

160 """Add title and description for model-like classes' JSON schema""" 

161 wrapped_handler = UnpackedRefJsonSchemaHandler(handler) 

162 

163 json_schema = handler(schema_or_field) 

164 original_schema = wrapped_handler.resolve_ref_schema(json_schema) 

165 if 'title' not in original_schema: 

166 original_schema['title'] = cls.__name__ 

167 docstring = cls.__doc__ 

168 if docstring and 'description' not in original_schema: 

169 original_schema['description'] = docstring 

170 return json_schema 
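# Illustrative sketch (not part of the original module): the effect of modify_model_json_schema
# as seen through the public JSON schema API (output abbreviated).
#
#     from pydantic import BaseModel
#
#     class User(BaseModel):
#         """A user of the system."""
#         id: int
#
#     User.model_json_schema()
#     # -> {'title': 'User', 'description': 'A user of the system.', 'type': 'object',
#     #     'properties': {'id': {'title': 'Id', 'type': 'integer'}}, ...}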

171 

172 

173class GenerateSchema: 

174 __slots__ = '_config_wrapper_stack', 'types_namespace', 'typevars_map', 'recursion_cache', 'definitions' 

175 

176 def __init__( 

177 self, 

178 config_wrapper: ConfigWrapper, 

179 types_namespace: dict[str, Any] | None, 

180 typevars_map: dict[Any, Any] | None = None, 

181 ): 

182 # we need a stack for recursing into child models 

183 self._config_wrapper_stack: list[ConfigWrapper] = [config_wrapper] 

184 self.types_namespace = types_namespace 

185 self.typevars_map = typevars_map 

186 

187 self.recursion_cache: dict[str, core_schema.DefinitionReferenceSchema] = {} 

188 self.definitions: dict[str, core_schema.CoreSchema] = {} 

189 

190 @property 

191 def config_wrapper(self) -> ConfigWrapper: 

192 return self._config_wrapper_stack[-1] 

193 

194 @property 

195 def arbitrary_types(self) -> bool: 

196 return self.config_wrapper.arbitrary_types_allowed 

197 

198 def generate_schema(self, obj: Any, from_dunder_get_core_schema: bool = True) -> core_schema.CoreSchema: 

199 schema: CoreSchema | None = None 

200 if from_dunder_get_core_schema: 

201 from_property = self._generate_schema_from_property(obj, obj) 

202 if from_property is not None: 

203 schema = from_property 

204 if schema is None: 

205 schema = self._generate_schema(obj) 

206 

207 schema = remove_unnecessary_invalid_definitions(schema) 

208 

209 metadata_js_function = _extract_get_pydantic_json_schema(obj) 

210 if metadata_js_function is None: 

211 # Need to do this to handle custom generics: 

212 if hasattr(obj, '__origin__'): 

213 metadata_js_function = _extract_get_pydantic_json_schema(obj.__origin__) 

214 if metadata_js_function is not None: 

215 metadata = CoreMetadataHandler(schema).metadata 

216 # wrap the schema so that we unpack ref schemas and always call metadata_js_function with the full schema 

217 if schema['type'] != 'definition-ref': 

218 # we would fail to unpack recursive ref schemas! 

219 metadata_js_function = wrap_json_schema_fn_for_model_or_custom_type_with_ref_unpacking( 

220 metadata_js_function 

221 ) 

222 metadata['pydantic_js_functions'] = metadata.get('pydantic_js_functions', []) 

223 metadata['pydantic_js_functions'].append(metadata_js_function) 

224 

225 if 'ref' in schema: 

226 # definitions and definition-ref schemas don't have 'ref', causing the type error ignored on the next line 

227 schema_ref = schema['ref'] # type: ignore[typeddict-item] 

228 self.definitions[schema_ref] = schema 

229 

230 return schema 

231 

232 def model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema: 

233 """ 

234 Generate schema for a pydantic model. 

235 

236 Since models generate schemas for themselves this method is public and can be called 

237 from within BaseModel's metaclass. 

238 """ 

239 model_ref, schema = self._get_or_cache_recursive_ref(cls) 

240 if schema is not None: 

241 return schema 

242 

243 from ..main import BaseModel 

244 

245 fields = cls.model_fields 

246 decorators = cls.__pydantic_decorators__ 

247 check_decorator_fields_exist( 

248 chain( 

249 decorators.field_validator.values(), 

250 decorators.field_serializer.values(), 

251 decorators.validator.values(), 

252 ), 

253 fields.keys(), 

254 ) 

255 # TODO: we need to do something similar to this for pydantic dataclasses 

256 # This should be straightforward once we expose the pydantic config on the dataclass; 

257 # I have done this in my PR for dataclasses JSON schema 

258 config_wrapper = ConfigWrapper(cls.model_config, check=False) 

259 self._config_wrapper_stack.append(config_wrapper) 

260 try: 

261 fields_schema: core_schema.CoreSchema = core_schema.typed_dict_schema( 

262 {k: self._generate_td_field_schema(k, v, decorators) for k, v in fields.items()}, 

263 computed_fields=generate_computed_field(decorators.computed_fields), 

264 return_fields_set=True, 

265 ) 

266 finally: 

267 self._config_wrapper_stack.pop() 

268 inner_schema = apply_validators(fields_schema, decorators.root_validator.values()) 

269 

270 inner_schema = define_expected_missing_refs(inner_schema, recursively_defined_type_refs()) 

271 

272 core_config = config_wrapper.core_config() 

273 model_post_init = None if cls.model_post_init is BaseModel.model_post_init else 'model_post_init' 

274 

275 metadata = build_metadata_dict(js_functions=[partial(modify_model_json_schema, cls=cls)]) 

276 

277 model_schema = core_schema.model_schema( 

278 cls, 

279 inner_schema, 

280 ref=model_ref, 

281 config=core_config, 

282 post_init=model_post_init, 

283 metadata=metadata, 

284 ) 

285 model_schema = consolidate_refs(model_schema) 

286 schema = apply_model_serializers(model_schema, decorators.model_serializer.values()) 

287 return apply_model_validators(schema, decorators.model_validator.values()) 

288 

289 def _generate_schema_from_property(self, obj: Any, source: Any) -> core_schema.CoreSchema | None: 

290 """ 

291 Try to generate schema from either the `__get_pydantic_core_schema__` function or 

292 `__pydantic_core_schema__` property. 

293 

294 Note: `__get_pydantic_core_schema__` takes priority so it can 

295 decide whether to use a `__pydantic_core_schema__` attribute, or generate a fresh schema. 

296 """ 

297 get_schema = getattr(obj, '__get_pydantic_core_schema__', None) 

298 if get_schema is not None: 

299 # (source) -> CoreSchema 

300 if len(inspect.signature(get_schema).parameters) == 1: 

301 return get_schema(source) 

302 

303 # Can return None to tell pydantic not to override 

304 return get_schema(source, partial(self.generate_schema, from_dunder_get_core_schema=False)) 

305 

306 if _typing_extra.is_dataclass(obj): 

307 # For dataclasses, only use the __pydantic_core_schema__ if it is defined on this exact class, not a parent 

308 schema_property = obj.__dict__.get('__pydantic_core_schema__') 

309 else: 

310 schema_property = getattr(obj, '__pydantic_core_schema__', None) 

311 

312 if schema_property is not None: 

313 return schema_property 

314 

315 return None 
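# Illustrative sketch (not part of the original module): a custom type providing
# `__get_pydantic_core_schema__`, which this method picks up. The two-argument form receives
# the source type and a handler that falls back to normal schema generation. `PostCode` and
# its validator are made-up names.
#
#     class PostCode:
#         @classmethod
#         def __get_pydantic_core_schema__(cls, source, handler):
#             # build on the schema pydantic would generate for `str`
#             return core_schema.no_info_after_validator_function(cls.validate, handler(str))
#
#         @classmethod
#         def validate(cls, value: str) -> str:
#             return value.upper()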

316 

317 def _generate_schema(self, obj: Any) -> core_schema.CoreSchema: # noqa: C901 

318 """ 

319 Recursively generate a pydantic-core schema for any supported python type. 

320 """ 

321 if isinstance(obj, dict): 

322 # we assume this is already a valid schema 

323 return obj # type: ignore[return-value] 

324 

325 if isinstance(obj, str): 

326 obj = ForwardRef(obj) 

327 

328 if isinstance(obj, ForwardRef): 

329 # we assume that types_namespace has the target of forward references in its scope, 

330 # but this could fail, for example, if calling Validator on an imported type which contains 

331 # forward references to other types only defined in the module from which it was imported 

332 # `Validator(SomeImportedTypeAliasWithAForwardReference)` 

333 # or the equivalent for BaseModel 

334 # class Model(BaseModel): 

335 # x: SomeImportedTypeAliasWithAForwardReference 

336 try: 

337 obj = _typing_extra.evaluate_fwd_ref(obj, globalns=self.types_namespace) 

338 except NameError as e: 

339 raise PydanticUndefinedAnnotation.from_name_error(e) from e 

340 

341 # if obj is still a ForwardRef, it means we can't evaluate it, raise PydanticUndefinedAnnotation 

342 if isinstance(obj, ForwardRef): 

343 raise PydanticUndefinedAnnotation(obj.__forward_arg__, f'Unable to evaluate forward reference {obj}') 

344 

345 if self.typevars_map: 

346 obj = replace_types(obj, self.typevars_map) 

347 

348 from ..main import BaseModel 

349 

350 if lenient_issubclass(obj, BaseModel): 

351 return self.model_schema(obj) 

352 

353 if isinstance(obj, PydanticRecursiveRef): 

354 return core_schema.definition_reference_schema(schema_ref=obj.type_ref) 

355 

356 if isinstance(obj, PydanticForwardRef): 

357 if not obj.deferred_actions: 

358 return obj.schema 

359 resolved_model = obj.resolve_model() 

360 if isinstance(resolved_model, PydanticForwardRef): 

361 # If you still have a PydanticForwardRef after resolving, it should be deeply nested enough that it will 

362 # eventually be substituted out. So it is safe to return an invalid schema here. 

363 # TODO: Replace this with a (new) CoreSchema that, if present at any level, makes validation fail 

364 return core_schema.none_schema( 

365 metadata={'invalid': True, 'pydantic_debug_self_schema': resolved_model.schema} 

366 ) 

367 else: 

368 model_ref = get_type_ref(resolved_model) 

369 return core_schema.definition_reference_schema(model_ref) 

370 

371 try: 

372 if obj in {bool, int, float, str, bytes, list, set, frozenset, dict}: 

373 # Note: obj may fail to be hashable if it has an unhashable annotation 

374 return {'type': obj.__name__} 

375 elif obj is tuple: 

376 return {'type': 'tuple-variable'} 

377 except TypeError: # obj not hashable; can happen due to unhashable annotations 

378 pass 

379 

380 if obj is Any or obj is object: 

381 return core_schema.AnySchema(type='any') 

382 elif obj is None or obj is _typing_extra.NoneType: 

383 return core_schema.NoneSchema(type='none') 

384 elif obj == type: 

385 return self._type_schema() 

386 elif _typing_extra.is_callable_type(obj): 

387 return core_schema.CallableSchema(type='callable') 

388 elif _typing_extra.is_literal_type(obj): 

389 return self._literal_schema(obj) 

390 elif is_typeddict(obj): 

391 return self._typed_dict_schema(obj, None) 

392 elif _typing_extra.is_namedtuple(obj): 

393 return self._namedtuple_schema(obj) 

394 elif _typing_extra.is_new_type(obj): 

395 # NewType, can't use isinstance because it fails <3.7 

396 return self.generate_schema(obj.__supertype__) 

397 elif obj == re.Pattern: 

398 return self._pattern_schema(obj) 

399 elif obj is collections.abc.Hashable or obj is typing.Hashable: 

400 return self._hashable_schema() 

401 elif isinstance(obj, type): 

402 if obj is dict: 

403 return self._dict_schema(obj) 

404 if issubclass(obj, dict): 

405 # TODO: We would need to handle generic subclasses of certain typing dict subclasses here 

406 # This includes subclasses of typing.Counter, typing.DefaultDict, and typing.OrderedDict 

407 # Note also that we may do a better job of handling typing.DefaultDict by inspecting its arguments. 

408 return self._dict_subclass_schema(obj) 

409 # probably need to take care of other subclasses here 

410 elif isinstance(obj, typing.TypeVar): 

411 return self._unsubstituted_typevar_schema(obj) 

412 elif is_finalvar(obj): 

413 if obj is Final: 

414 return core_schema.AnySchema(type='any') 

415 return self.generate_schema(get_args(obj)[0]) 

416 elif isinstance(obj, (FunctionType, LambdaType, MethodType, partial)): 

417 return self._callable_schema(obj) 

418 

419 # TODO: _std_types_schema iterates over the __mro__ looking for an expected schema. 

420 # This will catch subclasses of typing.Deque, preventing us from properly supporting user-defined 

421 # generic subclasses. (In principle this would also catch typing.OrderedDict, but that is currently 

422 # already getting caught in the `issubclass(obj, dict):` check above.) 

423 std_schema = self._std_types_schema(obj) 

424 if std_schema is not None: 

425 return std_schema 

426 

427 if _typing_extra.is_dataclass(obj): 

428 return self._dataclass_schema(obj, None) 

429 

430 origin = get_origin(obj) 

431 if origin is None: 

432 if self.arbitrary_types: 

433 return core_schema.is_instance_schema(obj) 

434 else: 

435 raise PydanticSchemaGenerationError( 

436 f'Unable to generate pydantic-core schema for {obj!r}. ' 

437 f'Setting `arbitrary_types_allowed=True` in the model_config may prevent this error.' 

438 ) 

439 

440 # Need to handle generic dataclasses before looking for the schema properties because attribute accesses 

441 # on _GenericAlias delegate to the origin type, so we lose the information about the concrete parametrization 

442 # As a result, currently, there is no way to cache the schema for generic dataclasses. This may be possible 

443 # to resolve by modifying the value returned by `Generic.__class_getitem__`, but that is a dangerous game. 

444 if _typing_extra.is_dataclass(origin): 

445 return self._dataclass_schema(obj, origin) 

446 

447 from_property = self._generate_schema_from_property(origin, obj) 

448 if from_property is not None: 

449 return from_property 

450 

451 if _typing_extra.origin_is_union(origin): 

452 return self._union_schema(obj) 

453 elif issubclass(origin, Annotated): # type: ignore[arg-type] 

454 return self._annotated_schema(obj) 

455 elif issubclass(origin, typing.List): 

456 return self._generic_collection_schema(list, obj, origin) 

457 elif issubclass(origin, typing.Set): 

458 return self._generic_collection_schema(set, obj, origin) 

459 elif issubclass(origin, typing.FrozenSet): 

460 return self._generic_collection_schema(frozenset, obj, origin) 

461 elif issubclass(origin, typing.Tuple): # type: ignore[arg-type] 

462 # TODO: To support generic subclasses of typing.Tuple, we need to better-introspect the args to origin 

463 return self._tuple_schema(obj) 

464 elif issubclass(origin, typing.Counter): 

465 # Subclasses of typing.Counter may be handled as subclasses of dict; see note above 

466 return self._counter_schema(obj) 

467 elif origin in (typing.Dict, dict): 

468 return self._dict_schema(obj) 

469 elif is_typeddict(origin): 

470 return self._typed_dict_schema(obj, origin) 

471 elif issubclass(origin, typing.Dict): 

472 # Subclasses of typing.Dict may be handled as subclasses of dict; see note above 

473 return self._dict_subclass_schema(obj) 

474 elif issubclass(origin, typing.Mapping): 

475 # Because typing.Mapping does not have a specified `__init__` signature, we don't validate into subclasses 

476 return self._mapping_schema(obj) 

477 elif issubclass(origin, typing.Type): # type: ignore[arg-type] 

478 return self._subclass_schema(obj) 

479 elif issubclass(origin, typing.Deque): 

480 from ._std_types_schema import deque_schema 

481 

482 return deque_schema(self, obj) 

483 elif issubclass(origin, typing.OrderedDict): 

484 # Subclasses of typing.OrderedDict may be handled as subclasses of dict; see note above 

485 from ._std_types_schema import ordered_dict_schema 

486 

487 return ordered_dict_schema(self, obj) 

488 elif issubclass(origin, typing.Sequence): 

489 # Because typing.Sequence does not have a specified `__init__` signature, we don't validate into subclasses 

490 return self._sequence_schema(obj) 

491 elif issubclass(origin, typing.MutableSet): 

492 raise PydanticSchemaGenerationError('Unable to generate pydantic-core schema for MutableSet (TODO).') 

493 elif issubclass(origin, (typing.Iterable, collections.abc.Iterable)): 

494 # Because typing.Iterable does not have a specified `__init__` signature, we don't validate into subclasses 

495 return self._iterable_schema(obj) 

496 elif issubclass(origin, (re.Pattern, typing.Pattern)): 

497 return self._pattern_schema(obj) 

498 else: 

499 if self.arbitrary_types and isinstance(origin, type): 

500 return core_schema.is_instance_schema(origin) 

501 else: 

502 raise PydanticSchemaGenerationError( 

503 f'Unable to generate pydantic-core schema for {obj!r} (origin={origin!r}). ' 

504 f'Setting `arbitrary_types_allowed=True` in the model_config may prevent this error.' 

505 ) 

506 

507 def _generate_td_field_schema( 

508 self, 

509 name: str, 

510 field_info: FieldInfo, 

511 decorators: DecoratorInfos, 

512 *, 

513 required: bool = True, 

514 ) -> core_schema.TypedDictField: 

515 """ 

516 Prepare a TypedDictField to represent a model or typeddict field. 

517 """ 

518 common_field = self._common_field_schema(name, field_info, decorators) 

519 return core_schema.typed_dict_field( 

520 common_field['schema'], 

521 required=False if not field_info.is_required() else required, 

522 serialization_exclude=common_field['serialization_exclude'], 

523 validation_alias=common_field['validation_alias'], 

524 serialization_alias=common_field['serialization_alias'], 

525 frozen=common_field['frozen'], 

526 metadata=common_field['metadata'], 

527 ) 

528 

529 def _generate_dc_field_schema( 

530 self, 

531 name: str, 

532 field_info: FieldInfo, 

533 decorators: DecoratorInfos, 

534 ) -> core_schema.DataclassField: 

535 """ 

536 Prepare a DataclassField to represent a parameter/field of a dataclass. 

537 """ 

538 common_field = self._common_field_schema(name, field_info, decorators) 

539 return core_schema.dataclass_field( 

540 name, 

541 common_field['schema'], 

542 init_only=field_info.init_var or None, 

543 kw_only=None if field_info.kw_only else False, 

544 serialization_exclude=common_field['serialization_exclude'], 

545 validation_alias=common_field['validation_alias'], 

546 serialization_alias=common_field['serialization_alias'], 

547 frozen=common_field['frozen'], 

548 metadata=common_field['metadata'], 

549 ) 

550 

551 def _common_field_schema(self, name: str, field_info: FieldInfo, decorators: DecoratorInfos) -> _CommonField: 

552 assert field_info.annotation is not None, 'field_info.annotation should not be None when generating a schema' 

553 

554 def generate_schema(source: Any) -> CoreSchema: 

555 schema = self.generate_schema(source) 

556 if field_info.discriminator is not None: 

557 schema = _discriminated_union.apply_discriminator(schema, field_info.discriminator, self.definitions) 

558 return schema 

559 

560 schema = apply_annotations(generate_schema, field_info.metadata, self.definitions)(field_info.annotation) 

561 

562 # TODO: remove this V1 compatibility shim once it's deprecated 

563 # push down any `each_item=True` validators 

564 # note that this won't work for any Annotated types that get wrapped by a function validator 

565 # but that's okay because that didn't exist in V1 

566 this_field_validators = filter_field_decorator_info_by_field(decorators.validator.values(), name) 

567 if _validators_require_validate_default(this_field_validators): 

568 field_info.validate_default = True 

569 each_item_validators = [v for v in this_field_validators if v.info.each_item is True] 

570 this_field_validators = [v for v in this_field_validators if v not in each_item_validators] 

571 schema = apply_each_item_validators(schema, each_item_validators) 

572 

573 schema = apply_validators(schema, filter_field_decorator_info_by_field(this_field_validators, name)) 

574 schema = apply_validators( 

575 schema, filter_field_decorator_info_by_field(decorators.field_validator.values(), name) 

576 ) 

577 

578 # the default validator needs to go outside of any other validators 

579 # so that it is the topmost validator for the typed-dict-field validator 

580 # which uses it to check if the field has a default value or not 

581 if not field_info.is_required(): 

582 schema = wrap_default(field_info, schema) 

583 

584 schema = apply_field_serializers( 

585 schema, filter_field_decorator_info_by_field(decorators.field_serializer.values(), name) 

586 ) 

587 json_schema_updates = { 

588 'title': field_info.title, 

589 'description': field_info.description, 

590 'examples': field_info.examples, 

591 } 

592 json_schema_updates = {k: v for k, v in json_schema_updates.items() if v is not None} 

593 json_schema_updates.update(field_info.json_schema_extra or {}) 

594 

595 def json_schema_update_func(schema: CoreSchemaOrField, handler: GetJsonSchemaHandler) -> JsonSchemaValue: 

596 return {**handler(schema), **json_schema_updates} 

597 

598 metadata = build_metadata_dict(js_functions=[json_schema_update_func]) 

599 return _common_field( 

600 schema, 

601 serialization_exclude=True if field_info.exclude else None, 

602 validation_alias=field_info.validation_alias, 

603 serialization_alias=field_info.serialization_alias, 

604 frozen=field_info.frozen or field_info.final, 

605 metadata=metadata, 

606 ) 

607 

608 def _union_schema(self, union_type: Any) -> core_schema.CoreSchema: 

609 """ 

610 Generate schema for a Union. 

611 """ 

612 args = get_args(union_type) 

613 choices: list[core_schema.CoreSchema] = [] 

614 nullable = False 

615 for arg in args: 

616 if arg is None or arg is _typing_extra.NoneType: 

617 nullable = True 

618 else: 

619 choices.append(self.generate_schema(arg)) 

620 

621 if len(choices) == 1: 

622 s = choices[0] 

623 else: 

624 s = core_schema.union_schema(choices) 

625 

626 if nullable: 

627 s = core_schema.nullable_schema(s) 

628 return s 
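# Illustrative sketch (not part of the original module): how the branches above map onto
# core schemas.
#
#     Union[int, str]   -> union_schema([int_schema, str_schema])
#     Optional[int]     -> nullable_schema(int_schema)   # None pulled out as `nullable`
#     Union[int, None]  -> nullable_schema(int_schema)   # same as Optional[int]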

629 

630 def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema: 

631 """ 

632 Generate schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`. 

633 """ 

634 first_arg, *other_args = get_args(annotated_type) 

635 return apply_annotations(self.generate_schema, other_args, self.definitions)(first_arg) 

636 

637 def _literal_schema(self, literal_type: Any) -> core_schema.LiteralSchema: 

638 """ 

639 Generate schema for a Literal. 

640 """ 

641 expected = _typing_extra.all_literal_values(literal_type) 

642 assert expected, f'literal "expected" cannot be empty, obj={literal_type}' 

643 return core_schema.literal_schema(expected) 

644 

645 def _typed_dict_schema( 

646 self, typed_dict_cls: Any, origin: Any 

647 ) -> core_schema.TypedDictSchema | core_schema.DefinitionReferenceSchema: 

648 """ 

649 Generate schema for a TypedDict. 

650 

651 It is not possible to track required/optional keys in a TypedDict without __required_keys__, 

652 since TypedDict.__new__ erases the base classes (it replaces them with just `dict`), 

653 and thus we cannot track the usage of total=True/False. 

654 __required_keys__ was added in Python 3.9 

655 (https://github.com/miss-islington/cpython/blob/1e9939657dd1f8eb9f596f77c1084d2d351172fc/Doc/library/typing.rst?plain=1#L1546-L1548), 

656 however it is buggy 

657 (https://github.com/python/typing_extensions/blob/ac52ac5f2cb0e00e7988bae1e2a1b8257ac88d6d/src/typing_extensions.py#L657-L666). 

658 Hence, to avoid creating validators that do not do what users expect, we only 

659 support typing.TypedDict on Python >= 3.11 or typing_extensions.TypedDict on all versions. 

660 """ 

661 typed_dict_ref, schema = self._get_or_cache_recursive_ref(typed_dict_cls) 

662 if schema is not None: 

663 return schema 

664 

665 typevars_map = get_standard_typevars_map(typed_dict_cls) 

666 if origin is not None: 

667 typed_dict_cls = origin 

668 

669 if not _SUPPORTS_TYPEDDICT and type(typed_dict_cls).__module__ == 'typing': 

670 raise PydanticUserError( 

671 'Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.11.', 

672 code='typed-dict-version', 

673 ) 

674 

675 required_keys: frozenset[str] = typed_dict_cls.__required_keys__ 

676 

677 fields: dict[str, core_schema.TypedDictField] = {} 

678 

679 for field_name, annotation in get_type_hints_infer_globalns( 

680 typed_dict_cls, localns=self.types_namespace, include_extras=True 

681 ).items(): 

682 annotation = replace_types(annotation, typevars_map) 

683 required = field_name in required_keys 

684 

685 if get_origin(annotation) == _typing_extra.Required: 

686 required = True 

687 annotation = get_args(annotation)[0] 

688 elif get_origin(annotation) == _typing_extra.NotRequired: 

689 required = False 

690 annotation = get_args(annotation)[0] 

691 

692 field_info = FieldInfo.from_annotation(annotation) 

693 fields[field_name] = self._generate_td_field_schema( 

694 field_name, field_info, DecoratorInfos(), required=required 

695 ) 

696 

697 metadata = build_metadata_dict(js_functions=[partial(modify_model_json_schema, cls=typed_dict_cls)]) 

698 

699 return core_schema.typed_dict_schema( 

700 fields, 

701 extra_behavior='forbid', 

702 ref=typed_dict_ref, 

703 metadata=metadata, 

704 ) 
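# Illustrative sketch (not part of the original module): Required/NotRequired handling above
# from the user's side, using typing_extensions so it works on Python < 3.11. Names are made up.
#
#     from typing_extensions import TypedDict, NotRequired
#     from pydantic import BaseModel
#
#     class Point(TypedDict):
#         x: int                 # required (total=True is the default)
#         y: NotRequired[int]    # optional key
#
#     class Model(BaseModel):
#         point: Point
#
#     Model(point={'x': 1})      # ok; 'y' may be omitted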

705 

706 def _namedtuple_schema(self, namedtuple_cls: Any) -> core_schema.CallSchema: 

707 """ 

708 Generate schema for a NamedTuple. 

709 """ 

710 annotations: dict[str, Any] = get_type_hints_infer_globalns( 

711 namedtuple_cls, include_extras=True, localns=self.types_namespace 

712 ) 

713 if not annotations: 

714 # annotations is empty; this happens if namedtuple_cls was defined via collections.namedtuple(...) 

715 annotations = {k: Any for k in namedtuple_cls._fields} 

716 

717 arguments_schema = core_schema.ArgumentsSchema( 

718 type='arguments', 

719 arguments_schema=[ 

720 self._generate_parameter_schema(field_name, annotation) 

721 for field_name, annotation in annotations.items() 

722 ], 

723 metadata=build_metadata_dict(js_prefer_positional_arguments=True), 

724 ) 

725 return core_schema.call_schema(arguments_schema, namedtuple_cls) 

726 

727 def _generate_parameter_schema( 

728 self, 

729 name: str, 

730 annotation: type[Any], 

731 default: Any = Parameter.empty, 

732 mode: Literal['positional_only', 'positional_or_keyword', 'keyword_only'] | None = None, 

733 ) -> core_schema.ArgumentsParameter: 

734 """ 

735 Prepare an ArgumentsParameter to represent a field in a namedtuple or a function signature. 

736 """ 

737 if default is Parameter.empty: 

738 field = FieldInfo.from_annotation(annotation) 

739 else: 

740 field = FieldInfo.from_annotated_attribute(annotation, default) 

741 assert field.annotation is not None, 'field.annotation should not be None when generating a schema' 

742 schema = apply_annotations(self.generate_schema, field.metadata, self.definitions)(annotation) 

743 

744 if not field.is_required(): 

745 schema = wrap_default(field, schema) 

746 

747 parameter_schema = core_schema.arguments_parameter(name, schema) 

748 if mode is not None: 

749 parameter_schema['mode'] = mode 

750 if field.alias is not None: 

751 parameter_schema['alias'] = field.alias 

752 else: 

753 alias_generator = self.config_wrapper.alias_generator 

754 if alias_generator: 

755 parameter_schema['alias'] = alias_generator(name) 

756 return parameter_schema 

757 

758 def _generic_collection_schema( 

759 self, parent_type: type[Any], type_: type[Any], origin: type[Any] 

760 ) -> core_schema.CoreSchema: 

761 """ 

762 Generate schema for List, Set, and FrozenSet, possibly parameterized. 

763 

764 :param parent_type: Either `list`, `set` or `frozenset` - the builtin type 

765 :param type_: The type of the collection, e.g. `List[int]` or `List`, or a subclass of one of them 

766 :param origin: The origin type 

767 """ 

768 schema: core_schema.CoreSchema = { # type: ignore[misc,assignment] 

769 'type': parent_type.__name__.lower(), 

770 'items_schema': self.generate_schema(get_first_arg(type_)), 

771 } 

772 

773 if origin == parent_type: 

774 return schema 

775 else: 

776 # Ensure the validated value is converted back to the specific subclass type 

777 # NOTE: we might have better performance by using a tuple or list validator for the schema here, 

778 # but if you care about performance, you can define your own schema. 

779 # We should optimize for compatibility, not performance in this case 

780 return core_schema.general_after_validator_function( 

781 lambda __input_value, __info: type_(__input_value), schema 

782 ) 
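# Illustrative sketch (not part of the original module): for a user-defined subclass, the
# wrapping after-validator above converts the validated collection back into the subclass.
# `IntList` is a made-up name.
#
#     class IntList(typing.List[int]):
#         pass
#
#     # validating ['1', '2'] against IntList yields IntList([1, 2]):
#     # items are coerced by the inner list schema first, then the subclass constructor is applied.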

783 

784 def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema: 

785 """ 

786 Generate schema for a Tuple, e.g. `tuple[int, str]` or `tuple[int, ...]`. 

787 """ 

788 params = get_args(tuple_type) 

789 # NOTE: subtle difference: `tuple[()]` gives `params=()`, whereas `typing.Tuple[()]` gives `params=((),)` 

790 if not params: 

791 if tuple_type == typing.Tuple: 

792 return core_schema.tuple_variable_schema() 

793 else: 

794 # special case for `tuple[()]` which means `tuple[]` - an empty tuple 

795 return core_schema.tuple_positional_schema([]) 

796 elif params[-1] is Ellipsis: 

797 if len(params) == 2: 

798 sv = core_schema.tuple_variable_schema(self.generate_schema(params[0])) 

799 return sv 

800 

801 # not sure this case is valid in python, but may as well support it here since pydantic-core does 

802 *items_schema, extra_schema = params 

803 return core_schema.tuple_positional_schema( 

804 [self.generate_schema(p) for p in items_schema], extra_schema=self.generate_schema(extra_schema) 

805 ) 

806 elif len(params) == 1 and params[0] == (): 

807 # special case for `Tuple[()]` which means `Tuple[]` - an empty tuple 

808 return core_schema.tuple_positional_schema([]) 

809 else: 

810 return core_schema.tuple_positional_schema([self.generate_schema(p) for p in params]) 
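# Illustrative sketch (not part of the original module): the tuple forms handled above.
#
#     tuple[int, ...]   -> tuple_variable_schema(int_schema)            # homogeneous, any length
#     tuple[int, str]   -> tuple_positional_schema([int_schema, str_schema])  # fixed length
#     Tuple[()]         -> tuple_positional_schema([])                  # the empty tuple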

811 

812 def _dict_schema(self, dict_type: Any) -> core_schema.DictSchema: 

813 """ 

814 Generate schema for a Dict, e.g. `dict[str, int]`. 

815 """ 

816 try: 

817 arg0, arg1 = get_args(dict_type) 

818 except ValueError: 

819 return core_schema.dict_schema() 

820 else: 

821 return core_schema.dict_schema( 

822 keys_schema=self.generate_schema(arg0), 

823 values_schema=self.generate_schema(arg1), 

824 ) 

825 

826 def _dict_subclass_schema(self, dict_subclass: Any) -> core_schema.CoreSchema: 

827 """ 

828 Generate schema for a subclass of dict or Dict 

829 """ 

830 try: 

831 arg0, arg1 = get_args(dict_subclass) 

832 except ValueError: 

833 arg0, arg1 = Any, Any 

834 

835 from ._validators import mapping_validator 

836 

837 # TODO could do `core_schema.chain_schema(core_schema.is_instance_schema(dict_subclass), ...` in strict mode 

838 return core_schema.general_wrap_validator_function( 

839 mapping_validator, 

840 core_schema.dict_schema( 

841 keys_schema=self.generate_schema(arg0), 

842 values_schema=self.generate_schema(arg1), 

843 ), 

844 ) 

845 

846 def _counter_schema(self, counter_type: Any) -> core_schema.CoreSchema: 

847 """ 

848 Generate schema for `typing.Counter` 

849 """ 

850 arg = get_first_arg(counter_type) 

851 

852 from ._validators import construct_counter 

853 

854 # TODO could do `core_schema.chain_schema(core_schema.is_instance_schema(Counter), ...` in strict mode 

855 return core_schema.general_after_validator_function( 

856 construct_counter, 

857 core_schema.dict_schema( 

858 keys_schema=self.generate_schema(arg), 

859 values_schema=core_schema.int_schema(), 

860 ), 

861 ) 

862 

863 def _mapping_schema(self, mapping_type: Any) -> core_schema.CoreSchema: 

864 """ 

865 Generate schema for a Mapping, e.g. `Mapping[str, int]`. 

866 """ 

867 try: 

868 arg0, arg1 = get_args(mapping_type) 

869 except ValueError: 

870 return core_schema.is_instance_schema(typing.Mapping, cls_repr='Mapping') 

871 else: 

872 from ._validators import mapping_validator 

873 

874 return core_schema.general_wrap_validator_function( 

875 mapping_validator, 

876 core_schema.dict_schema( 

877 keys_schema=self.generate_schema(arg0), 

878 values_schema=self.generate_schema(arg1), 

879 ), 

880 ) 

881 

882 def _type_schema(self) -> core_schema.CoreSchema: 

883 return core_schema.custom_error_schema( 

884 core_schema.is_instance_schema(type), 

885 custom_error_type='is_type', 

886 custom_error_message='Input should be a type', 

887 ) 

888 

889 def _subclass_schema(self, type_: Any) -> core_schema.CoreSchema: 

890 """ 

891 Generate schema for a Type, e.g. `Type[int]`. 

892 """ 

893 type_param = get_first_arg(type_) 

894 if type_param == Any: 

895 return self._type_schema() 

896 elif isinstance(type_param, typing.TypeVar): 

897 if type_param.__bound__: 

898 return core_schema.is_subclass_schema(type_param.__bound__) 

899 elif type_param.__constraints__: 

900 return core_schema.union_schema( 

901 [self.generate_schema(typing.Type[c]) for c in type_param.__constraints__] 

902 ) 

903 else: 

904 return self._type_schema() 

905 else: 

906 return core_schema.is_subclass_schema(type_param) 

907 

908 def _sequence_schema(self, sequence_type: Any) -> core_schema.CoreSchema: 

909 """ 

910 Generate schema for a Sequence, e.g. `Sequence[int]`. 

911 """ 

912 item_type = get_first_arg(sequence_type) 

913 

914 if item_type == Any: 

915 return core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence') 

916 else: 

917 from ._validators import sequence_validator 

918 

919 return core_schema.chain_schema( 

920 [ 

921 core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence'), 

922 core_schema.general_wrap_validator_function( 

923 sequence_validator, 

924 core_schema.list_schema(self.generate_schema(item_type), allow_any_iter=True), 

925 ), 

926 ] 

927 ) 

928 

929 def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema: 

930 """ 

931 Generate a schema for an `Iterable`. 

932 

933 TODO replace with pydantic-core's generator validator. 

934 """ 

935 item_type = get_first_arg(type_) 

936 

937 return core_schema.generator_schema(self.generate_schema(item_type)) 

938 

939 def _pattern_schema(self, pattern_type: Any) -> core_schema.CoreSchema: 

940 from . import _serializers, _validators 

941 

942 metadata = build_metadata_dict(js_functions=[lambda _1, _2: {'type': 'string', 'format': 'regex'}]) 

943 ser = core_schema.plain_serializer_function_ser_schema( 

944 _serializers.pattern_serializer, info_arg=True, json_return_type='str' 

945 ) 

946 if pattern_type == typing.Pattern or pattern_type == re.Pattern: 

947 # bare type 

948 return core_schema.general_plain_validator_function( 

949 _validators.pattern_either_validator, serialization=ser, metadata=metadata 

950 ) 

951 

952 param = get_args(pattern_type)[0] 

953 if param == str: 

954 return core_schema.general_plain_validator_function( 

955 _validators.pattern_str_validator, serialization=ser, metadata=metadata 

956 ) 

957 elif param == bytes: 

958 return core_schema.general_plain_validator_function( 

959 _validators.pattern_bytes_validator, serialization=ser, metadata=metadata 

960 ) 

961 else: 

962 raise PydanticSchemaGenerationError(f'Unable to generate pydantic-core schema for {pattern_type!r}.') 

963 

964 def _hashable_schema(self) -> core_schema.CoreSchema: 

965 return core_schema.custom_error_schema( 

966 core_schema.is_instance_schema(collections.abc.Hashable), 

967 custom_error_type='is_hashable', 

968 custom_error_message='Input should be hashable', 

969 ) 

970 

971 def _std_types_schema(self, obj: Any) -> core_schema.CoreSchema | None: 

972 """ 

973 Generate schema for types in the standard library. 

974 """ 

975 if not isinstance(obj, type): 

976 return None 

977 

978 # Import here to avoid the extra import time earlier, since _std_types_schema imports lots of things globally 

979 from ._std_types_schema import SCHEMA_LOOKUP 

980 

981 # instead of iterating over a list and calling is_instance, this should be somewhat faster, 

982 # especially as it should catch most types on the first iteration 

983 # (same as we do/used to do in json encoding) 

984 for base in obj.__mro__[:-1]: 

985 try: 

986 encoder = SCHEMA_LOOKUP[base] 

987 except KeyError: 

988 continue 

989 return encoder(self, obj) 

990 return None 

991 

992 def _dataclass_schema( 

993 self, dataclass: type[StandardDataclass], origin: type[StandardDataclass] | None 

994 ) -> core_schema.CoreSchema: 

995 """ 

996 Generate schema for a dataclass. 

997 """ 

998 dataclass_ref, schema = self._get_or_cache_recursive_ref(dataclass) 

999 if schema is not None: 

1000 return schema 

1001 

1002 typevars_map = get_standard_typevars_map(dataclass) 

1003 if origin is not None: 

1004 dataclass = origin 

1005 

1006 from ._dataclasses import is_pydantic_dataclass 

1007 

1008 if is_pydantic_dataclass(dataclass): 

1009 fields = dataclass.__pydantic_fields__ 

1010 if typevars_map: 

1011 for field in fields.values(): 

1012 field.apply_typevars_map(typevars_map, self.types_namespace) 

1013 else: 

1014 fields = collect_dataclass_fields( 

1015 dataclass, 

1016 self.types_namespace, 

1017 typevars_map=typevars_map, 

1018 ) 

1019 decorators = getattr(dataclass, '__pydantic_decorators__', None) or DecoratorInfos() 

1020 args = [self._generate_dc_field_schema(k, v, decorators) for k, v in fields.items()] 

1021 has_post_init = hasattr(dataclass, '__post_init__') 

1022 args_schema = core_schema.dataclass_args_schema( 

1023 dataclass.__name__, 

1024 args, 

1025 computed_fields=generate_computed_field(decorators.computed_fields), 

1026 collect_init_only=has_post_init, 

1027 ) 

1028 inner_schema = apply_validators(args_schema, decorators.root_validator.values()) 

1029 dc_schema = core_schema.dataclass_schema(dataclass, inner_schema, post_init=has_post_init, ref=dataclass_ref) 

1030 schema = apply_model_serializers(dc_schema, decorators.model_serializer.values()) 

1031 return apply_model_validators(schema, decorators.model_validator.values()) 

1032 

1033 def _callable_schema(self, function: Callable[..., Any]) -> core_schema.CallSchema: 

1034 """ 

1035 Generate schema for a Callable. 

1036 

1037 TODO support functional validators once we support them in Config 

1038 """ 

1039 sig = signature(function) 

1040 

1041 type_hints = _typing_extra.get_function_type_hints(function) 

1042 

1043 mode_lookup: dict[_ParameterKind, Literal['positional_only', 'positional_or_keyword', 'keyword_only']] = { 

1044 Parameter.POSITIONAL_ONLY: 'positional_only', 

1045 Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword', 

1046 Parameter.KEYWORD_ONLY: 'keyword_only', 

1047 } 

1048 

1049 arguments_list: list[core_schema.ArgumentsParameter] = [] 

1050 var_args_schema: core_schema.CoreSchema | None = None 

1051 var_kwargs_schema: core_schema.CoreSchema | None = None 

1052 

1053 for name, p in sig.parameters.items(): 

1054 if p.annotation is sig.empty: 

1055 annotation = Any 

1056 else: 

1057 annotation = type_hints[name] 

1058 

1059 parameter_mode = mode_lookup.get(p.kind) 

1060 if parameter_mode is not None: 

1061 arg_schema = self._generate_parameter_schema(name, annotation, p.default, parameter_mode) 

1062 arguments_list.append(arg_schema) 

1063 elif p.kind == Parameter.VAR_POSITIONAL: 

1064 var_args_schema = self.generate_schema(annotation) 

1065 else: 

1066 assert p.kind == Parameter.VAR_KEYWORD, p.kind 

1067 var_kwargs_schema = self.generate_schema(annotation) 

1068 

1069 return_schema: core_schema.CoreSchema | None = None 

1070 config_wrapper = self.config_wrapper 

1071 if config_wrapper.validate_return: 

1072 return_hint = type_hints.get('return') 

1073 if return_hint is not None: 

1074 return_schema = self.generate_schema(return_hint) 

1075 

1076 return core_schema.call_schema( 

1077 core_schema.arguments_schema( 

1078 arguments_list, 

1079 var_args_schema=var_args_schema, 

1080 var_kwargs_schema=var_kwargs_schema, 

1081 populate_by_name=config_wrapper.populate_by_name, 

1082 ), 

1083 function, 

1084 return_schema=return_schema, 

1085 ) 
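# Illustrative sketch (not part of the original module): how a signature maps onto the
# arguments schema built above. The function is made up.
#
#     def f(a: int, /, b: str = 'x', *args: int, **kwargs: Any) -> int: ...
#
#     # a        -> positional_only parameter
#     # b        -> positional_or_keyword parameter, wrapped in a default ('x')
#     # *args    -> var_args_schema (int)
#     # **kwargs -> var_kwargs_schema (Any)
#     # return   -> validated only if config.validate_return is set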

1086 

1087 def _unsubstituted_typevar_schema(self, typevar: typing.TypeVar) -> core_schema.CoreSchema: 

1088 assert isinstance(typevar, typing.TypeVar) 

1089 

1090 if typevar.__bound__: 

1091 return self.generate_schema(typevar.__bound__) 

1092 elif typevar.__constraints__: 

1093 return self._union_schema(typing.Union[typevar.__constraints__]) # type: ignore 

1094 else: 

1095 return core_schema.AnySchema(type='any') 

1096 

1097 def _get_or_cache_recursive_ref(self, cls: type[Any]) -> tuple[str, core_schema.DefinitionReferenceSchema | None]: 

1098 obj_ref = get_type_ref(cls) 

1099 if obj_ref in self.recursion_cache: 

1100 return obj_ref, self.recursion_cache[obj_ref] 

1101 else: 

1102 self.recursion_cache[obj_ref] = core_schema.definition_reference_schema(obj_ref) 

1103 return obj_ref, None 

1104 

1105 

1106_VALIDATOR_F_MATCH: Mapping[ 

1107 tuple[FieldValidatorModes, Literal['no-info', 'general', 'field']], 

1108 Callable[[Callable[..., Any], core_schema.CoreSchema], core_schema.CoreSchema], 

1109] = { 

1110 ('before', 'no-info'): core_schema.no_info_before_validator_function, 

1111 ('after', 'no-info'): core_schema.no_info_after_validator_function, 

1112 ('plain', 'no-info'): lambda f, _: core_schema.no_info_plain_validator_function(f), 

1113 ('wrap', 'no-info'): core_schema.no_info_wrap_validator_function, 

1114 ('before', 'general'): core_schema.general_before_validator_function, 

1115 ('after', 'general'): core_schema.general_after_validator_function, 

1116 ('plain', 'general'): lambda f, _: core_schema.general_plain_validator_function(f), 

1117 ('wrap', 'general'): core_schema.general_wrap_validator_function, 

1118 ('before', 'field'): core_schema.field_before_validator_function, 

1119 ('after', 'field'): core_schema.field_after_validator_function, 

1120 ('plain', 'field'): lambda f, _: core_schema.field_plain_validator_function(f), 

1121 ('wrap', 'field'): core_schema.field_wrap_validator_function, 

1122} 

1123 

1124 

1125def apply_validators( 

1126 schema: core_schema.CoreSchema, 

1127 validators: Iterable[Decorator[RootValidatorDecoratorInfo]] 

1128 | Iterable[Decorator[ValidatorDecoratorInfo]] 

1129 | Iterable[Decorator[FieldValidatorDecoratorInfo]], 

1130) -> core_schema.CoreSchema: 

1131 """ 

1132 Apply validators to a schema. 

1133 """ 

1134 for validator in validators: 

1135 info_arg = inspect_validator(validator.func, validator.info.mode) 

1136 if not info_arg: 

1137 val_type: Literal['no-info', 'general', 'field'] = 'no-info' 

1138 elif isinstance(validator.info, (FieldValidatorDecoratorInfo, ValidatorDecoratorInfo)): 

1139 val_type = 'field' 

1140 else: 

1141 val_type = 'general' 

1142 schema = _VALIDATOR_F_MATCH[(validator.info.mode, val_type)](validator.func, schema) 

1143 return schema 

1144 

1145 

1146def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool: 

1147 """ 

1148 In v1, if any of the validators for a field had `always=True`, the default value would be validated. 

1149 

1150 This serves as an auxiliary function for re-implementing that logic, by looping over a provided 

1151 collection of (v1-style) ValidatorDecoratorInfo instances and checking if any of them have `always=True`. 

1152 

1153 We should be able to drop this function and the associated logic calling it once we drop support 

1154 for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent 

1155 to the v1-validator `always` kwarg to `field_validator`.) 

1156 """ 

1157 for validator in validators: 

1158 if validator.info.always: 

1159 return True 

1160 return False 
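# Illustrative sketch (not part of the original module): the V1 `always=True` flag that
# triggers validate_default here. Names are made up.
#
#     from pydantic import BaseModel, validator
#
#     class Model(BaseModel):
#         x: int = 1
#
#         @validator('x', always=True)   # runs even when the default is used
#         def check(cls, v):
#             return v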

1161 

1162 

1163def apply_field_serializers( 

1164 schema: core_schema.CoreSchema, serializers: list[Decorator[FieldSerializerDecoratorInfo]] 

1165) -> core_schema.CoreSchema: 

1166 """ 

1167 Apply field serializers to a schema. 

1168 """ 

1169 if serializers: 

1170 # use the last serializer to make it easy to override a serializer set on a parent model 

1171 serializer = serializers[-1] 

1172 is_field_serializer, info_arg = inspect_field_serializer(serializer.func, serializer.info.mode) 

1173 if serializer.info.mode == 'wrap': 

1174 schema['serialization'] = core_schema.wrap_serializer_function_ser_schema( 

1175 serializer.func, 

1176 is_field_serializer=is_field_serializer, 

1177 info_arg=info_arg, 

1178 json_return_type=serializer.info.json_return_type, 

1179 when_used=serializer.info.when_used, 

1180 ) 

1181 else: 

1182 assert serializer.info.mode == 'plain' 

1183 schema['serialization'] = core_schema.plain_serializer_function_ser_schema( 

1184 serializer.func, 

1185 is_field_serializer=is_field_serializer, 

1186 info_arg=info_arg, 

1187 json_return_type=serializer.info.json_return_type, 

1188 when_used=serializer.info.when_used, 

1189 ) 

1190 return schema 
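# Illustrative sketch (not part of the original module): a field serializer that ends up in the
# 'serialization' slot set above; as noted in the comment, only the last serializer declared for
# a field wins. Assumes the public `field_serializer` decorator; names are made up.
#
#     from datetime import datetime
#     from pydantic import BaseModel, field_serializer
#
#     class Model(BaseModel):
#         dt: datetime
#
#         @field_serializer('dt')
#         def ser_dt(self, value: datetime) -> str:
#             return value.isoformat()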

1191 

1192 

1193def apply_model_serializers( 

1194 schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]] 

1195) -> core_schema.CoreSchema: 

1196 """ 

1197 Apply model serializers to a schema. 

1198 """ 

1199 if serializers: 

1200 serializer = list(serializers)[-1] 

1201 info_arg = inspect_model_serializer(serializer.func, serializer.info.mode) 

1202 if serializer.info.mode == 'wrap': 

1203 ser_schema: core_schema.SerSchema = core_schema.wrap_serializer_function_ser_schema( 

1204 serializer.func, 

1205 info_arg=info_arg, 

1206 json_return_type=serializer.info.json_return_type, 

1207 ) 

1208 else: 

1209 # plain 

1210 ser_schema = core_schema.plain_serializer_function_ser_schema( 

1211 serializer.func, 

1212 info_arg=info_arg, 

1213 json_return_type=serializer.info.json_return_type, 

1214 ) 

1215 schema['serialization'] = ser_schema 

1216 return schema 

1217 

1218 

1219def apply_model_validators( 

1220 schema: core_schema.CoreSchema, validators: Iterable[Decorator[ModelValidatorDecoratorInfo]] 

1221) -> core_schema.CoreSchema: 

1222 """ 

1223 Apply model validators to a schema. 

1224 """ 

1225 for validator in validators: 

1226 info_arg = inspect_validator(validator.func, validator.info.mode) 

1227 if validator.info.mode == 'wrap': 

1228 if info_arg: 

1229 schema = core_schema.general_wrap_validator_function(function=validator.func, schema=schema) 

1230 else: 

1231 schema = core_schema.no_info_wrap_validator_function(function=validator.func, schema=schema) 

1232 elif validator.info.mode == 'before': 

1233 if info_arg: 

1234 schema = core_schema.general_before_validator_function(function=validator.func, schema=schema) 

1235 else: 

1236 schema = core_schema.no_info_before_validator_function(function=validator.func, schema=schema) 

1237 else: 

1238 assert validator.info.mode == 'after' 

1239 if info_arg: 

1240 schema = core_schema.general_after_validator_function(function=validator.func, schema=schema) 

1241 else: 

1242 schema = core_schema.no_info_after_validator_function(function=validator.func, schema=schema) 

1243 return schema 

1244 

1245 

1246def apply_annotations( 

1247 get_inner_schema: ModifyCoreSchemaWrapHandler, 

1248 annotations: typing.Iterable[Any], 

1249 definitions: dict[str, core_schema.CoreSchema], 

1250) -> ModifyCoreSchemaWrapHandler: 

1251 """ 

1252 Apply arguments from `Annotated` or from `FieldInfo` to a schema. 

1253 """ 

1254 for annotation in annotations: 

1255 if annotation is None: 

1256 continue 

1257 get_inner_schema = get_wrapped_inner_schema(get_inner_schema, annotation, definitions) 

1258 

1259 return get_inner_schema 

1260 

1261 

1262def get_wrapped_inner_schema( 

1263 get_inner_schema: ModifyCoreSchemaWrapHandler, annotation: Any, definitions: dict[str, core_schema.CoreSchema] 

1264) -> Callable[[Any], core_schema.CoreSchema]: 

1265 metadata_get_schema: GetCoreSchemaFunction = getattr(annotation, '__get_pydantic_core_schema__', None) or ( 

1266 lambda source, handler: handler(source) 

1267 ) 

1268 

1269 def _new_inner_schema_handler(source: Any) -> core_schema.CoreSchema: 

1270 schema = metadata_get_schema(source, get_inner_schema) 

1271 schema = apply_single_annotation(schema, annotation, definitions) 

1272 

1273 metadata_js_function = _extract_get_pydantic_json_schema(annotation) 

1274 if metadata_js_function is not None: 

1275 metadata = CoreMetadataHandler(schema).metadata 

1276 metadata['pydantic_js_functions'] = metadata.get('pydantic_js_functions', []) 

1277 metadata['pydantic_js_functions'].append(metadata_js_function) 

1278 return schema 

1279 

1280 return _new_inner_schema_handler 

1281 

1282 

1283def apply_single_annotation( 

1284 schema: core_schema.CoreSchema, metadata: Any, definitions: dict[str, core_schema.CoreSchema] 

1285) -> core_schema.CoreSchema: 

1286 if isinstance(metadata, GroupedMetadata): 

1287 for a in metadata: 

1288 schema = apply_single_annotation(schema, a, definitions) 

1289 elif isinstance(metadata, FieldInfo): 

1290 for field_metadata in metadata.metadata: 

1291 schema = apply_single_annotation(schema, field_metadata, definitions) 

1292 if metadata.discriminator is not None: 

1293 schema = _discriminated_union.apply_discriminator(schema, metadata.discriminator, definitions) 

1294 # TODO setting a default here needs to be tested 

1295 return wrap_default(metadata, schema) 

1296 

1297 if isinstance(metadata, PydanticGeneralMetadata): 

1298 metadata_dict = metadata.__dict__ 

1299 elif isinstance(metadata, (BaseMetadata, PydanticMetadata)): 

1300 metadata_dict = dataclasses.asdict(metadata) # type: ignore[call-overload] 

1301 elif isinstance(metadata, type) and issubclass(metadata, PydanticMetadata): 

1302 # also support PydanticMetadata classes being used without initialisation, 

1303 # e.g. `Annotated[int, Strict]` as well as `Annotated[int, Strict()]` 

1304 metadata_dict = {k: v for k, v in vars(metadata).items() if not k.startswith('_')} 

1305 else: 

1306 # PEP 593: "If a library (or tool) encounters a typehint Annotated[T, x] and has no 

1307 # special logic for metadata x, it should ignore it and simply treat the type as T." 

1308 # Allow, but ignore, any unknown metadata. 

1309 return schema 

1310 

1311 # TODO we need a way to remove metadata which this line currently prevents 

1312 metadata_dict = {k: v for k, v in metadata_dict.items() if v is not None} 

1313 if not metadata_dict: 

1314 return schema 

1315 

1316 handler = CoreMetadataHandler(schema) 

1317 update_schema_function = handler.metadata.get('pydantic_cs_update_function') 

1318 if update_schema_function is not None: 

1319 new_schema = update_schema_function(schema, **metadata_dict) 

1320 if new_schema is not None: 

1321 schema = new_schema 

1322 else: 

1323 if schema['type'] == 'nullable': 

1324 # for nullable schemas, metadata is automatically applied to the inner schema 

1325 # TODO need to do the same for lists, tuples and more 

1326 schema['schema'].update(metadata_dict) 

1327 else: 

1328 schema.update(metadata_dict) # type: ignore[typeddict-item] 

1329 try: 

1330 SchemaValidator(schema) 

1331 except SchemaError as e: 

1332 # TODO: Generate an easier-to-understand ValueError here saying the field constraints are not enforced 

1333 # The relevant test is: `tests.test_schema.test_unenforced_constraints_schema` 

1334 raise e 

1335 return schema 
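# Illustrative sketch (not part of the original module): the kinds of metadata handled above,
# using constraints from annotated_types. `Field(gt=0)` is pydantic's public Field function.
#
#     from annotated_types import Gt, Len
#
#     Annotated[int, Gt(0)]          # BaseMetadata -> {'gt': 0} merged into the int schema
#     Annotated[list[int], Len(1)]   # GroupedMetadata -> expands to MinLen(1), applied in turn
#     Annotated[int, Field(gt=0)]    # FieldInfo -> its .metadata entries applied one by one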

1336 

1337 

1338def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema: 

1339 if field_info.default_factory: 

1340 return core_schema.with_default_schema( 

1341 schema, default_factory=field_info.default_factory, validate_default=field_info.validate_default 

1342 ) 

1343 elif field_info.default is not Undefined: 

1344 return core_schema.with_default_schema( 

1345 schema, default=field_info.default, validate_default=field_info.validate_default 

1346 ) 

1347 else: 

1348 return schema 
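# Illustrative sketch (not part of the original module): the two default forms handled above.
#
#     x: int = 3                                  -> with_default_schema(int_schema, default=3)
#     y: list[int] = Field(default_factory=list)  -> with_default_schema(list_schema, default_factory=list)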

1349 

1350 

1351def get_first_arg(type_: Any) -> Any: 

1352 """ 

1353 Get the first argument from a typing object, e.g. `List[int]` -> `int`, or `Any` if no argument. 

1354 """ 

1355 try: 

1356 return get_args(type_)[0] 

1357 except IndexError: 

1358 return Any 

1359 

1360 

1361def _extract_get_pydantic_json_schema(tp: Any) -> GetJsonSchemaFunction | None: 

1362 """Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`""" 

1363 js_modify_function = getattr(tp, '__get_pydantic_json_schema__', None) 

1364 

1365 if js_modify_function is None and hasattr(tp, '__modify_schema__'): 

1366 warnings.warn( 

1367 'The __modify_schema__ method is deprecated, use __get_pydantic_json_schema__ instead', 

1368 DeprecationWarning, 

1369 ) 

1370 return lambda c, h: tp.__modify_schema__(h(c)) 

1371 

1372 return js_modify_function 

1373 

1374 

1375class _CommonField(TypedDict): 

1376 schema: core_schema.CoreSchema 

1377 validation_alias: str | list[str | int] | list[list[str | int]] | None 

1378 serialization_alias: str | None 

1379 serialization_exclude: bool | None 

1380 frozen: bool | None 

1381 metadata: Any 

1382 

1383 

1384def _common_field( 

1385 schema: core_schema.CoreSchema, 

1386 *, 

1387 validation_alias: str | list[str | int] | list[list[str | int]] | None = None, 

1388 serialization_alias: str | None = None, 

1389 serialization_exclude: bool | None = None, 

1390 frozen: bool | None = None, 

1391 metadata: Any = None, 

1392) -> _CommonField: 

1393 return { 

1394 'schema': schema, 

1395 'validation_alias': validation_alias, 

1396 'serialization_alias': serialization_alias, 

1397 'serialization_exclude': serialization_exclude, 

1398 'frozen': frozen, 

1399 'metadata': metadata, 

1400 } 

1401 

1402 

1403def generate_computed_field(d: dict[str, Decorator[ComputedFieldInfo]]) -> list[core_schema.ComputedField] | None: 

1404 r = [ 

1405 core_schema.computed_field( 

1406 dec.cls_var_name, 

1407 json_return_type=dec.info.json_return_type, 

1408 alias=dec.info.alias, 

1409 ) 

1410 for dec in d.values() 

1411 ] 

1412 return r