Coverage for /pythoncovmergedfiles/medio/medio/src/pydantic/pydantic/_internal/_generate_schema.py: 36%
663 statements
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-27 07:38 +0000
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-27 07:38 +0000
1"""
2Convert python types to pydantic-core schema.
3"""
4from __future__ import annotations as _annotations
6import collections.abc
7import dataclasses
8import inspect
9import re
10import sys
11import typing
12import warnings
13from functools import partial
14from inspect import Parameter, _ParameterKind, signature
15from itertools import chain
16from types import FunctionType, LambdaType, MethodType
17from typing import TYPE_CHECKING, Any, Callable, ForwardRef, Iterable, Mapping, TypeVar, Union
19from annotated_types import BaseMetadata, GroupedMetadata
20from pydantic_core import CoreSchema, SchemaError, SchemaValidator, core_schema
21from typing_extensions import Annotated, Final, Literal, TypedDict, get_args, get_origin, is_typeddict
23from ..errors import PydanticSchemaGenerationError, PydanticUndefinedAnnotation, PydanticUserError
24from ..fields import FieldInfo
25from . import _discriminated_union, _typing_extra
26from ._config import ConfigWrapper
27from ._core_metadata import (
28 CoreMetadataHandler,
29 build_metadata_dict,
30)
31from ._core_utils import (
32 consolidate_refs,
33 define_expected_missing_refs,
34 get_type_ref,
35 is_list_like_schema_with_items_schema,
36 remove_unnecessary_invalid_definitions,
37)
38from ._decorators import (
39 ComputedFieldInfo,
40 Decorator,
41 DecoratorInfos,
42 FieldSerializerDecoratorInfo,
43 FieldValidatorDecoratorInfo,
44 ModelSerializerDecoratorInfo,
45 ModelValidatorDecoratorInfo,
46 RootValidatorDecoratorInfo,
47 ValidatorDecoratorInfo,
48 inspect_field_serializer,
49 inspect_model_serializer,
50 inspect_validator,
51)
52from ._fields import (
53 PydanticGeneralMetadata,
54 PydanticMetadata,
55 Undefined,
56 collect_dataclass_fields,
57 get_type_hints_infer_globalns,
58)
59from ._forward_ref import PydanticForwardRef, PydanticRecursiveRef
60from ._generics import get_standard_typevars_map, recursively_defined_type_refs, replace_types
61from ._json_schema_shared import (
62 CoreSchemaOrField,
63 GetJsonSchemaFunction,
64 GetJsonSchemaHandler,
65 JsonSchemaValue,
66 UnpackedRefJsonSchemaHandler,
67 wrap_json_schema_fn_for_model_or_custom_type_with_ref_unpacking,
68)
69from ._typing_extra import is_finalvar
70from ._utils import lenient_issubclass
72if TYPE_CHECKING:
73 from ..decorators import FieldValidatorModes
74 from ..main import BaseModel
75 from ._dataclasses import StandardDataclass
# True when running on Python >= 3.11. `GenerateSchema._typed_dict_schema` uses this flag to reject
# `typing.TypedDict` on older interpreters and ask for `typing_extensions.TypedDict` instead.
_SUPPORTS_TYPEDDICT = sys.version_info >= (3, 11)

# Decorator info types that carry a `fields` attribute naming the fields they apply to.
FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo]
FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo)
# Any `Decorator` wrapper around one of the field-bound decorator info types above.
AnyFieldDecorator = Union[
    Decorator[ValidatorDecoratorInfo],
    Decorator[FieldValidatorDecoratorInfo],
    Decorator[FieldSerializerDecoratorInfo],
]

# Callable shapes for core-schema hooks: a handler takes a source type and returns a core schema;
# NOTE(review): presumably `GetCoreSchemaFunction` is the two-argument form of
# `__get_pydantic_core_schema__(source, handler)` - confirm against its users.
ModifyCoreSchemaWrapHandler = Callable[[Any], core_schema.CoreSchema]
GetCoreSchemaFunction = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema]
def check_validator_fields_against_field_name(
    info: FieldDecoratorInfo,
    field: str,
) -> bool:
    """
    Report whether the decorator described by `info` applies to the field named `field`.

    A `ValidatorDecoratorInfo` whose fields are exactly `('*',)` matches every field;
    otherwise `field` has to equal one of the names listed in `info.fields`.
    """
    # V1 compat: accept `'*'` as a wildcard that matches all fields
    matches_everything = isinstance(info, ValidatorDecoratorInfo) and info.fields == ('*',)
    return matches_everything or any(candidate == field for candidate in info.fields)
def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None:
    """
    Verify that every field named by the given decorators is one of `fields`.

    Decorators are skipped when they are V1 wildcard validators (fields equal to `('*',)`)
    or when their info has `check_fields` set to `False`.

    :param decorators: field-bound decorators to check
    :param fields: names of the fields that actually exist
    :raises PydanticUserError: if a checked decorator names a field that is not in `fields`
    """
    known_fields = set(fields)
    for dec in decorators:
        info = dec.info
        # V1 compat: accept `'*'` as a wildcard that matches all fields
        is_wildcard = isinstance(info, ValidatorDecoratorInfo) and info.fields == ('*',)
        if is_wildcard or info.check_fields is False:
            continue
        if any(name not in known_fields for name in info.fields):
            raise PydanticUserError(
                f'Validators defined with incorrect fields: {dec.cls_ref}.{dec.cls_var_name}'
                " (use check_fields=False if you're inheriting from the model and intended this)",
                code='decorator-missing-field',
            )
def filter_field_decorator_info_by_field(
    validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str
) -> list[Decorator[FieldDecoratorInfoType]]:
    """
    Return, in their original order, the decorators whose info applies to the field named `field`.
    """
    applicable = []
    for dec in validator_functions:
        if check_validator_fields_against_field_name(dec.info, field):
            applicable.append(dec)
    return applicable
def apply_each_item_validators(
    schema: core_schema.CoreSchema, each_item_validators: list[Decorator[ValidatorDecoratorInfo]]
) -> core_schema.CoreSchema:
    """
    Push V1-style `each_item=True` validators down onto the item schema of a container schema.

    Nullable schemas are unwrapped recursively, list-like schemas receive the validators on
    `items_schema`, and dict schemas receive them on `values_schema`. The schema is modified
    in place and also returned.

    :raises TypeError: if validators were given but the schema is none of the kinds above
    """
    # TODO: remove this V1 compatibility shim once it's deprecated
    # note that this won't work for any Annotated types that get wrapped by a function validator
    # but that's okay because that didn't exist in V1
    if schema['type'] == 'nullable':
        schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators)
        return schema

    if is_list_like_schema_with_items_schema(schema):
        inner_key = 'items_schema'
    elif schema['type'] == 'dict':
        # each-item validators land on dict _values_;
        # this is super arbitrary but it's the V1 behavior
        inner_key = 'values_schema'
    else:
        if each_item_validators:
            raise TypeError(
                f"`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema['type']}"
            )
        return schema

    item_schema = schema.get(inner_key, None)
    if item_schema is None:
        # no explicit inner schema: validate the items as "any" before running the validators
        item_schema = core_schema.any_schema()
    schema[inner_key] = apply_validators(item_schema, each_item_validators)
    return schema
def modify_model_json_schema(
    schema_or_field: CoreSchemaOrField, handler: GetJsonSchemaHandler, *, cls: Any
) -> JsonSchemaValue:
    """
    Add title and description for model-like classes' JSON schema.

    The JSON schema produced by `handler` is returned as-is; the title (`cls.__name__`) and the
    description (`cls.__doc__`, when truthy) are filled in on the schema obtained from
    `resolve_ref_schema`, and only when those keys are not already present.
    """
    ref_resolver = UnpackedRefJsonSchemaHandler(handler)

    json_schema = handler(schema_or_field)
    target = ref_resolver.resolve_ref_schema(json_schema)

    if 'title' not in target:
        target['title'] = cls.__name__

    class_doc = cls.__doc__
    if class_doc and 'description' not in target:
        target['description'] = class_doc

    return json_schema
173class GenerateSchema:
174 __slots__ = '_config_wrapper_stack', 'types_namespace', 'typevars_map', 'recursion_cache', 'definitions'
def __init__(
    self,
    config_wrapper: ConfigWrapper,
    types_namespace: dict[str, Any] | None,
    typevars_map: dict[Any, Any] | None = None,
):
    """
    Set up a schema generator.

    :param config_wrapper: the initial config, used until a nested model pushes its own
    :param types_namespace: namespace used when evaluating forward references
    :param typevars_map: optional substitutions applied to type variables
    """
    self.types_namespace = types_namespace
    self.typevars_map = typevars_map

    # the active config is the top of a stack, because recursing into a child model
    # temporarily replaces it with that model's config
    self._config_wrapper_stack: list[ConfigWrapper] = [config_wrapper]

    self.definitions: dict[str, core_schema.CoreSchema] = {}
    self.recursion_cache: dict[str, core_schema.DefinitionReferenceSchema] = {}
@property
def config_wrapper(self) -> ConfigWrapper:
    """
    The config currently in effect, i.e. the most recently pushed entry of the config stack.
    """
    stack = self._config_wrapper_stack
    return stack[-1]
@property
def arbitrary_types(self) -> bool:
    """
    The `arbitrary_types_allowed` setting of the config currently in effect.
    """
    active_config = self.config_wrapper
    return active_config.arbitrary_types_allowed
def generate_schema(self, obj: Any, from_dunder_get_core_schema: bool = True) -> core_schema.CoreSchema:
    """
    Generate a core schema for `obj`, attach its JSON schema hook and record it in `self.definitions`.

    :param obj: the type (or other supported object) to generate a schema for
    :param from_dunder_get_core_schema: when false, skip the `__get_pydantic_core_schema__` /
        `__pydantic_core_schema__` lookup on `obj` and go straight to `_generate_schema`
    :return: the generated schema
    """
    schema: CoreSchema | None = (
        self._generate_schema_from_property(obj, obj) if from_dunder_get_core_schema else None
    )
    if schema is None:
        schema = self._generate_schema(obj)

    schema = remove_unnecessary_invalid_definitions(schema)

    js_function = _extract_get_pydantic_json_schema(obj)
    # Need to fall back to the origin to handle custom generics:
    if js_function is None and hasattr(obj, '__origin__'):
        js_function = _extract_get_pydantic_json_schema(obj.__origin__)

    if js_function is not None:
        metadata = CoreMetadataHandler(schema).metadata
        # wrap the function so that we unpack ref schemas and always call it with the full schema
        if schema['type'] != 'definition-ref':
            # we would fail to unpack recursive ref schemas!
            js_function = wrap_json_schema_fn_for_model_or_custom_type_with_ref_unpacking(js_function)
        metadata.setdefault('pydantic_js_functions', []).append(js_function)

    if 'ref' in schema:
        # definitions and definition-ref schemas don't have 'ref', causing the type error ignored below
        self.definitions[schema['ref']] = schema  # type: ignore[typeddict-item]

    return schema
def model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema:
    """
    Generate schema for a pydantic model.

    Since models generate schemas for themselves this method is public and can be called
    from within BaseModel's metaclass.

    :param cls: the model class to generate a schema for
    :return: the model schema with model serializers and model validators applied, or the schema
        handed back by `_get_or_cache_recursive_ref` when that call returns one
    :raises PydanticUserError: if a field decorator names a field the model does not have
    """
    # NOTE(review): presumably a non-None schema here is a reference for a model that is
    # already being processed (recursion) - confirm in `_get_or_cache_recursive_ref`
    model_ref, schema = self._get_or_cache_recursive_ref(cls)
    if schema is not None:
        return schema

    from ..main import BaseModel

    fields = cls.model_fields
    decorators = cls.__pydantic_decorators__
    check_decorator_fields_exist(
        chain(
            decorators.field_validator.values(),
            decorators.field_serializer.values(),
            decorators.validator.values(),
        ),
        fields.keys(),
    )
    # TODO: we need to do something similar to this for pydantic dataclasses
    #   This should be straight forward once we expose the pydantic config on the dataclass;
    #   I have done this in my PR for dataclasses JSON schema
    config_wrapper = ConfigWrapper(cls.model_config, check=False)
    # the model's own config is active only while its field schemas are generated
    self._config_wrapper_stack.append(config_wrapper)
    try:
        fields_schema: core_schema.CoreSchema = core_schema.typed_dict_schema(
            {k: self._generate_td_field_schema(k, v, decorators) for k, v in fields.items()},
            computed_fields=generate_computed_field(decorators.computed_fields),
            return_fields_set=True,
        )
    finally:
        # always restore the previous config, even if field schema generation failed
        self._config_wrapper_stack.pop()
    inner_schema = apply_validators(fields_schema, decorators.root_validator.values())

    inner_schema = define_expected_missing_refs(inner_schema, recursively_defined_type_refs())

    core_config = config_wrapper.core_config()
    # only tell pydantic-core about `model_post_init` when the model overrides BaseModel's version
    model_post_init = None if cls.model_post_init is BaseModel.model_post_init else 'model_post_init'

    metadata = build_metadata_dict(js_functions=[partial(modify_model_json_schema, cls=cls)])

    model_schema = core_schema.model_schema(
        cls,
        inner_schema,
        ref=model_ref,
        config=core_config,
        post_init=model_post_init,
        metadata=metadata,
    )
    model_schema = consolidate_refs(model_schema)
    schema = apply_model_serializers(model_schema, decorators.model_serializer.values())
    return apply_model_validators(schema, decorators.model_validator.values())
def _generate_schema_from_property(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
    """
    Try to generate schema from either the `__get_pydantic_core_schema__` function or
    `__pydantic_core_schema__` property.

    Note: `__get_pydantic_core_schema__` takes priority so it can
    decide whether to use a `__pydantic_core_schema__` attribute, or generate a fresh schema.

    :param obj: the object the hook / attribute is looked up on
    :param source: the type passed to `__get_pydantic_core_schema__` as its source
    :return: the schema found, or `None` if `obj` provides neither
    """
    get_schema = getattr(obj, '__get_pydantic_core_schema__', None)

    if get_schema is None:
        if _typing_extra.is_dataclass(obj):
            # For dataclasses, only use the __pydantic_core_schema__ if it is defined on this exact
            # class, not a parent
            return obj.__dict__.get('__pydantic_core_schema__')
        return getattr(obj, '__pydantic_core_schema__', None)

    # single-parameter form: (source) -> CoreSchema
    takes_only_source = len(inspect.signature(get_schema).parameters) == 1
    if takes_only_source:
        return get_schema(source)

    # two-parameter form: also gets a handler that generates a schema without consulting the hook
    # Can return None to tell pydantic not to override
    handler = partial(self.generate_schema, from_dunder_get_core_schema=False)
    return get_schema(source, handler)
def _generate_schema(self, obj: Any) -> core_schema.CoreSchema:  # noqa: C901
    """
    Recursively generate a pydantic-core schema for any supported python type.

    The checks run in a fixed order and the first match wins: already-built schemas (dicts),
    forward references, models, builtin and special types, standard-library types, dataclasses,
    and finally generic aliases dispatched on their origin.

    :param obj: the type (or string / `ForwardRef` naming one) to generate a schema for
    :return: the core schema for `obj`
    :raises PydanticUndefinedAnnotation: if a forward reference cannot be evaluated
    :raises PydanticSchemaGenerationError: if no schema can be generated for `obj`
    """
    if isinstance(obj, dict):
        # we assume this is already a valid schema
        return obj  # type: ignore[return-value]

    if isinstance(obj, str):
        obj = ForwardRef(obj)

    if isinstance(obj, ForwardRef):
        # we assume that types_namespace has the target of forward references in its scope,
        # but this could fail, for example, if calling Validator on an imported type which contains
        # forward references to other types only defined in the module from which it was imported
        # `Validator(SomeImportedTypeAliasWithAForwardReference)`
        # or the equivalent for BaseModel
        # class Model(BaseModel):
        #   x: SomeImportedTypeAliasWithAForwardReference
        try:
            obj = _typing_extra.evaluate_fwd_ref(obj, globalns=self.types_namespace)
        except NameError as e:
            raise PydanticUndefinedAnnotation.from_name_error(e) from e

        # if obj is still a ForwardRef, it means we can't evaluate it, raise PydanticUndefinedAnnotation
        if isinstance(obj, ForwardRef):
            raise PydanticUndefinedAnnotation(obj.__forward_arg__, f'Unable to evaluate forward reference {obj}')

        if self.typevars_map:
            obj = replace_types(obj, self.typevars_map)

    from ..main import BaseModel

    if lenient_issubclass(obj, BaseModel):
        return self.model_schema(obj)

    if isinstance(obj, PydanticRecursiveRef):
        return core_schema.definition_reference_schema(schema_ref=obj.type_ref)

    if isinstance(obj, PydanticForwardRef):
        if not obj.deferred_actions:
            return obj.schema
        resolved_model = obj.resolve_model()
        if isinstance(resolved_model, PydanticForwardRef):
            # If you still have a PydanticForwardRef after resolving, it should be deeply nested enough that it will
            # eventually be substituted out. So it is safe to return an invalid schema here.
            # TODO: Replace this with a (new) CoreSchema that, if present at any level, makes validation fail
            return core_schema.none_schema(
                metadata={'invalid': True, 'pydantic_debug_self_schema': resolved_model.schema}
            )
        else:
            model_ref = get_type_ref(resolved_model)
            return core_schema.definition_reference_schema(model_ref)

    # builtin types whose schema 'type' is simply the class name
    try:
        if obj in {bool, int, float, str, bytes, list, set, frozenset, dict}:
            # Note: obj may fail to be hashable if it has an unhashable annotation
            return {'type': obj.__name__}
        elif obj is tuple:
            return {'type': 'tuple-variable'}
    except TypeError:  # obj not hashable; can happen due to unhashable annotations
        pass

    if obj is Any or obj is object:
        return core_schema.AnySchema(type='any')
    elif obj is None or obj is _typing_extra.NoneType:
        return core_schema.NoneSchema(type='none')
    elif obj == type:
        return self._type_schema()
    elif _typing_extra.is_callable_type(obj):
        return core_schema.CallableSchema(type='callable')
    elif _typing_extra.is_literal_type(obj):
        return self._literal_schema(obj)
    elif is_typeddict(obj):
        return self._typed_dict_schema(obj, None)
    elif _typing_extra.is_namedtuple(obj):
        return self._namedtuple_schema(obj)
    elif _typing_extra.is_new_type(obj):
        # NewType, can't use isinstance because it fails <3.7
        return self.generate_schema(obj.__supertype__)
    elif obj == re.Pattern:
        return self._pattern_schema(obj)
    elif obj is collections.abc.Hashable or obj is typing.Hashable:
        return self._hashable_schema()
    elif isinstance(obj, type):
        if obj is dict:
            return self._dict_schema(obj)
        if issubclass(obj, dict):
            # TODO: We would need to handle generic subclasses of certain typing dict subclasses here
            #   This includes subclasses of typing.Counter, typing.DefaultDict, and typing.OrderedDict
            #   Note also that we may do a better job of handling typing.DefaultDict by inspecting its arguments.
            return self._dict_subclass_schema(obj)
        # probably need to take care of other subclasses here
        # (any other class falls through to the checks after this if/elif chain)
    elif isinstance(obj, typing.TypeVar):
        return self._unsubstituted_typevar_schema(obj)
    elif is_finalvar(obj):
        if obj is Final:
            return core_schema.AnySchema(type='any')
        return self.generate_schema(get_args(obj)[0])
    elif isinstance(obj, (FunctionType, LambdaType, MethodType, partial)):
        return self._callable_schema(obj)

    # TODO: _std_types_schema iterates over the __mro__ looking for an expected schema.
    #   This will catch subclasses of typing.Deque, preventing us from properly supporting user-defined
    #   generic subclasses. (In principle this would also catch typing.OrderedDict, but that is currently
    #   already getting caught in the `issubclass(obj, dict):` check above.
    std_schema = self._std_types_schema(obj)
    if std_schema is not None:
        return std_schema

    if _typing_extra.is_dataclass(obj):
        return self._dataclass_schema(obj, None)

    origin = get_origin(obj)
    if origin is None:
        # not a generic alias and nothing above matched
        if self.arbitrary_types:
            return core_schema.is_instance_schema(obj)
        else:
            raise PydanticSchemaGenerationError(
                f'Unable to generate pydantic-core schema for {obj!r}. '
                f'Setting `arbitrary_types_allowed=True` in the model_config may prevent this error.'
            )

    # Need to handle generic dataclasses before looking for the schema properties because attribute accesses
    # on _GenericAlias delegate to the origin type, so lose the information about the concrete parametrization
    # As a result, currently, there is no way to cache the schema for generic dataclasses. This may be possible
    # to resolve by modifying the value returned by `Generic.__class_getitem__`, but that is a dangerous game.
    if _typing_extra.is_dataclass(origin):
        return self._dataclass_schema(obj, origin)

    from_property = self._generate_schema_from_property(origin, obj)
    if from_property is not None:
        return from_property

    # dispatch on the origin of the generic alias; more specific origins must be tested first
    if _typing_extra.origin_is_union(origin):
        return self._union_schema(obj)
    elif issubclass(origin, Annotated):  # type: ignore[arg-type]
        return self._annotated_schema(obj)
    elif issubclass(origin, typing.List):
        return self._generic_collection_schema(list, obj, origin)
    elif issubclass(origin, typing.Set):
        return self._generic_collection_schema(set, obj, origin)
    elif issubclass(origin, typing.FrozenSet):
        return self._generic_collection_schema(frozenset, obj, origin)
    elif issubclass(origin, typing.Tuple):  # type: ignore[arg-type]
        # TODO: To support generic subclasses of typing.Tuple, we need to better-introspect the args to origin
        return self._tuple_schema(obj)
    elif issubclass(origin, typing.Counter):
        # Subclasses of typing.Counter may be handled as subclasses of dict; see note above
        return self._counter_schema(obj)
    elif origin in (typing.Dict, dict):
        return self._dict_schema(obj)
    elif is_typeddict(origin):
        return self._typed_dict_schema(obj, origin)
    elif issubclass(origin, typing.Dict):
        # Subclasses of typing.Dict may be handled as subclasses of dict; see note above
        return self._dict_subclass_schema(obj)
    elif issubclass(origin, typing.Mapping):
        # Because typing.Mapping does not have a specified `__init__` signature, we don't validate into subclasses
        return self._mapping_schema(obj)
    elif issubclass(origin, typing.Type):  # type: ignore[arg-type]
        return self._subclass_schema(obj)
    elif issubclass(origin, typing.Deque):
        from ._std_types_schema import deque_schema

        return deque_schema(self, obj)
    elif issubclass(origin, typing.OrderedDict):
        # Subclasses of typing.OrderedDict may be handled as subclasses of dict; see note above
        from ._std_types_schema import ordered_dict_schema

        return ordered_dict_schema(self, obj)
    elif issubclass(origin, typing.Sequence):
        # Because typing.Sequence does not have a specified `__init__` signature, we don't validate into subclasses
        return self._sequence_schema(obj)
    elif issubclass(origin, typing.MutableSet):
        raise PydanticSchemaGenerationError('Unable to generate pydantic-core schema MutableSet TODO.')
    elif issubclass(origin, (typing.Iterable, collections.abc.Iterable)):
        # Because typing.Iterable does not have a specified `__init__` signature, we don't validate into subclasses
        return self._iterable_schema(obj)
    elif issubclass(origin, (re.Pattern, typing.Pattern)):
        return self._pattern_schema(obj)
    else:
        if self.arbitrary_types and isinstance(origin, type):
            return core_schema.is_instance_schema(origin)
        else:
            raise PydanticSchemaGenerationError(
                f'Unable to generate pydantic-core schema for {obj!r} (origin={origin!r}). '
                f'Setting `arbitrary_types_allowed=True` in the model_config may prevent this error.'
            )
def _generate_td_field_schema(
    self,
    name: str,
    field_info: FieldInfo,
    decorators: DecoratorInfos,
    *,
    required: bool = True,
) -> core_schema.TypedDictField:
    """
    Prepare a TypedDictField to represent a model or typeddict field.

    :param name: the field name
    :param field_info: the field's `FieldInfo`
    :param decorators: decorators that may apply to the field
    :param required: requested required-ness; ignored when `field_info` reports the field
        as not required
    """
    common = self._common_field_schema(name, field_info, decorators)
    # a field that `field_info` reports as not required is never marked required
    is_required = required if field_info.is_required() else False
    return core_schema.typed_dict_field(
        common['schema'],
        required=is_required,
        serialization_exclude=common['serialization_exclude'],
        validation_alias=common['validation_alias'],
        serialization_alias=common['serialization_alias'],
        frozen=common['frozen'],
        metadata=common['metadata'],
    )
def _generate_dc_field_schema(
    self,
    name: str,
    field_info: FieldInfo,
    decorators: DecoratorInfos,
) -> core_schema.DataclassField:
    """
    Prepare a DataclassField to represent the parameter/field, of a dataclass

    :param name: the field name
    :param field_info: the field's `FieldInfo`
    :param decorators: decorators that may apply to the field
    """
    common = self._common_field_schema(name, field_info, decorators)
    # falsy `init_var` is passed as None
    init_only = field_info.init_var or None
    # only an explicit `False` is ever passed for `kw_only`; keyword-only fields pass None
    kw_only = None if field_info.kw_only else False
    return core_schema.dataclass_field(
        name,
        common['schema'],
        init_only=init_only,
        kw_only=kw_only,
        serialization_exclude=common['serialization_exclude'],
        validation_alias=common['validation_alias'],
        serialization_alias=common['serialization_alias'],
        frozen=common['frozen'],
        metadata=common['metadata'],
    )
def _common_field_schema(self, name: str, field_info: FieldInfo, decorators: DecoratorInfos) -> _CommonField:
    """
    Build the parts of a field schema shared by typed-dict (model) fields and dataclass fields.

    The field's schema is generated from `field_info.annotation` with `field_info.metadata` applied,
    then wrapped, in this order, with: V1 `each_item` validators, the remaining V1 validators,
    field validators, the default (if the field is not required) and field serializers.

    Note: sets `field_info.validate_default = True` as a side effect when
    `_validators_require_validate_default` says the field's V1 validators need it.

    :param name: the field name, used to select the decorators that apply
    :param field_info: the field's `FieldInfo`; its annotation must not be `None`
    :param decorators: the decorators collected for the owning class
    """
    assert field_info.annotation is not None, 'field_info.annotation should not be None when generating a schema'

    def generate_schema(source: Any) -> CoreSchema:
        # innermost schema generation, with the field's discriminator (if any) applied on top
        schema = self.generate_schema(source)
        if field_info.discriminator is not None:
            schema = _discriminated_union.apply_discriminator(schema, field_info.discriminator, self.definitions)
        return schema

    schema = apply_annotations(generate_schema, field_info.metadata, self.definitions)(field_info.annotation)

    # TODO: remove this V1 compatibility shim once it's deprecated
    # push down any `each_item=True` validators
    # note that this won't work for any Annotated types that get wrapped by a function validator
    # but that's okay because that didn't exist in V1
    this_field_validators = filter_field_decorator_info_by_field(decorators.validator.values(), name)
    if _validators_require_validate_default(this_field_validators):
        field_info.validate_default = True
    each_item_validators = [v for v in this_field_validators if v.info.each_item is True]
    this_field_validators = [v for v in this_field_validators if v not in each_item_validators]
    schema = apply_each_item_validators(schema, each_item_validators)

    schema = apply_validators(schema, filter_field_decorator_info_by_field(this_field_validators, name))
    schema = apply_validators(
        schema, filter_field_decorator_info_by_field(decorators.field_validator.values(), name)
    )

    # the default validator needs to go outside of any other validators
    # so that it is the topmost validator for the typed-dict-field validator
    # which uses it to check if the field has a default value or not
    if not field_info.is_required():
        schema = wrap_default(field_info, schema)

    schema = apply_field_serializers(
        schema, filter_field_decorator_info_by_field(decorators.field_serializer.values(), name)
    )
    # JSON schema keys contributed by the field itself; unset (None) values are dropped,
    # and `json_schema_extra` entries override them
    json_schema_updates = {
        'title': field_info.title,
        'description': field_info.description,
        'examples': field_info.examples,
    }
    json_schema_updates = {k: v for k, v in json_schema_updates.items() if v is not None}
    json_schema_updates.update(field_info.json_schema_extra or {})

    def json_schema_update_func(schema: CoreSchemaOrField, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
        # the field-level updates take precedence over whatever the handler produced
        return {**handler(schema), **json_schema_updates}

    metadata = build_metadata_dict(js_functions=[json_schema_update_func])
    return _common_field(
        schema,
        serialization_exclude=True if field_info.exclude else None,
        validation_alias=field_info.validation_alias,
        serialization_alias=field_info.serialization_alias,
        frozen=field_info.frozen or field_info.final,
        metadata=metadata,
    )
def _union_schema(self, union_type: Any) -> core_schema.CoreSchema:
    """
    Generate schema for a Union.

    `None` / `NoneType` members never become union choices; their presence wraps the result
    in a nullable schema instead. A single remaining choice is used directly, without a union.
    """
    members = get_args(union_type)
    non_null_members = [m for m in members if not (m is None or m is _typing_extra.NoneType)]
    has_none = len(non_null_members) != len(members)

    choices: list[core_schema.CoreSchema] = [self.generate_schema(m) for m in non_null_members]
    result = choices[0] if len(choices) == 1 else core_schema.union_schema(choices)

    return core_schema.nullable_schema(result) if has_none else result
def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema:
    """
    Generate schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`.

    The first argument is the annotated type itself; all remaining arguments are passed to
    `apply_annotations` as the annotations to apply.
    """
    source_type, *annotations = get_args(annotated_type)
    build_schema = apply_annotations(self.generate_schema, annotations, self.definitions)
    return build_schema(source_type)
def _literal_schema(self, literal_type: Any) -> core_schema.LiteralSchema:
    """
    Generate schema for a Literal.

    The permitted values are collected with `_typing_extra.all_literal_values` and must not be empty.
    """
    permitted_values = _typing_extra.all_literal_values(literal_type)
    assert permitted_values, f'literal "expected" cannot be empty, obj={literal_type}'
    return core_schema.literal_schema(permitted_values)
def _typed_dict_schema(
    self, typed_dict_cls: Any, origin: Any
) -> core_schema.TypedDictSchema | core_schema.DefinitionReferenceSchema:
    """
    Generate schema for a TypedDict.

    It is not possible to track required/optional keys in TypedDict without __required_keys__
    since TypedDict.__new__ erases the base classes (it replaces them with just `dict`)
    and thus we can't track usage of total=True/False
    __required_keys__ was added in Python 3.9
    (https://github.com/miss-islington/cpython/blob/1e9939657dd1f8eb9f596f77c1084d2d351172fc/Doc/library/typing.rst?plain=1#L1546-L1548)
    however it is buggy
    (https://github.com/python/typing_extensions/blob/ac52ac5f2cb0e00e7988bae1e2a1b8257ac88d6d/src/typing_extensions.py#L657-L666).
    Hence to avoid creating validators that do not do what users expect we only
    support typing.TypedDict on Python >= 3.11 or typing_extension.TypedDict on all versions

    :param typed_dict_cls: the TypedDict class, or a parametrized alias of one
    :param origin: the unparametrized class when `typed_dict_cls` is an alias, otherwise `None`
    :raises PydanticUserError: for `typing.TypedDict` on Python < 3.11
    """
    typed_dict_ref, existing = self._get_or_cache_recursive_ref(typed_dict_cls)
    if existing is not None:
        return existing

    # the typevars map is taken from the value as passed in, before swapping in the origin
    typevars_map = get_standard_typevars_map(typed_dict_cls)
    if origin is not None:
        typed_dict_cls = origin

    defined_in_typing = type(typed_dict_cls).__module__ == 'typing'
    if defined_in_typing and not _SUPPORTS_TYPEDDICT:
        raise PydanticUserError(
            'Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.11.',
            code='typed-dict-version',
        )

    required_keys: frozenset[str] = typed_dict_cls.__required_keys__
    type_hints = get_type_hints_infer_globalns(
        typed_dict_cls, localns=self.types_namespace, include_extras=True
    )

    fields: dict[str, core_schema.TypedDictField] = {}
    for field_name, annotation in type_hints.items():
        annotation = replace_types(annotation, typevars_map)

        # an explicit Required[...] / NotRequired[...] wrapper overrides `__required_keys__`
        qualifier = get_origin(annotation)
        if qualifier == _typing_extra.Required:
            required = True
            annotation = get_args(annotation)[0]
        elif qualifier == _typing_extra.NotRequired:
            required = False
            annotation = get_args(annotation)[0]
        else:
            required = field_name in required_keys

        field_info = FieldInfo.from_annotation(annotation)
        fields[field_name] = self._generate_td_field_schema(
            field_name, field_info, DecoratorInfos(), required=required
        )

    metadata = build_metadata_dict(js_functions=[partial(modify_model_json_schema, cls=typed_dict_cls)])

    return core_schema.typed_dict_schema(
        fields,
        extra_behavior='forbid',
        ref=typed_dict_ref,
        metadata=metadata,
    )
def _namedtuple_schema(self, namedtuple_cls: Any) -> core_schema.CallSchema:
    """
    Generate schema for a NamedTuple.

    The result is a call schema: the fields are validated as arguments and `namedtuple_cls`
    is then called with them.
    """
    field_types: dict[str, Any] = get_type_hints_infer_globalns(
        namedtuple_cls, include_extras=True, localns=self.types_namespace
    )
    if not field_types:
        # no type hints at all, happens if namedtuple_cls defined via collections.namedtuple(...)
        field_types = dict.fromkeys(namedtuple_cls._fields, Any)

    parameters = [
        self._generate_parameter_schema(field_name, annotation)
        for field_name, annotation in field_types.items()
    ]
    arguments_schema = core_schema.ArgumentsSchema(
        type='arguments',
        arguments_schema=parameters,
        metadata=build_metadata_dict(js_prefer_positional_arguments=True),
    )
    return core_schema.call_schema(arguments_schema, namedtuple_cls)
def _generate_parameter_schema(
    self,
    name: str,
    annotation: type[Any],
    default: Any = Parameter.empty,
    mode: Literal['positional_only', 'positional_or_keyword', 'keyword_only'] | None = None,
) -> core_schema.ArgumentsParameter:
    """
    Prepare a ArgumentsParameter to represent a field in a namedtuple or function signature.

    :param name: the parameter name
    :param annotation: the parameter's annotation
    :param default: the parameter's default, or `Parameter.empty` if it has none
    :param mode: how the parameter may be passed; left unset on the schema when `None`
    :return: the parameter schema, with `alias` set from the field's alias or, failing that,
        from the config's `alias_generator` when one is configured
    """
    if default is Parameter.empty:
        field = FieldInfo.from_annotation(annotation)
    else:
        field = FieldInfo.from_annotated_attribute(annotation, default)
    assert field.annotation is not None, 'field.annotation should not be None when generating a schema'

    # Generate from `field.annotation` rather than the raw `annotation`, matching
    # `_common_field_schema`: `field.metadata` is applied explicitly here, so re-processing the raw
    # annotation could apply metadata that `FieldInfo` already extracted from it a second time.
    schema = apply_annotations(self.generate_schema, field.metadata, self.definitions)(field.annotation)

    if not field.is_required():
        schema = wrap_default(field, schema)

    parameter_schema = core_schema.arguments_parameter(name, schema)
    if mode is not None:
        parameter_schema['mode'] = mode
    if field.alias is not None:
        parameter_schema['alias'] = field.alias
    else:
        # no explicit alias: fall back to the active config's alias generator, if any
        alias_generator = self.config_wrapper.alias_generator
        if alias_generator:
            parameter_schema['alias'] = alias_generator(name)
    return parameter_schema
def _generic_collection_schema(
    self, parent_type: type[Any], type_: type[Any], origin: type[Any]
) -> core_schema.CoreSchema:
    """
    Generate schema for List, Set, and FrozenSet, possibly parameterized.

    :param parent_type: Either `list`, `set` or `frozenset` - the builtin type
    :param type_: The type of the collection, e.g. `List[int]` or `List`, or a subclass of one of them
    :param origin: The origin type
    :return: the builtin collection schema, wrapped in an after-validator that calls `type_` on
        the validated value when `origin` is not the builtin itself
    """
    collection_kind = parent_type.__name__.lower()
    items_schema = self.generate_schema(get_first_arg(type_))
    builtin_schema: core_schema.CoreSchema = {  # type: ignore[misc,assignment]
        'type': collection_kind,
        'items_schema': items_schema,
    }

    if origin == parent_type:
        return builtin_schema

    # Ensure the validated value is converted back to the specific subclass type
    # NOTE: we might have better performance by using a tuple or list validator for the schema here,
    # but if you care about performance, you can define your own schema.
    # We should optimize for compatibility, not performance in this case
    return core_schema.general_after_validator_function(
        lambda validated, _info: type_(validated), builtin_schema
    )
def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema:
    """
    Generate schema for a Tuple, e.g. `tuple[int, str]` or `tuple[int, ...]`.

    Shapes handled:

    * bare `typing.Tuple` - a variable-length tuple with unconstrained items
    * `tuple[()]` / `Tuple[()]` - the empty tuple
    * `tuple[X, ...]` - a variable-length tuple of `X`
    * `tuple[X, Y, ...]` - positional `X` followed by any number of `Y`
    * `tuple[X, Y]` - fixed positional items

    :param tuple_type: The tuple annotation to build a schema for
    :raises PydanticSchemaGenerationError: if the only type argument is `...`
    """
    params = get_args(tuple_type)
    # NOTE: subtle difference: `tuple[()]` gives `params=()`, whereas `typing.Tuple[()]` gives `params=((),)`
    if not params:
        if tuple_type == typing.Tuple:
            return core_schema.tuple_variable_schema()
        # special case for `tuple[()]` which means `tuple[]` - an empty tuple
        return core_schema.tuple_positional_schema([])

    if params[-1] is Ellipsis:
        if len(params) == 2:
            return core_schema.tuple_variable_schema(self.generate_schema(params[0]))

        # Drop the trailing `...` marker: it is not a type, so it must never be handed to
        # `generate_schema`. The repeated ("extra") type is the one written just before it.
        typed_params = params[:-1]
        if not typed_params:
            raise PydanticSchemaGenerationError(
                f'Unable to generate pydantic-core schema for {tuple_type!r}.'
            )

        # not sure this case is valid in python, but may as well support it here since pydantic-core does
        *positional_types, repeated_type = typed_params
        return core_schema.tuple_positional_schema(
            [self.generate_schema(p) for p in positional_types],
            extra_schema=self.generate_schema(repeated_type),
        )

    if len(params) == 1 and params[0] == ():
        # special case for `Tuple[()]` which means `Tuple[]` - an empty tuple
        return core_schema.tuple_positional_schema([])

    return core_schema.tuple_positional_schema([self.generate_schema(p) for p in params])
def _dict_schema(self, dict_type: Any) -> core_schema.DictSchema:
    """
    Build the schema for a Dict annotation such as `dict[str, int]`.

    When the annotation does not carry exactly two type arguments, an
    unconstrained dict schema is returned.
    """
    type_args = get_args(dict_type)
    if len(type_args) != 2:
        return core_schema.dict_schema()

    key_type, value_type = type_args
    return core_schema.dict_schema(
        keys_schema=self.generate_schema(key_type),
        values_schema=self.generate_schema(value_type),
    )
def _dict_subclass_schema(self, dict_subclass: Any) -> core_schema.CoreSchema:
    """
    Build the schema for a subclass of dict or Dict.

    Key and value types default to `Any` when the type does not carry exactly
    two type arguments.
    """
    type_args = get_args(dict_subclass)
    key_type, value_type = type_args if len(type_args) == 2 else (Any, Any)

    from ._validators import mapping_validator

    inner_dict_schema = core_schema.dict_schema(
        keys_schema=self.generate_schema(key_type),
        values_schema=self.generate_schema(value_type),
    )
    # TODO could do `core_schema.chain_schema(core_schema.is_instance_schema(dict_subclass), ...` in strict mode
    return core_schema.general_wrap_validator_function(mapping_validator, inner_dict_schema)
def _counter_schema(self, counter_type: Any) -> core_schema.CoreSchema:
    """
    Build the schema for `typing.Counter`.

    The input is validated as a dict whose keys follow the Counter's type
    argument and whose values are ints, then passed to `construct_counter`.
    """
    key_type = get_first_arg(counter_type)

    from ._validators import construct_counter

    counts_schema = core_schema.dict_schema(
        keys_schema=self.generate_schema(key_type),
        values_schema=core_schema.int_schema(),
    )
    # TODO could do `core_schema.chain_schema(core_schema.is_instance_schema(Counter), ...` in strict mode
    return core_schema.general_after_validator_function(construct_counter, counts_schema)
def _mapping_schema(self, mapping_type: Any) -> core_schema.CoreSchema:
    """
    Build the schema for a Mapping, e.g. `Mapping[str, int]`.

    Without exactly two type arguments only an `isinstance` check against
    `typing.Mapping` is generated; otherwise a dict schema wrapped by
    `mapping_validator` is used.
    """
    type_args = get_args(mapping_type)
    if len(type_args) != 2:
        return core_schema.is_instance_schema(typing.Mapping, cls_repr='Mapping')

    key_type, value_type = type_args

    from ._validators import mapping_validator

    inner_dict_schema = core_schema.dict_schema(
        keys_schema=self.generate_schema(key_type),
        values_schema=self.generate_schema(value_type),
    )
    return core_schema.general_wrap_validator_function(mapping_validator, inner_dict_schema)
def _type_schema(self) -> core_schema.CoreSchema:
    """
    Build a schema accepting any instance of `type`, with a custom 'is_type' error.
    """
    is_type_check = core_schema.is_instance_schema(type)
    return core_schema.custom_error_schema(
        is_type_check,
        custom_error_type='is_type',
        custom_error_message='Input should be a type',
    )
def _subclass_schema(self, type_: Any) -> core_schema.CoreSchema:
    """
    Build the schema for a Type annotation, e.g. `Type[int]`.

    A TypeVar argument is resolved through its bound, then its constraints
    (as a union of `Type[constraint]`), and finally falls back to "any type".
    """
    type_param = get_first_arg(type_)
    if type_param == Any:
        return self._type_schema()

    if not isinstance(type_param, typing.TypeVar):
        return core_schema.is_subclass_schema(type_param)

    if type_param.__bound__:
        return core_schema.is_subclass_schema(type_param.__bound__)

    if type_param.__constraints__:
        constraint_schemas = [
            self.generate_schema(typing.Type[constraint]) for constraint in type_param.__constraints__
        ]
        return core_schema.union_schema(constraint_schemas)

    return self._type_schema()
def _sequence_schema(self, sequence_type: Any) -> core_schema.CoreSchema:
    """
    Build the schema for a Sequence, e.g. `Sequence[int]`.

    With an `Any` item type only the `isinstance` check is generated; otherwise
    that check is chained with a list schema wrapped by `sequence_validator`.
    """
    item_type = get_first_arg(sequence_type)
    if item_type == Any:
        return core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence')

    from ._validators import sequence_validator

    is_sequence_check = core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence')
    items_check = core_schema.general_wrap_validator_function(
        sequence_validator,
        core_schema.list_schema(self.generate_schema(item_type), allow_any_iter=True),
    )
    return core_schema.chain_schema([is_sequence_check, items_check])
def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema:
    """
    Build a generator schema for an `Iterable`, using its first type argument for the items.

    TODO replace with pydantic-core's generator validator.
    """
    items_schema = self.generate_schema(get_first_arg(type_))
    return core_schema.generator_schema(items_schema)
def _pattern_schema(self, pattern_type: Any) -> core_schema.CoreSchema:
    """
    Build the schema for `Pattern`, `Pattern[str]` or `Pattern[bytes]`.

    :raises PydanticSchemaGenerationError: if the type argument is neither `str` nor `bytes`
    """
    from . import _serializers, _validators

    metadata = build_metadata_dict(js_functions=[lambda _1, _2: {'type': 'string', 'format': 'regex'}])
    ser = core_schema.plain_serializer_function_ser_schema(
        _serializers.pattern_serializer, info_arg=True, json_return_type='str'
    )

    # Pick the validator matching the declared pattern flavour.
    if pattern_type == typing.Pattern or pattern_type == re.Pattern:
        # bare type
        validator = _validators.pattern_either_validator
    else:
        param = get_args(pattern_type)[0]
        if param == str:
            validator = _validators.pattern_str_validator
        elif param == bytes:
            validator = _validators.pattern_bytes_validator
        else:
            raise PydanticSchemaGenerationError(f'Unable to generate pydantic-core schema for {pattern_type!r}.')

    return core_schema.general_plain_validator_function(validator, serialization=ser, metadata=metadata)
def _hashable_schema(self) -> core_schema.CoreSchema:
    """
    Build a schema accepting any `collections.abc.Hashable` instance, with a custom 'is_hashable' error.
    """
    is_hashable_check = core_schema.is_instance_schema(collections.abc.Hashable)
    return core_schema.custom_error_schema(
        is_hashable_check,
        custom_error_type='is_hashable',
        custom_error_message='Input should be hashable',
    )
971 def _std_types_schema(self, obj: Any) -> core_schema.CoreSchema | None:
972 """
973 Generate schema for types in the standard library.
974 """
975 if not isinstance(obj, type):
976 return None
978 # Import here to avoid the extra import time earlier since _std_validators imports lots of things globally
979 from ._std_types_schema import SCHEMA_LOOKUP
981 # instead of iterating over a list and calling is_instance, this should be somewhat faster,
982 # especially as it should catch most types on the first iteration
983 # (same as we do/used to do in json encoding)
984 for base in obj.__mro__[:-1]:
985 try:
986 encoder = SCHEMA_LOOKUP[base]
987 except KeyError:
988 continue
989 return encoder(self, obj)
990 return None
def _dataclass_schema(
    self, dataclass: type[StandardDataclass], origin: type[StandardDataclass] | None
) -> core_schema.CoreSchema:
    """
    Generate schema for a dataclass.

    :param dataclass: The dataclass (possibly a parametrized generic alias) to build a schema for
    :param origin: The unparametrized origin of `dataclass`, or `None`; when given, the fields
        and decorators are read from it
    :return: The dataclass schema, or a definition reference if this dataclass is already being built
    """
    dataclass_ref, schema = self._get_or_cache_recursive_ref(dataclass)
    if schema is not None:
        return schema

    # The typevars map must be computed from the original argument, before it is
    # replaced by its origin below.
    typevars_map = get_standard_typevars_map(dataclass)
    if origin is not None:
        dataclass = origin

    from copy import copy

    from ._dataclasses import is_pydantic_dataclass

    if is_pydantic_dataclass(dataclass):
        fields = dataclass.__pydantic_fields__
        if typevars_map:
            # `apply_typevars_map` works in place, so operate on shallow copies: the field
            # definitions stored on the class are shared and must not be altered by the
            # schema generation of one particular parametrization.
            fields = {field_name: copy(field) for field_name, field in fields.items()}
            for field in fields.values():
                field.apply_typevars_map(typevars_map, self.types_namespace)
    else:
        fields = collect_dataclass_fields(
            dataclass,
            self.types_namespace,
            typevars_map=typevars_map,
        )

    decorators = getattr(dataclass, '__pydantic_decorators__', None) or DecoratorInfos()
    args = [self._generate_dc_field_schema(k, v, decorators) for k, v in fields.items()]
    has_post_init = hasattr(dataclass, '__post_init__')
    args_schema = core_schema.dataclass_args_schema(
        dataclass.__name__,
        args,
        computed_fields=generate_computed_field(decorators.computed_fields),
        collect_init_only=has_post_init,
    )
    inner_schema = apply_validators(args_schema, decorators.root_validator.values())
    dc_schema = core_schema.dataclass_schema(dataclass, inner_schema, post_init=has_post_init, ref=dataclass_ref)
    schema = apply_model_serializers(dc_schema, decorators.model_serializer.values())
    return apply_model_validators(schema, decorators.model_validator.values())
def _callable_schema(self, function: Callable[..., Any]) -> core_schema.CallSchema:
    """
    Build a call schema for a Callable from its signature and type hints.

    Unannotated parameters are treated as `Any`. The return value gets a schema
    only when `validate_return` is enabled in the config and a return hint exists.

    TODO support functional validators once we support them in Config
    """
    sig = signature(function)
    type_hints = _typing_extra.get_function_type_hints(function)

    mode_lookup: dict[_ParameterKind, Literal['positional_only', 'positional_or_keyword', 'keyword_only']] = {
        Parameter.POSITIONAL_ONLY: 'positional_only',
        Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
        Parameter.KEYWORD_ONLY: 'keyword_only',
    }

    named_parameters: list[core_schema.ArgumentsParameter] = []
    star_args_schema: core_schema.CoreSchema | None = None
    star_kwargs_schema: core_schema.CoreSchema | None = None

    for name, param in sig.parameters.items():
        annotation = Any if param.annotation is sig.empty else type_hints[name]

        parameter_mode = mode_lookup.get(param.kind)
        if parameter_mode is not None:
            named_parameters.append(
                self._generate_parameter_schema(name, annotation, param.default, parameter_mode)
            )
        elif param.kind == Parameter.VAR_POSITIONAL:
            star_args_schema = self.generate_schema(annotation)
        else:
            assert param.kind == Parameter.VAR_KEYWORD, param.kind
            star_kwargs_schema = self.generate_schema(annotation)

    config_wrapper = self.config_wrapper
    return_schema: core_schema.CoreSchema | None = None
    if config_wrapper.validate_return:
        return_hint = type_hints.get('return')
        if return_hint is not None:
            return_schema = self.generate_schema(return_hint)

    arguments = core_schema.arguments_schema(
        named_parameters,
        var_args_schema=star_args_schema,
        var_kwargs_schema=star_kwargs_schema,
        populate_by_name=config_wrapper.populate_by_name,
    )
    return core_schema.call_schema(arguments, function, return_schema=return_schema)
def _unsubstituted_typevar_schema(self, typevar: typing.TypeVar) -> core_schema.CoreSchema:
    """
    Build the schema for a TypeVar that has not been substituted.

    The bound is used if present, then a union of the constraints, and
    otherwise an "any" schema.
    """
    assert isinstance(typevar, typing.TypeVar)

    bound = typevar.__bound__
    if bound:
        return self.generate_schema(bound)

    constraints = typevar.__constraints__
    if constraints:
        return self._union_schema(typing.Union[constraints])  # type: ignore

    return core_schema.AnySchema(type='any')
def _get_or_cache_recursive_ref(self, cls: type[Any]) -> tuple[str, core_schema.DefinitionReferenceSchema | None]:
    """
    Look up `cls` in the recursion cache, registering it on first sight.

    :return: `(ref, cached_reference_schema)` if the ref is already cached; otherwise a
        definition reference schema is stored for the ref and `(ref, None)` is returned
    """
    obj_ref = get_type_ref(cls)
    try:
        return obj_ref, self.recursion_cache[obj_ref]
    except KeyError:
        pass

    self.recursion_cache[obj_ref] = core_schema.definition_reference_schema(obj_ref)
    return obj_ref, None
# Dispatch table used by `apply_validators`: maps (validator mode, kind of info argument)
# to the `core_schema` helper that wraps a schema with the validator function.
# Each entry is called as `entry(function, schema)`; 'plain' validators replace the
# schema entirely, so their entries ignore the second argument.
_VALIDATOR_F_MATCH: Mapping[
    tuple[FieldValidatorModes, Literal['no-info', 'general', 'field']],
    Callable[[Callable[..., Any], core_schema.CoreSchema], core_schema.CoreSchema],
] = {
    # validators that take no info argument
    ('before', 'no-info'): core_schema.no_info_before_validator_function,
    ('after', 'no-info'): core_schema.no_info_after_validator_function,
    ('plain', 'no-info'): lambda func, _schema: core_schema.no_info_plain_validator_function(func),
    ('wrap', 'no-info'): core_schema.no_info_wrap_validator_function,
    # validators that take a general info argument
    ('before', 'general'): core_schema.general_before_validator_function,
    ('after', 'general'): core_schema.general_after_validator_function,
    ('plain', 'general'): lambda func, _schema: core_schema.general_plain_validator_function(func),
    ('wrap', 'general'): core_schema.general_wrap_validator_function,
    # validators that take a field info argument
    ('before', 'field'): core_schema.field_before_validator_function,
    ('after', 'field'): core_schema.field_after_validator_function,
    ('plain', 'field'): lambda func, _schema: core_schema.field_plain_validator_function(func),
    ('wrap', 'field'): core_schema.field_wrap_validator_function,
}
def apply_validators(
    schema: core_schema.CoreSchema,
    validators: Iterable[Decorator[RootValidatorDecoratorInfo]]
    | Iterable[Decorator[ValidatorDecoratorInfo]]
    | Iterable[Decorator[FieldValidatorDecoratorInfo]],
) -> core_schema.CoreSchema:
    """
    Wrap `schema` with each validator in turn and return the resulting schema.

    The wrapping helper is chosen from `_VALIDATOR_F_MATCH` based on the validator's
    mode and on whether (and which kind of) info argument its function takes.
    """
    for validator in validators:
        takes_info = inspect_validator(validator.func, validator.info.mode)
        val_type: Literal['no-info', 'general', 'field']
        if takes_info:
            is_field_level = isinstance(validator.info, (FieldValidatorDecoratorInfo, ValidatorDecoratorInfo))
            val_type = 'field' if is_field_level else 'general'
        else:
            val_type = 'no-info'

        wrap_with_validator = _VALIDATOR_F_MATCH[(validator.info.mode, val_type)]
        schema = wrap_with_validator(validator.func, schema)
    return schema
1146def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool:
1147 """
1148 In v1, if any of the validators for a field had `always=True`, the default value would be validated.
1150 This serves as an auxiliary function for re-implementing that logic, by looping over a provided
1151 collection of (v1-style) ValidatorDecoratorInfo's and checking if any of them have `always=True`.
1153 We should be able to drop this function and the associated logic calling it once we drop support
1154 for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent
1155 to the v1-validator `always` kwarg to `field_validator`.)
1156 """
1157 for validator in validators:
1158 if validator.info.always:
1159 return True
1160 return False
def apply_field_serializers(
    schema: core_schema.CoreSchema, serializers: list[Decorator[FieldSerializerDecoratorInfo]]
) -> core_schema.CoreSchema:
    """
    Attach a field serializer to `schema` (in place) and return it.

    Only the last serializer in the list is used; an empty list leaves the schema untouched.
    """
    if not serializers:
        return schema

    # use the last serializer to make it easy to override a serializer set on a parent model
    chosen = serializers[-1]
    is_field_serializer, info_arg = inspect_field_serializer(chosen.func, chosen.info.mode)

    if chosen.info.mode == 'wrap':
        build_ser_schema = core_schema.wrap_serializer_function_ser_schema
    else:
        assert chosen.info.mode == 'plain'
        build_ser_schema = core_schema.plain_serializer_function_ser_schema

    schema['serialization'] = build_ser_schema(
        chosen.func,
        is_field_serializer=is_field_serializer,
        info_arg=info_arg,
        json_return_type=chosen.info.json_return_type,
        when_used=chosen.info.when_used,
    )
    return schema
def apply_model_serializers(
    schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]]
) -> core_schema.CoreSchema:
    """
    Apply model serializers to a schema.

    Only the last serializer is used; it is stored under the schema's 'serialization'
    key (the schema is modified in place).

    :param schema: The schema to attach the serializer to
    :param serializers: Any iterable of model serializer decorators, possibly empty
    :return: The same schema object, unchanged if there are no serializers
    """
    # `serializers` is only promised to be an Iterable, and e.g. an exhausted iterator is
    # still truthy, so materialise it before checking whether there is anything to apply.
    candidates = list(serializers)
    if not candidates:
        return schema

    serializer = candidates[-1]
    info_arg = inspect_model_serializer(serializer.func, serializer.info.mode)
    if serializer.info.mode == 'wrap':
        ser_schema: core_schema.SerSchema = core_schema.wrap_serializer_function_ser_schema(
            serializer.func,
            info_arg=info_arg,
            json_return_type=serializer.info.json_return_type,
        )
    else:
        # plain
        ser_schema = core_schema.plain_serializer_function_ser_schema(
            serializer.func,
            info_arg=info_arg,
            json_return_type=serializer.info.json_return_type,
        )
    schema['serialization'] = ser_schema
    return schema
def apply_model_validators(
    schema: core_schema.CoreSchema, validators: Iterable[Decorator[ModelValidatorDecoratorInfo]]
) -> core_schema.CoreSchema:
    """
    Wrap `schema` with each model validator in turn and return the resulting schema.

    For every validator the `general_*` helper is used when its function takes an
    info argument and the `no_info_*` helper otherwise.
    """
    for validator in validators:
        takes_info = inspect_validator(validator.func, validator.info.mode)
        mode = validator.info.mode

        if mode == 'wrap':
            general = core_schema.general_wrap_validator_function
            no_info = core_schema.no_info_wrap_validator_function
        elif mode == 'before':
            general = core_schema.general_before_validator_function
            no_info = core_schema.no_info_before_validator_function
        else:
            assert mode == 'after'
            general = core_schema.general_after_validator_function
            no_info = core_schema.no_info_after_validator_function

        wrap_with_validator = general if takes_info else no_info
        schema = wrap_with_validator(function=validator.func, schema=schema)
    return schema
def apply_annotations(
    get_inner_schema: ModifyCoreSchemaWrapHandler,
    annotations: typing.Iterable[Any],
    definitions: dict[str, core_schema.CoreSchema],
) -> ModifyCoreSchemaWrapHandler:
    """
    Layer arguments from `Annotated` or from `FieldInfo` on top of a schema handler.

    Each annotation that is not `None` wraps the handler built so far, so the
    returned handler applies the annotations in the order given.
    """
    handler = get_inner_schema
    for annotation in annotations:
        if annotation is not None:
            handler = get_wrapped_inner_schema(handler, annotation, definitions)
    return handler
def get_wrapped_inner_schema(
    get_inner_schema: ModifyCoreSchemaWrapHandler, annotation: Any, definitions: dict[str, core_schema.CoreSchema]
) -> Callable[[Any], core_schema.CoreSchema]:
    """
    Return a schema handler that applies a single `annotation` on top of `get_inner_schema`.

    If the annotation provides `__get_pydantic_core_schema__` it is used to produce the
    schema; otherwise the source is passed straight to the inner handler.
    """

    def _passthrough(source: Any, handler: Any) -> core_schema.CoreSchema:
        return handler(source)

    metadata_get_schema: GetCoreSchemaFunction = (
        getattr(annotation, '__get_pydantic_core_schema__', None) or _passthrough
    )

    def _annotated_schema_handler(source: Any) -> core_schema.CoreSchema:
        produced = metadata_get_schema(source, get_inner_schema)
        produced = apply_single_annotation(produced, annotation, definitions)

        # Register the annotation's JSON schema hook, if it has one, on the schema metadata.
        js_function = _extract_get_pydantic_json_schema(annotation)
        if js_function is None:
            return produced
        CoreMetadataHandler(produced).metadata.setdefault('pydantic_js_functions', []).append(js_function)
        return produced

    return _annotated_schema_handler
def apply_single_annotation(
    schema: core_schema.CoreSchema, metadata: Any, definitions: dict[str, core_schema.CoreSchema]
) -> core_schema.CoreSchema:
    """
    Apply one piece of annotation metadata to `schema` and return the resulting schema.

    :param schema: The schema to update; it may be modified in place
    :param metadata: A single item from `Annotated[...]` metadata or a `FieldInfo`
    :param definitions: Passed through to `apply_discriminator` when a `FieldInfo`
        declares a discriminator
    :return: The updated schema, or `schema` unchanged for metadata that is not recognised
        or carries no non-`None` values
    :raises SchemaError: if the updated schema is rejected by `SchemaValidator`
    """
    if isinstance(metadata, GroupedMetadata):
        # Grouped metadata is applied item by item.
        for a in metadata:
            schema = apply_single_annotation(schema, a, definitions)
    elif isinstance(metadata, FieldInfo):
        # A FieldInfo contributes its own metadata items, an optional discriminator and a default.
        for field_metadata in metadata.metadata:
            schema = apply_single_annotation(schema, field_metadata, definitions)
        if metadata.discriminator is not None:
            schema = _discriminated_union.apply_discriminator(schema, metadata.discriminator, definitions)
        # TODO setting a default here needs to be tested
        return wrap_default(metadata, schema)

    # Turn the metadata object into a plain dict of values.
    if isinstance(metadata, PydanticGeneralMetadata):
        metadata_dict = metadata.__dict__
    elif isinstance(metadata, (BaseMetadata, PydanticMetadata)):
        metadata_dict = dataclasses.asdict(metadata)  # type: ignore[call-overload]
    elif isinstance(metadata, type) and issubclass(metadata, PydanticMetadata):
        # also support PydanticMetadata classes being used without initialisation,
        # e.g. `Annotated[int, Strict]` as well as `Annotated[int, Strict()]`
        metadata_dict = {k: v for k, v in vars(metadata).items() if not k.startswith('_')}
    else:
        # PEP 593: "If a library (or tool) encounters a typehint Annotated[T, x] and has no
        # special logic for metadata x, it should ignore it and simply treat the type as T."
        # Allow, but ignore, any unknown metadata.
        return schema

    # TODO we need a way to remove metadata which this line currently prevents
    metadata_dict = {k: v for k, v in metadata_dict.items() if v is not None}
    if not metadata_dict:
        return schema

    handler = CoreMetadataHandler(schema)
    # A schema may carry its own update function in its metadata; if so, it decides how the
    # values are applied and may return a replacement schema.
    update_schema_function = handler.metadata.get('pydantic_cs_update_function')
    if update_schema_function is not None:
        new_schema = update_schema_function(schema, **metadata_dict)
        if new_schema is not None:
            schema = new_schema
    else:
        if schema['type'] == 'nullable':
            # for nullable schemas, metadata is automatically applied to the inner schema
            # TODO need to do the same for lists, tuples and more
            schema['schema'].update(metadata_dict)
        else:
            schema.update(metadata_dict)  # type: ignore[typeddict-item]
        # Building a validator checks that the updated schema is still valid.
        # NOTE(review): nesting of this check under the `else` branch is reconstructed from a
        # de-indented source - confirm against the original file.
        try:
            SchemaValidator(schema)
        except SchemaError as e:
            # TODO: Generate an easier-to-understand ValueError here saying the field constraints are not enforced
            # The relevant test is: `tests.test_schema.test_unenforced_constraints_schema
            raise e
    return schema
def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema:
    """
    Wrap `schema` in a with-default schema if `field_info` declares a default.

    A `default_factory` takes precedence over a plain `default`; when neither is set
    the schema is returned unchanged.
    """
    if field_info.default_factory:
        default_kwargs = {'default_factory': field_info.default_factory}
    elif field_info.default is not Undefined:
        default_kwargs = {'default': field_info.default}
    else:
        return schema

    return core_schema.with_default_schema(
        schema, validate_default=field_info.validate_default, **default_kwargs
    )
def get_first_arg(type_: Any) -> Any:
    """
    Return the first type argument of a typing object (`List[int]` -> `int`), or `Any` if it has none.
    """
    type_args = get_args(type_)
    return type_args[0] if type_args else Any
1361def _extract_get_pydantic_json_schema(tp: Any) -> GetJsonSchemaFunction | None:
1362 """Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`"""
1363 js_modify_function = getattr(tp, '__get_pydantic_json_schema__', None)
1365 if js_modify_function is None and hasattr(tp, '__modify_schema__'):
1366 warnings.warn(
1367 'The __modify_schema__ method is deprecated, use __get_pydantic_json_schema__ instead',
1368 DeprecationWarning,
1369 )
1370 return lambda c, h: tp.__modify_schema__(h(c))
1372 return js_modify_function
# Typed shape of the dict returned by `_common_field`: a field's schema together with
# its alias, serialization and mutability options.
class _CommonField(TypedDict):
    # core schema for the field's value
    schema: core_schema.CoreSchema
    # a single alias, one path, or several paths - presumably the lookup location(s)
    # used during validation; confirm against pydantic-core
    validation_alias: str | list[str | int] | list[list[str | int]] | None
    # presumably the name used for the field when serializing
    serialization_alias: str | None
    # presumably whether the field is left out when serializing
    serialization_exclude: bool | None
    # presumably whether the field may be assigned to after creation
    frozen: bool | None
    # free-form metadata attached to the field
    metadata: Any
1384def _common_field(
1385 schema: core_schema.CoreSchema,
1386 *,
1387 validation_alias: str | list[str | int] | list[list[str | int]] | None = None,
1388 serialization_alias: str | None = None,
1389 serialization_exclude: bool | None = None,
1390 frozen: bool | None = None,
1391 metadata: Any = None,
1392) -> _CommonField:
1393 return {
1394 'schema': schema,
1395 'validation_alias': validation_alias,
1396 'serialization_alias': serialization_alias,
1397 'serialization_exclude': serialization_exclude,
1398 'frozen': frozen,
1399 'metadata': metadata,
1400 }
def generate_computed_field(d: dict[str, Decorator[ComputedFieldInfo]]) -> list[core_schema.ComputedField] | None:
    """
    Build one computed-field schema per decorator in `d`, in the dict's iteration order.

    Each schema is created from the decorator's `cls_var_name`, together with the
    `json_return_type` and `alias` declared on its info.
    """
    computed_fields = []
    for decorator in d.values():
        computed_fields.append(
            core_schema.computed_field(
                decorator.cls_var_name,
                json_return_type=decorator.info.json_return_type,
                alias=decorator.info.alias,
            )
        )
    return computed_fields