1"""Convert python types to pydantic-core schema."""
2
3from __future__ import annotations as _annotations
4
5import collections.abc
6import dataclasses
7import datetime
8import inspect
9import os
10import pathlib
11import re
12import sys
13import typing
14import warnings
15from collections.abc import Generator, Iterable, Iterator, Mapping
16from contextlib import contextmanager
17from copy import copy
18from decimal import Decimal
19from enum import Enum
20from fractions import Fraction
21from functools import partial
22from inspect import Parameter, _ParameterKind, signature
23from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
24from itertools import chain
25from operator import attrgetter
26from types import FunctionType, GenericAlias, LambdaType, MethodType
27from typing import (
28 TYPE_CHECKING,
29 Any,
30 Callable,
31 Final,
32 ForwardRef,
33 Literal,
34 TypeVar,
35 Union,
36 cast,
37 overload,
38)
39from uuid import UUID
40from zoneinfo import ZoneInfo
41
42import typing_extensions
43from pydantic_core import (
44 MISSING,
45 CoreSchema,
46 MultiHostUrl,
47 PydanticCustomError,
48 PydanticSerializationUnexpectedValue,
49 PydanticUndefined,
50 Url,
51 core_schema,
52 to_jsonable_python,
53)
54from typing_extensions import TypeAlias, TypeAliasType, get_args, get_origin, is_typeddict
55from typing_inspection import typing_objects
56from typing_inspection.introspection import AnnotationSource, get_literal_values, is_union_origin
57
58from ..aliases import AliasChoices, AliasPath
59from ..annotated_handlers import GetCoreSchemaHandler, GetJsonSchemaHandler
60from ..config import ConfigDict, JsonDict, JsonEncoder, JsonSchemaExtraCallable
61from ..errors import PydanticSchemaGenerationError, PydanticUndefinedAnnotation, PydanticUserError
62from ..functional_validators import AfterValidator, BeforeValidator, FieldValidatorModes, PlainValidator, WrapValidator
63from ..json_schema import JsonSchemaValue
64from ..version import version_short
65from ..warnings import (
66 ArbitraryTypeWarning,
67 PydanticDeprecatedSince20,
68 TypedDictExtraConfigWarning,
69 UnsupportedFieldAttributeWarning,
70)
71from . import _decorators, _discriminated_union, _known_annotated_metadata, _repr, _typing_extra
72from ._config import ConfigWrapper, ConfigWrapperStack
73from ._core_metadata import CoreMetadata, update_core_metadata
74from ._core_utils import (
75 get_ref,
76 get_type_ref,
77 is_list_like_schema_with_items_schema,
78)
79from ._decorators import (
80 Decorator,
81 DecoratorInfos,
82 FieldSerializerDecoratorInfo,
83 FieldValidatorDecoratorInfo,
84 ModelSerializerDecoratorInfo,
85 ModelValidatorDecoratorInfo,
86 RootValidatorDecoratorInfo,
87 ValidatorDecoratorInfo,
88 get_attribute_from_bases,
89 inspect_field_serializer,
90 inspect_model_serializer,
91 inspect_validator,
92)
93from ._docs_extraction import extract_docstrings_from_cls
94from ._fields import (
95 collect_dataclass_fields,
96 rebuild_dataclass_fields,
97 rebuild_model_fields,
98 takes_validated_data_argument,
99 update_field_from_config,
100)
101from ._forward_ref import PydanticRecursiveRef
102from ._generics import get_standard_typevars_map, replace_types
103from ._import_utils import import_cached_base_model, import_cached_field_info
104from ._mock_val_ser import MockCoreSchema
105from ._namespace_utils import NamespacesTuple, NsResolver
106from ._schema_gather import MissingDefinitionError, gather_schemas_for_cleaning
107from ._schema_generation_shared import CallbackGetCoreSchemaHandler
108from ._utils import lenient_issubclass, smart_deepcopy
109
110if TYPE_CHECKING:
111 from ..fields import ComputedFieldInfo, FieldInfo
112 from ..main import BaseModel
113 from ..types import Discriminator
114 from ._dataclasses import StandardDataclass
115 from ._schema_generation_shared import GetJsonSchemaFunction
116
# `True` when running on Python 3.12 or newer.
_SUPPORTS_TYPEDDICT = sys.version_info >= (3, 12)

# Decorator info types that target specific fields (each exposes a `fields` attribute),
# plus the matching `Decorator[...]` wrappers.
FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo]
FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo)
AnyFieldDecorator = Union[
    Decorator[ValidatorDecoratorInfo],
    Decorator[FieldValidatorDecoratorInfo],
    Decorator[FieldSerializerDecoratorInfo],
]

# Callable/handler aliases used in signatures of schema-generation hooks.
ModifyCoreSchemaWrapHandler: TypeAlias = GetCoreSchemaHandler
GetCoreSchemaFunction: TypeAlias = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema]
ParametersCallback: TypeAlias = "Callable[[int, str, Any], Literal['skip'] | None]"

# Groups of equivalent types/origins (`typing` aliases, builtins and ABCs), one list per kind
# of container or value. Presumably consulted by the type dispatch logic outside this excerpt.
TUPLE_TYPES: list[type] = [typing.Tuple, tuple]  # noqa: UP006
LIST_TYPES: list[type] = [typing.List, list, collections.abc.MutableSequence]  # noqa: UP006
SET_TYPES: list[type] = [typing.Set, set, collections.abc.MutableSet]  # noqa: UP006
FROZEN_SET_TYPES: list[type] = [typing.FrozenSet, frozenset, collections.abc.Set]  # noqa: UP006
DICT_TYPES: list[type] = [typing.Dict, dict]  # noqa: UP006
IP_TYPES: list[type] = [IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network]
SEQUENCE_TYPES: list[type] = [typing.Sequence, collections.abc.Sequence]
ITERABLE_TYPES: list[type] = [typing.Iterable, collections.abc.Iterable, typing.Generator, collections.abc.Generator]
TYPE_TYPES: list[type] = [typing.Type, type]  # noqa: UP006
PATTERN_TYPES: list[type] = [typing.Pattern, re.Pattern]
PATH_TYPES: list[type] = [
    os.PathLike,
    pathlib.Path,
    pathlib.PurePath,
    pathlib.PosixPath,
    pathlib.PurePosixPath,
    pathlib.PureWindowsPath,
]
MAPPING_TYPES = [
    typing.Mapping,
    typing.MutableMapping,
    collections.abc.Mapping,
    collections.abc.MutableMapping,
    collections.OrderedDict,
    typing_extensions.OrderedDict,
    typing.DefaultDict,  # noqa: UP006
    collections.defaultdict,
]
COUNTER_TYPES = [collections.Counter, typing.Counter]
DEQUE_TYPES: list[type] = [collections.deque, typing.Deque]  # noqa: UP006

# Note: This does not play very well with type checkers. For example,
# `a: LambdaType = lambda x: x` will raise a type error by Pyright.
ValidateCallSupportedTypes = Union[
    LambdaType,
    FunctionType,
    MethodType,
    partial,
]

# The members of `ValidateCallSupportedTypes` as a plain tuple.
VALIDATE_CALL_SUPPORTED_TYPES = get_args(ValidateCallSupportedTypes)
UNSUPPORTED_STANDALONE_FIELDINFO_ATTRIBUTES: list[tuple[str, Any]] = [
    ('alias', None),
    ('validation_alias', None),
    ('serialization_alias', None),
    # will be set if any alias is set, so disable it to avoid double warnings:
    # 'alias_priority',
    ('default', PydanticUndefined),
    ('default_factory', None),
    ('exclude', None),
    ('deprecated', None),
    ('repr', True),
    ('validate_default', None),
    ('frozen', None),
    ('init', None),
    ('init_var', None),
    ('kw_only', None),
]
"""`FieldInfo` attributes (and their default value) that can't be used outside of a model (e.g. in a type adapter or a PEP 695 type alias)."""

# Maps each field validator mode name to the validator class implementing it.
_mode_to_validator: dict[
    FieldValidatorModes, type[BeforeValidator | AfterValidator | PlainValidator | WrapValidator]
] = {'before': BeforeValidator, 'after': AfterValidator, 'plain': PlainValidator, 'wrap': WrapValidator}
194
195
def check_validator_fields_against_field_name(
    info: FieldDecoratorInfo,
    field: str,
) -> bool:
    """Tell whether a field decorator applies to the given field.

    Args:
        info: The decorator info whose `fields` are inspected.
        field: The name of the field to look up.

    Returns:
        `True` when the decorator targets every field (`'*'`) or explicitly
        lists `field`, `False` otherwise.
    """
    targeted_fields = info.fields
    if '*' in targeted_fields:
        # wildcard: the decorator applies to all fields
        return True
    return field in targeted_fields
211
212
def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None:
    """Ensure every field named by the given decorators is one of `fields`.

    Decorators that target `*` or that were declared with `check_fields=False`
    are not checked.

    Args:
        decorators: The field decorators to verify.
        fields: The names of the fields that actually exist.

    Raises:
        PydanticUserError: If a decorator names a field that is not in `fields`.
    """
    known_fields = set(fields)
    for decorator in decorators:
        dec_info = decorator.info
        # wildcard decorators and decorators that opted out are never checked
        if '*' in dec_info.fields or dec_info.check_fields is False:
            continue
        if any(name not in known_fields for name in dec_info.fields):
            raise PydanticUserError(
                f'Decorators defined with incorrect fields: {decorator.cls_ref}.{decorator.cls_var_name}'
                " (use check_fields=False if you're inheriting from the model and intended this)",
                code='decorator-missing-field',
            )
238
239
def filter_field_decorator_info_by_field(
    validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str
) -> list[Decorator[FieldDecoratorInfoType]]:
    """Return, in their original order, the decorators that apply to `field`.

    A decorator applies when `check_validator_fields_against_field_name()` accepts
    its `info` for the given field name.
    """
    matching: list[Decorator[FieldDecoratorInfoType]] = []
    for decorator in validator_functions:
        if check_validator_fields_against_field_name(decorator.info, field):
            matching.append(decorator)
    return matching
244
245
def apply_each_item_validators(
    schema: core_schema.CoreSchema,
    each_item_validators: list[Decorator[ValidatorDecoratorInfo]],
) -> core_schema.CoreSchema:
    """Push V1-style `each_item=True` validators down onto the items of a container schema.

    The given schema is modified in place and returned.

    Args:
        schema: The (possibly nullable) container schema to update.
        each_item_validators: The validators to apply to each item.

    Returns:
        The same `schema` object, with the validators applied to its item/value schema.

    Raises:
        TypeError: If the schema is not nullable, tuple, list-like or dict.
    """
    # This V1 compatibility shim should eventually be removed

    # nothing to push down: return the schema untouched
    if not each_item_validators:
        return schema

    # push down any `each_item=True` validators
    # note that this won't work for any Annotated types that get wrapped by a function validator
    # but that's okay because that didn't exist in V1
    schema_type = schema['type']
    if schema_type == 'nullable':
        # look through the nullable wrapper and handle the wrapped schema instead
        schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators)
    elif schema_type == 'tuple':
        variadic_item_index = schema.get('variadic_item_index')
        # tuples without a variadic item are left unchanged
        if variadic_item_index is not None:
            items = schema['items_schema']
            items[variadic_item_index] = apply_validators(items[variadic_item_index], each_item_validators)
    elif is_list_like_schema_with_items_schema(schema):
        schema['items_schema'] = apply_validators(
            schema.get('items_schema', core_schema.any_schema()), each_item_validators
        )
    elif schema_type == 'dict':
        schema['values_schema'] = apply_validators(
            schema.get('values_schema', core_schema.any_schema()), each_item_validators
        )
    else:
        raise TypeError(
            f'`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema["type"]}'
        )
    return schema
279
280
def _extract_json_schema_info_from_field_info(
    info: FieldInfo | ComputedFieldInfo,
) -> tuple[JsonDict | None, JsonDict | JsonSchemaExtraCallable | None]:
    """Collect the JSON schema related settings of a field.

    Args:
        info: The field (or computed field) info to read from.

    Returns:
        A two-tuple: the JSON schema updates (`title`, `description`, `deprecated`,
        `examples` — only the ones that are set, or `None` if none is), and the
        field's `json_schema_extra`.
    """
    candidates = (
        ('title', info.title),
        ('description', info.description),
        # an empty deprecation message still marks the field as deprecated
        ('deprecated', bool(info.deprecated) or info.deprecated == '' or None),
        ('examples', to_jsonable_python(info.examples)),
    )
    json_schema_updates = {key: value for key, value in candidates if value is not None}
    return (json_schema_updates or None, info.json_schema_extra)
292
293
# Mapping of a type to the encoder function used to serialize instances of it
# (the shape accepted by `_add_custom_serialization_from_json_encoders`).
JsonEncoders = dict[type[Any], JsonEncoder]
295
296
def _add_custom_serialization_from_json_encoders(
    json_encoders: JsonEncoders | None, tp: Any, schema: CoreSchema
) -> CoreSchema:
    """Attach the first `json_encoders` entry matching `tp` (or one of its bases) to the schema.

    The schema is returned unchanged when there are no encoders, when it already has
    a `'serialization'` key, or when no encoder matches. Otherwise a deprecation
    warning is emitted and the encoder is set as a JSON-only plain serializer.

    Args:
        json_encoders: A dictionary of types and their encoder functions.
        tp: The type to check for a matching encoder.
        schema: The schema to add the encoder to (modified in place).

    Returns:
        The given `schema` object.
    """
    if not json_encoders or 'serialization' in schema:
        return schema

    # Check the class type and its superclasses for a matching encoder
    # Decimal.__class__.__mro__ (and probably other cases) doesn't include Decimal itself
    # if the type is a GenericAlias (e.g. from list[int]) we need to use __class__ instead of .__mro__
    mro = getattr(tp, '__mro__', tp.__class__.__mro__)
    # the last MRO entry is skipped
    candidates = (tp, *mro[:-1])
    encoder = next((found for found in map(json_encoders.get, candidates) if found is not None), None)
    if encoder is None:
        return schema

    warnings.warn(
        f'`json_encoders` is deprecated. See https://docs.pydantic.dev/{version_short()}/concepts/serialization/#custom-serializers for alternatives',
        PydanticDeprecatedSince20,
    )

    # TODO: in theory we should check that the schema accepts a serialization key
    schema['serialization'] = core_schema.plain_serializer_function_ser_schema(encoder, when_used='json')
    return schema
329
330
class InvalidSchemaError(Exception):
    """Exception type signalling that a core schema is invalid."""
333
334
class GenerateSchema:
    """Generate core schema for a Pydantic model, dataclass and types like `str`, `datetime`, ... ."""

    # Instance attributes; all of them are assigned in `__init__`.
    __slots__ = (
        '_config_wrapper_stack',
        '_ns_resolver',
        '_typevars_map',
        'field_name_stack',
        'model_type_stack',
        'defs',
    )
346
347 def __init__(
348 self,
349 config_wrapper: ConfigWrapper,
350 ns_resolver: NsResolver | None = None,
351 typevars_map: Mapping[TypeVar, Any] | None = None,
352 ) -> None:
353 # we need a stack for recursing into nested models
354 self._config_wrapper_stack = ConfigWrapperStack(config_wrapper)
355 self._ns_resolver = ns_resolver or NsResolver()
356 self._typevars_map = typevars_map
357 self.field_name_stack = _FieldNameStack()
358 self.model_type_stack = _ModelTypeStack()
359 self.defs = _Definitions()
360
361 def __init_subclass__(cls) -> None:
362 super().__init_subclass__()
363 warnings.warn(
364 'Subclassing `GenerateSchema` is not supported. The API is highly subject to change in minor versions.',
365 UserWarning,
366 stacklevel=2,
367 )
368
369 @property
370 def _config_wrapper(self) -> ConfigWrapper:
371 return self._config_wrapper_stack.tail
372
373 @property
374 def _types_namespace(self) -> NamespacesTuple:
375 return self._ns_resolver.types_namespace
376
377 @property
378 def _arbitrary_types(self) -> bool:
379 return self._config_wrapper.arbitrary_types_allowed
380
381 # the following methods can be overridden but should be considered
382 # unstable / private APIs
383 def _list_schema(self, items_type: Any) -> CoreSchema:
384 return core_schema.list_schema(self.generate_schema(items_type))
385
386 def _dict_schema(self, keys_type: Any, values_type: Any) -> CoreSchema:
387 return core_schema.dict_schema(self.generate_schema(keys_type), self.generate_schema(values_type))
388
389 def _set_schema(self, items_type: Any) -> CoreSchema:
390 return core_schema.set_schema(self.generate_schema(items_type))
391
392 def _frozenset_schema(self, items_type: Any) -> CoreSchema:
393 return core_schema.frozenset_schema(self.generate_schema(items_type))
394
395 def _enum_schema(self, enum_type: type[Enum]) -> CoreSchema:
396 cases: list[Any] = list(enum_type.__members__.values())
397
398 enum_ref = get_type_ref(enum_type)
399 description = None if not enum_type.__doc__ else inspect.cleandoc(enum_type.__doc__)
400 if (
401 description == 'An enumeration.'
402 ): # This is the default value provided by enum.EnumMeta.__new__; don't use it
403 description = None
404 js_updates = {'title': enum_type.__name__, 'description': description}
405 js_updates = {k: v for k, v in js_updates.items() if v is not None}
406
407 sub_type: Literal['str', 'int', 'float'] | None = None
408 if issubclass(enum_type, int):
409 sub_type = 'int'
410 value_ser_type: core_schema.SerSchema = core_schema.simple_ser_schema('int')
411 elif issubclass(enum_type, str):
412 # this handles `StrEnum` (3.11 only), and also `Foobar(str, Enum)`
413 sub_type = 'str'
414 value_ser_type = core_schema.simple_ser_schema('str')
415 elif issubclass(enum_type, float):
416 sub_type = 'float'
417 value_ser_type = core_schema.simple_ser_schema('float')
418 else:
419 # TODO this is an ugly hack, how do we trigger an Any schema for serialization?
420 value_ser_type = core_schema.plain_serializer_function_ser_schema(lambda x: x)
421
422 if cases:
423
424 def get_json_schema(schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
425 json_schema = handler(schema)
426 original_schema = handler.resolve_ref_schema(json_schema)
427 original_schema.update(js_updates)
428 return json_schema
429
430 # we don't want to add the missing to the schema if it's the default one
431 default_missing = getattr(enum_type._missing_, '__func__', None) is Enum._missing_.__func__ # pyright: ignore[reportFunctionMemberAccess]
432 enum_schema = core_schema.enum_schema(
433 enum_type,
434 cases,
435 sub_type=sub_type,
436 missing=None if default_missing else enum_type._missing_,
437 ref=enum_ref,
438 metadata={'pydantic_js_functions': [get_json_schema]},
439 )
440
441 if self._config_wrapper.use_enum_values:
442 enum_schema = core_schema.no_info_after_validator_function(
443 attrgetter('value'), enum_schema, serialization=value_ser_type
444 )
445
446 return enum_schema
447
448 else:
449
450 def get_json_schema_no_cases(_, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
451 json_schema = handler(core_schema.enum_schema(enum_type, cases, sub_type=sub_type, ref=enum_ref))
452 original_schema = handler.resolve_ref_schema(json_schema)
453 original_schema.update(js_updates)
454 return json_schema
455
456 # Use an isinstance check for enums with no cases.
457 # The most important use case for this is creating TypeVar bounds for generics that should
458 # be restricted to enums. This is more consistent than it might seem at first, since you can only
459 # subclass enum.Enum (or subclasses of enum.Enum) if all parent classes have no cases.
460 # We use the get_json_schema function when an Enum subclass has been declared with no cases
461 # so that we can still generate a valid json schema.
462 return core_schema.is_instance_schema(
463 enum_type,
464 metadata={'pydantic_js_functions': [get_json_schema_no_cases]},
465 )
466
467 def _ip_schema(self, tp: Any) -> CoreSchema:
468 from ._validators import IP_VALIDATOR_LOOKUP, IpType
469
470 ip_type_json_schema_format: dict[type[IpType], str] = {
471 IPv4Address: 'ipv4',
472 IPv4Network: 'ipv4network',
473 IPv4Interface: 'ipv4interface',
474 IPv6Address: 'ipv6',
475 IPv6Network: 'ipv6network',
476 IPv6Interface: 'ipv6interface',
477 }
478
479 def ser_ip(ip: Any, info: core_schema.SerializationInfo) -> str | IpType:
480 if not isinstance(ip, (tp, str)):
481 raise PydanticSerializationUnexpectedValue(
482 f"Expected `{tp}` but got `{type(ip)}` with value `'{ip}'` - serialized value may not be as expected."
483 )
484 if info.mode == 'python':
485 return ip
486 return str(ip)
487
488 return core_schema.lax_or_strict_schema(
489 lax_schema=core_schema.no_info_plain_validator_function(IP_VALIDATOR_LOOKUP[tp]),
490 strict_schema=core_schema.json_or_python_schema(
491 json_schema=core_schema.no_info_after_validator_function(tp, core_schema.str_schema()),
492 python_schema=core_schema.is_instance_schema(tp),
493 ),
494 serialization=core_schema.plain_serializer_function_ser_schema(ser_ip, info_arg=True, when_used='always'),
495 metadata={
496 'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': ip_type_json_schema_format[tp]}]
497 },
498 )
499
500 def _path_schema(self, tp: Any, path_type: Any) -> CoreSchema:
501 if tp is os.PathLike and (path_type not in {str, bytes} and not typing_objects.is_any(path_type)):
502 raise PydanticUserError(
503 '`os.PathLike` can only be used with `str`, `bytes` or `Any`', code='schema-for-unknown-type'
504 )
505
506 path_constructor = pathlib.PurePath if tp is os.PathLike else tp
507 strict_inner_schema = (
508 core_schema.bytes_schema(strict=True) if (path_type is bytes) else core_schema.str_schema(strict=True)
509 )
510 lax_inner_schema = core_schema.bytes_schema() if (path_type is bytes) else core_schema.str_schema()
511
512 def path_validator(input_value: str | bytes) -> os.PathLike[Any]: # type: ignore
513 try:
514 if path_type is bytes:
515 if isinstance(input_value, bytes):
516 try:
517 input_value = input_value.decode()
518 except UnicodeDecodeError as e:
519 raise PydanticCustomError('bytes_type', 'Input must be valid bytes') from e
520 else:
521 raise PydanticCustomError('bytes_type', 'Input must be bytes')
522 elif not isinstance(input_value, str):
523 raise PydanticCustomError('path_type', 'Input is not a valid path')
524
525 return path_constructor(input_value) # type: ignore
526 except TypeError as e:
527 raise PydanticCustomError('path_type', 'Input is not a valid path') from e
528
529 def ser_path(path: Any, info: core_schema.SerializationInfo) -> str | os.PathLike[Any]:
530 if not isinstance(path, (tp, str)):
531 raise PydanticSerializationUnexpectedValue(
532 f"Expected `{tp}` but got `{type(path)}` with value `'{path}'` - serialized value may not be as expected."
533 )
534 if info.mode == 'python':
535 return path
536 return str(path)
537
538 instance_schema = core_schema.json_or_python_schema(
539 json_schema=core_schema.no_info_after_validator_function(path_validator, lax_inner_schema),
540 python_schema=core_schema.is_instance_schema(tp),
541 )
542
543 schema = core_schema.lax_or_strict_schema(
544 lax_schema=core_schema.union_schema(
545 [
546 instance_schema,
547 core_schema.no_info_after_validator_function(path_validator, strict_inner_schema),
548 ],
549 custom_error_type='path_type',
550 custom_error_message=f'Input is not a valid path for {tp}',
551 ),
552 strict_schema=instance_schema,
553 serialization=core_schema.plain_serializer_function_ser_schema(ser_path, info_arg=True, when_used='always'),
554 metadata={'pydantic_js_functions': [lambda source, handler: {**handler(source), 'format': 'path'}]},
555 )
556 return schema
557
558 def _deque_schema(self, items_type: Any) -> CoreSchema:
559 from ._serializers import serialize_sequence_via_list
560 from ._validators import deque_validator
561
562 item_type_schema = self.generate_schema(items_type)
563
564 # we have to use a lax list schema here, because we need to validate the deque's
565 # items via a list schema, but it's ok if the deque itself is not a list
566 list_schema = core_schema.list_schema(item_type_schema, strict=False)
567
568 check_instance = core_schema.json_or_python_schema(
569 json_schema=list_schema,
570 python_schema=core_schema.is_instance_schema(collections.deque, cls_repr='Deque'),
571 )
572
573 lax_schema = core_schema.no_info_wrap_validator_function(deque_validator, list_schema)
574
575 return core_schema.lax_or_strict_schema(
576 lax_schema=lax_schema,
577 strict_schema=core_schema.chain_schema([check_instance, lax_schema]),
578 serialization=core_schema.wrap_serializer_function_ser_schema(
579 serialize_sequence_via_list, schema=item_type_schema, info_arg=True
580 ),
581 )
582
583 def _mapping_schema(self, tp: Any, keys_type: Any, values_type: Any) -> CoreSchema:
584 from ._validators import MAPPING_ORIGIN_MAP, defaultdict_validator, get_defaultdict_default_default_factory
585
586 mapped_origin = MAPPING_ORIGIN_MAP[tp]
587 keys_schema = self.generate_schema(keys_type)
588 with warnings.catch_warnings():
589 # We kind of abused `Field()` default factories to be able to specify
590 # the `defaultdict`'s `default_factory`. As a consequence, we get warnings
591 # as normally `FieldInfo.default_factory` is unsupported in the context where
592 # `Field()` is used and our only solution is to ignore them (note that this might
593 # wrongfully ignore valid warnings, e.g. if the `value_type` is a PEP 695 type alias
594 # with unsupported metadata).
595 warnings.simplefilter('ignore', category=UnsupportedFieldAttributeWarning)
596 values_schema = self.generate_schema(values_type)
597 dict_schema = core_schema.dict_schema(keys_schema, values_schema, strict=False)
598
599 if mapped_origin is dict:
600 schema = dict_schema
601 else:
602 check_instance = core_schema.json_or_python_schema(
603 json_schema=dict_schema,
604 python_schema=core_schema.is_instance_schema(mapped_origin),
605 )
606
607 if tp is collections.defaultdict:
608 default_default_factory = get_defaultdict_default_default_factory(values_type)
609 coerce_instance_wrap = partial(
610 core_schema.no_info_wrap_validator_function,
611 partial(defaultdict_validator, default_default_factory=default_default_factory),
612 )
613 else:
614 coerce_instance_wrap = partial(core_schema.no_info_after_validator_function, mapped_origin)
615
616 lax_schema = coerce_instance_wrap(dict_schema)
617 strict_schema = core_schema.chain_schema([check_instance, lax_schema])
618
619 schema = core_schema.lax_or_strict_schema(
620 lax_schema=lax_schema,
621 strict_schema=strict_schema,
622 serialization=core_schema.wrap_serializer_function_ser_schema(
623 lambda v, h: h(v), schema=dict_schema, info_arg=False
624 ),
625 )
626
627 return schema
628
629 def _fraction_schema(self) -> CoreSchema:
630 """Support for [`fractions.Fraction`][fractions.Fraction]."""
631 from ._validators import fraction_validator
632
633 # TODO: note, this is a fairly common pattern, re lax / strict for attempted type coercion,
634 # can we use a helper function to reduce boilerplate?
635 return core_schema.lax_or_strict_schema(
636 lax_schema=core_schema.no_info_plain_validator_function(fraction_validator),
637 strict_schema=core_schema.json_or_python_schema(
638 json_schema=core_schema.no_info_plain_validator_function(fraction_validator),
639 python_schema=core_schema.is_instance_schema(Fraction),
640 ),
641 # use str serialization to guarantee round trip behavior
642 serialization=core_schema.to_string_ser_schema(when_used='always'),
643 metadata={'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'fraction'}]},
644 )
645
646 def _arbitrary_type_schema(self, tp: Any) -> CoreSchema:
647 if not isinstance(tp, type):
648 warnings.warn(
649 f'{tp!r} is not a Python type (it may be an instance of an object),'
650 ' Pydantic will allow any object with no validation since we cannot even'
651 ' enforce that the input is an instance of the given type.'
652 ' To get rid of this error wrap the type with `pydantic.SkipValidation`.',
653 ArbitraryTypeWarning,
654 )
655 return core_schema.any_schema()
656 return core_schema.is_instance_schema(tp)
657
658 def _unknown_type_schema(self, obj: Any) -> CoreSchema:
659 raise PydanticSchemaGenerationError(
660 f'Unable to generate pydantic-core schema for {obj!r}. '
661 'Set `arbitrary_types_allowed=True` in the model_config to ignore this error'
662 ' or implement `__get_pydantic_core_schema__` on your type to fully support it.'
663 '\n\nIf you got this error by calling handler(<some type>) within'
664 ' `__get_pydantic_core_schema__` then you likely need to call'
665 ' `handler.generate_schema(<some type>)` since we do not call'
666 ' `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.'
667 )
668
669 def _apply_discriminator_to_union(
670 self, schema: CoreSchema, discriminator: str | Discriminator | None
671 ) -> CoreSchema:
672 if discriminator is None:
673 return schema
674 try:
675 return _discriminated_union.apply_discriminator(
676 schema,
677 discriminator,
678 self.defs._definitions,
679 )
680 except _discriminated_union.MissingDefinitionForUnionRef:
681 # defer until defs are resolved
682 _discriminated_union.set_discriminator_in_metadata(
683 schema,
684 discriminator,
685 )
686 return schema
687
688 def clean_schema(self, schema: CoreSchema) -> CoreSchema:
689 return self.defs.finalize_schema(schema)
690
691 def _add_js_function(self, metadata_schema: CoreSchema, js_function: Callable[..., Any]) -> None:
692 metadata = metadata_schema.get('metadata', {})
693 pydantic_js_functions = metadata.setdefault('pydantic_js_functions', [])
694 # because of how we generate core schemas for nested generic models
695 # we can end up adding `BaseModel.__get_pydantic_json_schema__` multiple times
696 # this check may fail to catch duplicates if the function is a `functools.partial`
697 # or something like that, but if it does it'll fail by inserting the duplicate
698 if js_function not in pydantic_js_functions:
699 pydantic_js_functions.append(js_function)
700 metadata_schema['metadata'] = metadata
701
702 def generate_schema(
703 self,
704 obj: Any,
705 ) -> core_schema.CoreSchema:
706 """Generate core schema.
707
708 Args:
709 obj: The object to generate core schema for.
710
711 Returns:
712 The generated core schema.
713
714 Raises:
715 PydanticUndefinedAnnotation:
716 If it is not possible to evaluate forward reference.
717 PydanticSchemaGenerationError:
718 If it is not possible to generate pydantic-core schema.
719 TypeError:
720 - If `alias_generator` returns a disallowed type (must be str, AliasPath or AliasChoices).
721 - If V1 style validator with `each_item=True` applied on a wrong field.
722 PydanticUserError:
723 - If `typing.TypedDict` is used instead of `typing_extensions.TypedDict` on Python < 3.12.
724 - If `__modify_schema__` method is used instead of `__get_pydantic_json_schema__`.
725 """
726 schema = self._generate_schema_from_get_schema_method(obj, obj)
727
728 if schema is None:
729 schema = self._generate_schema_inner(obj)
730
731 metadata_js_function = _extract_get_pydantic_json_schema(obj)
732 if metadata_js_function is not None:
733 metadata_schema = resolve_original_schema(schema, self.defs)
734 if metadata_schema:
735 self._add_js_function(metadata_schema, metadata_js_function)
736
737 schema = _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, obj, schema)
738
739 return schema
740
741 def _model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema:
742 """Generate schema for a Pydantic model."""
743 BaseModel_ = import_cached_base_model()
744
745 with self.defs.get_schema_or_ref(cls) as (model_ref, maybe_schema):
746 if maybe_schema is not None:
747 return maybe_schema
748
749 schema = cls.__dict__.get('__pydantic_core_schema__')
750 if schema is not None and not isinstance(schema, MockCoreSchema):
751 if schema['type'] == 'definitions':
752 schema = self.defs.unpack_definitions(schema)
753 ref = get_ref(schema)
754 if ref:
755 return self.defs.create_definition_reference_schema(schema)
756 else:
757 return schema
758
759 config_wrapper = ConfigWrapper(cls.model_config, check=False)
760
761 with self._config_wrapper_stack.push(config_wrapper), self._ns_resolver.push(cls):
762 core_config = self._config_wrapper.core_config(title=cls.__name__)
763
764 if cls.__pydantic_fields_complete__ or cls is BaseModel_:
765 fields = getattr(cls, '__pydantic_fields__', {})
766 else:
767 if not hasattr(cls, '__pydantic_fields__'):
768 # This happens when we have a loop in the schema generation:
769 # class Base[T](BaseModel):
770 # t: T
771 #
772 # class Other(BaseModel):
773 # b: 'Base[Other]'
774 # When we build fields for `Other`, we evaluate the forward annotation.
775 # At this point, `Other` doesn't have the model fields set. We create
776 # `Base[Other]`; model fields are successfully built, and we try to generate
777 # a schema for `t: Other`. As `Other.__pydantic_fields__` aren't set, we abort.
778 raise PydanticUndefinedAnnotation(
779 name=cls.__name__,
780 message=f'Class {cls.__name__!r} is not defined',
781 )
782 try:
783 fields = rebuild_model_fields(
784 cls,
785 config_wrapper=self._config_wrapper,
786 ns_resolver=self._ns_resolver,
787 typevars_map=self._typevars_map or {},
788 )
789 except NameError as e:
790 raise PydanticUndefinedAnnotation.from_name_error(e) from e
791
792 decorators = cls.__pydantic_decorators__
793 computed_fields = decorators.computed_fields
794 check_decorator_fields_exist(
795 chain(
796 decorators.field_validators.values(),
797 decorators.field_serializers.values(),
798 decorators.validators.values(),
799 ),
800 {*fields.keys(), *computed_fields.keys()},
801 )
802
803 model_validators = decorators.model_validators.values()
804
805 extras_schema = None
806 extras_keys_schema = None
807 if core_config.get('extra_fields_behavior') == 'allow':
808 assert cls.__mro__[0] is cls
809 assert cls.__mro__[-1] is object
810 for candidate_cls in cls.__mro__[:-1]:
811 extras_annotation = getattr(candidate_cls, '__annotations__', {}).get(
812 '__pydantic_extra__', None
813 )
814 if extras_annotation is not None:
815 if isinstance(extras_annotation, str):
816 extras_annotation = _typing_extra.eval_type_backport(
817 _typing_extra._make_forward_ref(
818 extras_annotation, is_argument=False, is_class=True
819 ),
820 *self._types_namespace,
821 )
822 tp = get_origin(extras_annotation)
823 if tp not in DICT_TYPES:
824 raise PydanticSchemaGenerationError(
825 'The type annotation for `__pydantic_extra__` must be `dict[str, ...]`'
826 )
827 extra_keys_type, extra_items_type = self._get_args_resolving_forward_refs(
828 extras_annotation,
829 required=True,
830 )
831 if extra_keys_type is not str:
832 extras_keys_schema = self.generate_schema(extra_keys_type)
833 if not typing_objects.is_any(extra_items_type):
834 extras_schema = self.generate_schema(extra_items_type)
835 if extras_keys_schema is not None or extras_schema is not None:
836 break
837
838 generic_origin: type[BaseModel] | None = getattr(cls, '__pydantic_generic_metadata__', {}).get('origin')
839
840 if cls.__pydantic_root_model__:
841 # FIXME: should the common field metadata be used here?
842 inner_schema, _ = self._common_field_schema('root', fields['root'], decorators)
843 inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
844 model_schema = core_schema.model_schema(
845 cls,
846 inner_schema,
847 generic_origin=generic_origin,
848 custom_init=getattr(cls, '__pydantic_custom_init__', None),
849 root_model=True,
850 post_init=getattr(cls, '__pydantic_post_init__', None),
851 config=core_config,
852 ref=model_ref,
853 )
854 else:
855 fields_schema: core_schema.CoreSchema = core_schema.model_fields_schema(
856 {k: self._generate_md_field_schema(k, v, decorators) for k, v in fields.items()},
857 computed_fields=[
858 self._computed_field_schema(d, decorators.field_serializers)
859 for d in computed_fields.values()
860 ],
861 extras_schema=extras_schema,
862 extras_keys_schema=extras_keys_schema,
863 model_name=cls.__name__,
864 )
865 inner_schema = apply_validators(fields_schema, decorators.root_validators.values())
866 inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
867
868 model_schema = core_schema.model_schema(
869 cls,
870 inner_schema,
871 generic_origin=generic_origin,
872 custom_init=getattr(cls, '__pydantic_custom_init__', None),
873 root_model=False,
874 post_init=getattr(cls, '__pydantic_post_init__', None),
875 config=core_config,
876 ref=model_ref,
877 )
878
879 schema = self._apply_model_serializers(model_schema, decorators.model_serializers.values())
880 schema = apply_model_validators(schema, model_validators, 'outer')
881 return self.defs.create_definition_reference_schema(schema)
882
883 def _resolve_self_type(self, obj: Any) -> Any:
884 obj = self.model_type_stack.get()
885 if obj is None:
886 raise PydanticUserError('`typing.Self` is invalid in this context', code='invalid-self-type')
887 return obj
888
889 def _generate_schema_from_get_schema_method(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
890 BaseModel_ = import_cached_base_model()
891
892 get_schema = getattr(obj, '__get_pydantic_core_schema__', None)
893 is_base_model_get_schema = (
894 getattr(get_schema, '__func__', None) is BaseModel_.__get_pydantic_core_schema__.__func__ # pyright: ignore[reportFunctionMemberAccess]
895 )
896
897 if (
898 get_schema is not None
899 # BaseModel.__get_pydantic_core_schema__ is defined for backwards compatibility,
900 # to allow existing code to call `super().__get_pydantic_core_schema__` in Pydantic
901 # model that overrides `__get_pydantic_core_schema__`. However, it raises a deprecation
902 # warning stating that the method will be removed, and during the core schema gen we actually
903 # don't call the method:
904 and not is_base_model_get_schema
905 ):
906 # Some referenceable types might have a `__get_pydantic_core_schema__` method
907 # defined on it by users (e.g. on a dataclass). This generally doesn't play well
908 # as these types are already recognized by the `GenerateSchema` class and isn't ideal
909 # as we might end up calling `get_schema_or_ref` (expensive) on types that are actually
910 # not referenceable:
911 with self.defs.get_schema_or_ref(obj) as (_, maybe_schema):
912 if maybe_schema is not None:
913 return maybe_schema
914
915 if obj is source:
916 ref_mode = 'unpack'
917 else:
918 ref_mode = 'to-def'
919 schema = get_schema(
920 source, CallbackGetCoreSchemaHandler(self._generate_schema_inner, self, ref_mode=ref_mode)
921 )
922 if schema['type'] == 'definitions':
923 schema = self.defs.unpack_definitions(schema)
924
925 ref = get_ref(schema)
926 if ref:
927 return self.defs.create_definition_reference_schema(schema)
928
929 # Note: if schema is of type `'definition-ref'`, we might want to copy it as a
930 # safety measure (because these are inlined in place -- i.e. mutated directly)
931 return schema
932
933 if get_schema is None and (validators := getattr(obj, '__get_validators__', None)) is not None:
934 from pydantic.v1 import BaseModel as BaseModelV1
935
936 if issubclass(obj, BaseModelV1):
937 warnings.warn(
938 f'Mixing V1 models and V2 models (or constructs, like `TypeAdapter`) is not supported. Please upgrade `{obj.__name__}` to V2.',
939 UserWarning,
940 )
941 else:
942 warnings.warn(
943 '`__get_validators__` is deprecated and will be removed, use `__get_pydantic_core_schema__` instead.',
944 PydanticDeprecatedSince20,
945 )
946 return core_schema.chain_schema([core_schema.with_info_plain_validator_function(v) for v in validators()])
947
948 def _resolve_forward_ref(self, obj: Any) -> Any:
949 # we assume that types_namespace has the target of forward references in its scope,
950 # but this could fail, for example, if calling Validator on an imported type which contains
951 # forward references to other types only defined in the module from which it was imported
952 # `Validator(SomeImportedTypeAliasWithAForwardReference)`
953 # or the equivalent for BaseModel
954 # class Model(BaseModel):
955 # x: SomeImportedTypeAliasWithAForwardReference
956 try:
957 obj = _typing_extra.eval_type_backport(obj, *self._types_namespace)
958 except NameError as e:
959 raise PydanticUndefinedAnnotation.from_name_error(e) from e
960
961 # if obj is still a ForwardRef, it means we can't evaluate it, raise PydanticUndefinedAnnotation
962 if isinstance(obj, ForwardRef):
963 raise PydanticUndefinedAnnotation(obj.__forward_arg__, f'Unable to evaluate forward reference {obj}')
964
965 if self._typevars_map:
966 obj = replace_types(obj, self._typevars_map)
967
968 return obj
969
970 @overload
971 def _get_args_resolving_forward_refs(self, obj: Any, required: Literal[True]) -> tuple[Any, ...]: ...
972
973 @overload
974 def _get_args_resolving_forward_refs(self, obj: Any) -> tuple[Any, ...] | None: ...
975
976 def _get_args_resolving_forward_refs(self, obj: Any, required: bool = False) -> tuple[Any, ...] | None:
977 args = get_args(obj)
978 if args:
979 if isinstance(obj, GenericAlias):
980 # PEP 585 generic aliases don't convert args to ForwardRefs, unlike `typing.List/Dict` etc.
981 args = (_typing_extra._make_forward_ref(a) if isinstance(a, str) else a for a in args)
982 args = tuple(self._resolve_forward_ref(a) if isinstance(a, ForwardRef) else a for a in args)
983 elif required: # pragma: no cover
984 raise TypeError(f'Expected {obj} to have generic parameters but it had none')
985 return args
986
987 def _get_first_arg_or_any(self, obj: Any) -> Any:
988 args = self._get_args_resolving_forward_refs(obj)
989 if not args:
990 return Any
991 return args[0]
992
993 def _get_first_two_args_or_any(self, obj: Any) -> tuple[Any, Any]:
994 args = self._get_args_resolving_forward_refs(obj)
995 if not args:
996 return (Any, Any)
997 if len(args) < 2:
998 origin = get_origin(obj)
999 raise TypeError(f'Expected two type arguments for {origin}, got 1')
1000 return args[0], args[1]
1001
1002 def _generate_schema_inner(self, obj: Any) -> core_schema.CoreSchema:
1003 if typing_objects.is_self(obj):
1004 obj = self._resolve_self_type(obj)
1005
1006 if typing_objects.is_annotated(get_origin(obj)):
1007 return self._annotated_schema(obj)
1008
1009 if isinstance(obj, dict):
1010 # we assume this is already a valid schema
1011 return obj # type: ignore[return-value]
1012
1013 if isinstance(obj, str):
1014 obj = ForwardRef(obj)
1015
1016 if isinstance(obj, ForwardRef):
1017 return self.generate_schema(self._resolve_forward_ref(obj))
1018
1019 BaseModel = import_cached_base_model()
1020
1021 if lenient_issubclass(obj, BaseModel):
1022 with self.model_type_stack.push(obj):
1023 return self._model_schema(obj)
1024
1025 if isinstance(obj, PydanticRecursiveRef):
1026 return core_schema.definition_reference_schema(schema_ref=obj.type_ref)
1027
1028 return self.match_type(obj)
1029
1030 def match_type(self, obj: Any) -> core_schema.CoreSchema: # noqa: C901
1031 """Main mapping of types to schemas.
1032
1033 The general structure is a series of if statements starting with the simple cases
1034 (non-generic primitive types) and then handling generics and other more complex cases.
1035
1036 Each case either generates a schema directly, calls into a public user-overridable method
1037 (like `GenerateSchema.tuple_variable_schema`) or calls into a private method that handles some
1038 boilerplate before calling into the user-facing method (e.g. `GenerateSchema._tuple_schema`).
1039
1040 The idea is that we'll evolve this into adding more and more user facing methods over time
1041 as they get requested and we figure out what the right API for them is.
1042 """
1043 if obj is str:
1044 return core_schema.str_schema()
1045 elif obj is bytes:
1046 return core_schema.bytes_schema()
1047 elif obj is int:
1048 return core_schema.int_schema()
1049 elif obj is float:
1050 return core_schema.float_schema()
1051 elif obj is bool:
1052 return core_schema.bool_schema()
1053 elif obj is complex:
1054 return core_schema.complex_schema()
1055 elif typing_objects.is_any(obj) or obj is object:
1056 return core_schema.any_schema()
1057 elif obj is datetime.date:
1058 return core_schema.date_schema()
1059 elif obj is datetime.datetime:
1060 return core_schema.datetime_schema()
1061 elif obj is datetime.time:
1062 return core_schema.time_schema()
1063 elif obj is datetime.timedelta:
1064 return core_schema.timedelta_schema()
1065 elif obj is Decimal:
1066 return core_schema.decimal_schema()
1067 elif obj is UUID:
1068 return core_schema.uuid_schema()
1069 elif obj is Url:
1070 return core_schema.url_schema()
1071 elif obj is Fraction:
1072 return self._fraction_schema()
1073 elif obj is MultiHostUrl:
1074 return core_schema.multi_host_url_schema()
1075 elif obj is None or obj is _typing_extra.NoneType:
1076 return core_schema.none_schema()
1077 if obj is MISSING:
1078 return core_schema.missing_sentinel_schema()
1079 elif obj in IP_TYPES:
1080 return self._ip_schema(obj)
1081 elif obj in TUPLE_TYPES:
1082 return self._tuple_schema(obj)
1083 elif obj in LIST_TYPES:
1084 return self._list_schema(Any)
1085 elif obj in SET_TYPES:
1086 return self._set_schema(Any)
1087 elif obj in FROZEN_SET_TYPES:
1088 return self._frozenset_schema(Any)
1089 elif obj in SEQUENCE_TYPES:
1090 return self._sequence_schema(Any)
1091 elif obj in ITERABLE_TYPES:
1092 return self._iterable_schema(obj)
1093 elif obj in DICT_TYPES:
1094 return self._dict_schema(Any, Any)
1095 elif obj in PATH_TYPES:
1096 return self._path_schema(obj, Any)
1097 elif obj in DEQUE_TYPES:
1098 return self._deque_schema(Any)
1099 elif obj in MAPPING_TYPES:
1100 return self._mapping_schema(obj, Any, Any)
1101 elif obj in COUNTER_TYPES:
1102 return self._mapping_schema(obj, Any, int)
1103 elif typing_objects.is_typealiastype(obj):
1104 return self._type_alias_type_schema(obj)
1105 elif obj is type:
1106 return self._type_schema()
1107 elif _typing_extra.is_callable(obj):
1108 return core_schema.callable_schema()
1109 elif typing_objects.is_literal(get_origin(obj)):
1110 return self._literal_schema(obj)
1111 elif is_typeddict(obj):
1112 return self._typed_dict_schema(obj, None)
1113 elif _typing_extra.is_namedtuple(obj):
1114 return self._namedtuple_schema(obj, None)
1115 elif typing_objects.is_newtype(obj):
1116 # NewType, can't use isinstance because it fails <3.10
1117 return self.generate_schema(obj.__supertype__)
1118 elif obj in PATTERN_TYPES:
1119 return self._pattern_schema(obj)
1120 elif _typing_extra.is_hashable(obj):
1121 return self._hashable_schema()
1122 elif isinstance(obj, typing.TypeVar):
1123 return self._unsubstituted_typevar_schema(obj)
1124 elif _typing_extra.is_finalvar(obj):
1125 if obj is Final:
1126 return core_schema.any_schema()
1127 return self.generate_schema(
1128 self._get_first_arg_or_any(obj),
1129 )
1130 elif isinstance(obj, VALIDATE_CALL_SUPPORTED_TYPES):
1131 return self._call_schema(obj)
1132 elif inspect.isclass(obj) and issubclass(obj, Enum):
1133 return self._enum_schema(obj)
1134 elif obj is ZoneInfo:
1135 return self._zoneinfo_schema()
1136
1137 # dataclasses.is_dataclass coerces dc instances to types, but we only handle
1138 # the case of a dc type here
1139 if dataclasses.is_dataclass(obj):
1140 return self._dataclass_schema(obj, None) # pyright: ignore[reportArgumentType]
1141
1142 origin = get_origin(obj)
1143 if origin is not None:
1144 return self._match_generic_type(obj, origin)
1145
1146 if self._arbitrary_types:
1147 return self._arbitrary_type_schema(obj)
1148 return self._unknown_type_schema(obj)
1149
1150 def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema: # noqa: C901
1151 # Need to handle generic dataclasses before looking for the schema properties because attribute accesses
1152 # on _GenericAlias delegate to the origin type, so lose the information about the concrete parametrization
1153 # As a result, currently, there is no way to cache the schema for generic dataclasses. This may be possible
1154 # to resolve by modifying the value returned by `Generic.__class_getitem__`, but that is a dangerous game.
1155 if dataclasses.is_dataclass(origin):
1156 return self._dataclass_schema(obj, origin) # pyright: ignore[reportArgumentType]
1157 if _typing_extra.is_namedtuple(origin):
1158 return self._namedtuple_schema(obj, origin)
1159
1160 schema = self._generate_schema_from_get_schema_method(origin, obj)
1161 if schema is not None:
1162 return schema
1163
1164 if typing_objects.is_typealiastype(origin):
1165 return self._type_alias_type_schema(obj)
1166 elif is_union_origin(origin):
1167 return self._union_schema(obj)
1168 elif origin in TUPLE_TYPES:
1169 return self._tuple_schema(obj)
1170 elif origin in LIST_TYPES:
1171 return self._list_schema(self._get_first_arg_or_any(obj))
1172 elif origin in SET_TYPES:
1173 return self._set_schema(self._get_first_arg_or_any(obj))
1174 elif origin in FROZEN_SET_TYPES:
1175 return self._frozenset_schema(self._get_first_arg_or_any(obj))
1176 elif origin in DICT_TYPES:
1177 return self._dict_schema(*self._get_first_two_args_or_any(obj))
1178 elif origin in PATH_TYPES:
1179 return self._path_schema(origin, self._get_first_arg_or_any(obj))
1180 elif origin in DEQUE_TYPES:
1181 return self._deque_schema(self._get_first_arg_or_any(obj))
1182 elif origin in MAPPING_TYPES:
1183 return self._mapping_schema(origin, *self._get_first_two_args_or_any(obj))
1184 elif origin in COUNTER_TYPES:
1185 return self._mapping_schema(origin, self._get_first_arg_or_any(obj), int)
1186 elif is_typeddict(origin):
1187 return self._typed_dict_schema(obj, origin)
1188 elif origin in TYPE_TYPES:
1189 return self._subclass_schema(obj)
1190 elif origin in SEQUENCE_TYPES:
1191 return self._sequence_schema(self._get_first_arg_or_any(obj))
1192 elif origin in ITERABLE_TYPES:
1193 return self._iterable_schema(obj)
1194 elif origin in PATTERN_TYPES:
1195 return self._pattern_schema(obj)
1196
1197 if self._arbitrary_types:
1198 return self._arbitrary_type_schema(origin)
1199 return self._unknown_type_schema(obj)
1200
1201 def _generate_td_field_schema(
1202 self,
1203 name: str,
1204 field_info: FieldInfo,
1205 decorators: DecoratorInfos,
1206 *,
1207 required: bool = True,
1208 ) -> core_schema.TypedDictField:
1209 """Prepare a TypedDictField to represent a model or typeddict field."""
1210 schema, metadata = self._common_field_schema(name, field_info, decorators)
1211 return core_schema.typed_dict_field(
1212 schema,
1213 required=False if not field_info.is_required() else required,
1214 serialization_exclude=field_info.exclude,
1215 validation_alias=_convert_to_aliases(field_info.validation_alias),
1216 serialization_alias=field_info.serialization_alias,
1217 serialization_exclude_if=field_info.exclude_if,
1218 metadata=metadata,
1219 )
1220
1221 def _generate_md_field_schema(
1222 self,
1223 name: str,
1224 field_info: FieldInfo,
1225 decorators: DecoratorInfos,
1226 ) -> core_schema.ModelField:
1227 """Prepare a ModelField to represent a model field."""
1228 schema, metadata = self._common_field_schema(name, field_info, decorators)
1229 return core_schema.model_field(
1230 schema,
1231 serialization_exclude=field_info.exclude,
1232 validation_alias=_convert_to_aliases(field_info.validation_alias),
1233 serialization_alias=field_info.serialization_alias,
1234 serialization_exclude_if=field_info.exclude_if,
1235 frozen=field_info.frozen,
1236 metadata=metadata,
1237 )
1238
1239 def _generate_dc_field_schema(
1240 self,
1241 name: str,
1242 field_info: FieldInfo,
1243 decorators: DecoratorInfos,
1244 ) -> core_schema.DataclassField:
1245 """Prepare a DataclassField to represent the parameter/field, of a dataclass."""
1246 schema, metadata = self._common_field_schema(name, field_info, decorators)
1247 return core_schema.dataclass_field(
1248 name,
1249 schema,
1250 init=field_info.init,
1251 init_only=field_info.init_var or None,
1252 kw_only=None if field_info.kw_only else False,
1253 serialization_exclude=field_info.exclude,
1254 validation_alias=_convert_to_aliases(field_info.validation_alias),
1255 serialization_alias=field_info.serialization_alias,
1256 serialization_exclude_if=field_info.exclude_if,
1257 frozen=field_info.frozen,
1258 metadata=metadata,
1259 )
1260
1261 def _common_field_schema( # C901
1262 self, name: str, field_info: FieldInfo, decorators: DecoratorInfos
1263 ) -> tuple[CoreSchema, dict[str, Any]]:
1264 source_type, annotations = field_info.annotation, field_info.metadata
1265
1266 def set_discriminator(schema: CoreSchema) -> CoreSchema:
1267 schema = self._apply_discriminator_to_union(schema, field_info.discriminator)
1268 return schema
1269
1270 # Convert `@field_validator` decorators to `Before/After/Plain/WrapValidator` instances:
1271 validators_from_decorators = [
1272 _mode_to_validator[decorator.info.mode]._from_decorator(decorator)
1273 for decorator in filter_field_decorator_info_by_field(decorators.field_validators.values(), name)
1274 ]
1275
1276 with self.field_name_stack.push(name):
1277 if field_info.discriminator is not None:
1278 schema = self._apply_annotations(
1279 source_type, annotations + validators_from_decorators, transform_inner_schema=set_discriminator
1280 )
1281 else:
1282 schema = self._apply_annotations(
1283 source_type,
1284 annotations + validators_from_decorators,
1285 )
1286
1287 # This V1 compatibility shim should eventually be removed
1288 # push down any `each_item=True` validators
1289 # note that this won't work for any Annotated types that get wrapped by a function validator
1290 # but that's okay because that didn't exist in V1
1291 this_field_validators = filter_field_decorator_info_by_field(decorators.validators.values(), name)
1292 if _validators_require_validate_default(this_field_validators):
1293 field_info.validate_default = True
1294 each_item_validators = [v for v in this_field_validators if v.info.each_item is True]
1295 this_field_validators = [v for v in this_field_validators if v not in each_item_validators]
1296 schema = apply_each_item_validators(schema, each_item_validators)
1297
1298 schema = apply_validators(schema, this_field_validators)
1299
1300 # the default validator needs to go outside of any other validators
1301 # so that it is the topmost validator for the field validator
1302 # which uses it to check if the field has a default value or not
1303 if not field_info.is_required():
1304 schema = wrap_default(field_info, schema)
1305
1306 schema = self._apply_field_serializers(
1307 schema, filter_field_decorator_info_by_field(decorators.field_serializers.values(), name)
1308 )
1309
1310 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(field_info)
1311 core_metadata: dict[str, Any] = {}
1312 update_core_metadata(
1313 core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
1314 )
1315
1316 return schema, core_metadata
1317
1318 def _union_schema(self, union_type: Any) -> core_schema.CoreSchema:
1319 """Generate schema for a Union."""
1320 args = self._get_args_resolving_forward_refs(union_type, required=True)
1321 choices: list[CoreSchema] = []
1322 nullable = False
1323 for arg in args:
1324 if arg is None or arg is _typing_extra.NoneType:
1325 nullable = True
1326 else:
1327 choices.append(self.generate_schema(arg))
1328
1329 if len(choices) == 1:
1330 s = choices[0]
1331 else:
1332 choices_with_tags: list[CoreSchema | tuple[CoreSchema, str]] = []
1333 for choice in choices:
1334 tag = cast(CoreMetadata, choice.get('metadata', {})).get('pydantic_internal_union_tag_key')
1335 if tag is not None:
1336 choices_with_tags.append((choice, tag))
1337 else:
1338 choices_with_tags.append(choice)
1339 s = core_schema.union_schema(choices_with_tags)
1340
1341 if nullable:
1342 s = core_schema.nullable_schema(s)
1343 return s
1344
1345 def _type_alias_type_schema(self, obj: TypeAliasType) -> CoreSchema:
1346 with self.defs.get_schema_or_ref(obj) as (ref, maybe_schema):
1347 if maybe_schema is not None:
1348 return maybe_schema
1349
1350 origin: TypeAliasType = get_origin(obj) or obj
1351 typevars_map = get_standard_typevars_map(obj)
1352
1353 with self._ns_resolver.push(origin):
1354 try:
1355 annotation = _typing_extra.eval_type(origin.__value__, *self._types_namespace)
1356 except NameError as e:
1357 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1358 annotation = replace_types(annotation, typevars_map)
1359 schema = self.generate_schema(annotation)
1360 assert schema['type'] != 'definitions'
1361 schema['ref'] = ref # type: ignore
1362 return self.defs.create_definition_reference_schema(schema)
1363
1364 def _literal_schema(self, literal_type: Any) -> CoreSchema:
1365 """Generate schema for a Literal."""
1366 expected = list(get_literal_values(literal_type, type_check=False, unpack_type_aliases='eager'))
1367 assert expected, f'literal "expected" cannot be empty, obj={literal_type}'
1368 schema = core_schema.literal_schema(expected)
1369
1370 if self._config_wrapper.use_enum_values and any(isinstance(v, Enum) for v in expected):
1371 schema = core_schema.no_info_after_validator_function(
1372 lambda v: v.value if isinstance(v, Enum) else v, schema
1373 )
1374
1375 return schema
1376
    def _typed_dict_schema(self, typed_dict_cls: Any, origin: Any) -> core_schema.CoreSchema:
        """Generate a core schema for a `TypedDict` class.

        To be able to build a `DecoratorInfos` instance for the `TypedDict` class (which will include
        validators, serializers, etc.), we need to have access to the original bases of the class
        (see https://docs.python.org/3/library/types.html#types.get_original_bases).
        However, the `__orig_bases__` attribute was only added in 3.12 (https://github.com/python/cpython/pull/103698).

        For this reason, we require Python 3.12 (or using the `typing_extensions` backport).

        Args:
            typed_dict_cls: The `TypedDict` class, or a parametrization of it (in which case `origin` is set).
            origin: The unparametrized class when `typed_dict_cls` is parametrized, `None` otherwise.

        Returns:
            The schema already available for this type if `self.defs.get_schema_or_ref()` provides one,
            otherwise the result of `self.defs.create_definition_reference_schema()` for the new schema.

        Raises:
            PydanticUserError: If `_SUPPORTS_TYPEDDICT` is false and the class was created with
                `typing.TypedDict`.
            PydanticUndefinedAnnotation: If resolving the type hints of the class raises a `NameError`.
        """
        FieldInfo = import_cached_field_info()

        with (
            self.model_type_stack.push(typed_dict_cls),
            self.defs.get_schema_or_ref(typed_dict_cls) as (
                typed_dict_ref,
                maybe_schema,
            ),
        ):
            if maybe_schema is not None:
                return maybe_schema

            # The typevars map must be computed from the (possibly parametrized) type,
            # before `typed_dict_cls` is replaced by the origin class:
            typevars_map = get_standard_typevars_map(typed_dict_cls)
            if origin is not None:
                typed_dict_cls = origin

            if not _SUPPORTS_TYPEDDICT and type(typed_dict_cls).__module__ == 'typing':
                raise PydanticUserError(
                    'Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.12.',
                    code='typed-dict-version',
                )

            try:
                # if a typed dictionary class doesn't have config, we use the parent's config, hence a default of `None`
                # see https://github.com/pydantic/pydantic/issues/10917
                config: ConfigDict | None = get_attribute_from_bases(typed_dict_cls, '__pydantic_config__')
            except AttributeError:
                config = None

            with self._config_wrapper_stack.push(config):
                core_config = self._config_wrapper.core_config(title=typed_dict_cls.__name__)

                required_keys: frozenset[str] = typed_dict_cls.__required_keys__

                fields: dict[str, core_schema.TypedDictField] = {}

                decorators = DecoratorInfos.build(typed_dict_cls)
                decorators.update_from_config(self._config_wrapper)

                if self._config_wrapper.use_attribute_docstrings:
                    field_docstrings = extract_docstrings_from_cls(typed_dict_cls, use_inspect=True)
                else:
                    field_docstrings = None

                try:
                    annotations = _typing_extra.get_cls_type_hints(typed_dict_cls, ns_resolver=self._ns_resolver)
                except NameError as e:
                    raise PydanticUndefinedAnnotation.from_name_error(e) from e

                # Names of the items using the `ReadOnly` qualifier, collected to emit a single warning below:
                readonly_fields: list[str] = []

                for field_name, annotation in annotations.items():
                    field_info = FieldInfo.from_annotation(annotation, _source=AnnotationSource.TYPED_DICT)
                    field_info.annotation = replace_types(field_info.annotation, typevars_map)

                    # An item is required if it is listed in `__required_keys__` or carries the `required`
                    # qualifier, unless it also carries the `not_required` qualifier:
                    required = (
                        field_name in required_keys or 'required' in field_info._qualifiers
                    ) and 'not_required' not in field_info._qualifiers
                    if 'read_only' in field_info._qualifiers:
                        readonly_fields.append(field_name)

                    # An attribute docstring is only used when no description is set on the field:
                    if (
                        field_docstrings is not None
                        and field_info.description is None
                        and field_name in field_docstrings
                    ):
                        field_info.description = field_docstrings[field_name]
                    update_field_from_config(self._config_wrapper, field_name, field_info)

                    fields[field_name] = self._generate_td_field_schema(
                        field_name, field_info, decorators, required=required
                    )

                if readonly_fields:
                    fields_repr = ', '.join(repr(f) for f in readonly_fields)
                    plural = len(readonly_fields) >= 2
                    warnings.warn(
                        f'Item{"s" if plural else ""} {fields_repr} on TypedDict class {typed_dict_cls.__name__!r} '
                        f'{"are" if plural else "is"} using the `ReadOnly` qualifier. Pydantic will not protect items '
                        'from any mutation on dictionary instances.',
                        UserWarning,
                    )

                extra_behavior: core_schema.ExtraBehavior = 'ignore'
                extras_schema: CoreSchema | None = None  # For 'allow', equivalent to `Any` - no validation performed.

                # `__closed__` is `None` when not specified (equivalent to `False`):
                is_closed = bool(getattr(typed_dict_cls, '__closed__', False))
                extra_items = getattr(typed_dict_cls, '__extra_items__', typing_extensions.NoExtraItems)
                # A closed class forbids extra items; otherwise, a class declaring `__extra_items__`
                # allows them and they are validated against the declared type:
                if is_closed:
                    extra_behavior = 'forbid'
                    extras_schema = None
                elif not typing_objects.is_noextraitems(extra_items):
                    extra_behavior = 'allow'
                    extras_schema = self.generate_schema(replace_types(extra_items, typevars_map))

                # The 'extra' configuration value is only applied when it doesn't contradict the
                # closed/extra items specification of the class. When it does, a warning is emitted
                # and the configuration value is ignored:
                if (config_extra := self._config_wrapper.extra) in ('allow', 'forbid'):
                    if is_closed and config_extra == 'allow':
                        warnings.warn(
                            f"TypedDict class {typed_dict_cls.__qualname__!r} is closed, but 'extra' configuration "
                            "is set to `'allow'`. The 'extra' configuration value will be ignored.",
                            category=TypedDictExtraConfigWarning,
                        )
                    elif not typing_objects.is_noextraitems(extra_items) and config_extra == 'forbid':
                        warnings.warn(
                            f"TypedDict class {typed_dict_cls.__qualname__!r} allows extra items, but 'extra' configuration "
                            "is set to `'forbid'`. The 'extra' configuration value will be ignored.",
                            category=TypedDictExtraConfigWarning,
                        )
                    else:
                        extra_behavior = config_extra

                td_schema = core_schema.typed_dict_schema(
                    fields,
                    cls=typed_dict_cls,
                    computed_fields=[
                        self._computed_field_schema(d, decorators.field_serializers)
                        for d in decorators.computed_fields.values()
                    ],
                    extra_behavior=extra_behavior,
                    extras_schema=extras_schema,
                    ref=typed_dict_ref,
                    config=core_config,
                )

                # Model serializers are applied first, then the model validators are applied on top:
                schema = self._apply_model_serializers(td_schema, decorators.model_serializers.values())
                schema = apply_model_validators(schema, decorators.model_validators.values(), 'all')
                return self.defs.create_definition_reference_schema(schema)
1515
1516 def _namedtuple_schema(self, namedtuple_cls: Any, origin: Any) -> core_schema.CoreSchema:
1517 """Generate schema for a NamedTuple."""
1518 with (
1519 self.model_type_stack.push(namedtuple_cls),
1520 self.defs.get_schema_or_ref(namedtuple_cls) as (
1521 namedtuple_ref,
1522 maybe_schema,
1523 ),
1524 ):
1525 if maybe_schema is not None:
1526 return maybe_schema
1527 typevars_map = get_standard_typevars_map(namedtuple_cls)
1528 if origin is not None:
1529 namedtuple_cls = origin
1530
1531 try:
1532 annotations = _typing_extra.get_cls_type_hints(namedtuple_cls, ns_resolver=self._ns_resolver)
1533 except NameError as e:
1534 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1535 if not annotations:
1536 # annotations is empty, happens if namedtuple_cls defined via collections.namedtuple(...)
1537 annotations: dict[str, Any] = dict.fromkeys(namedtuple_cls._fields, Any)
1538
1539 if typevars_map:
1540 annotations = {
1541 field_name: replace_types(annotation, typevars_map)
1542 for field_name, annotation in annotations.items()
1543 }
1544
1545 arguments_schema = core_schema.arguments_schema(
1546 [
1547 self._generate_parameter_schema(
1548 field_name,
1549 annotation,
1550 source=AnnotationSource.NAMED_TUPLE,
1551 default=namedtuple_cls._field_defaults.get(field_name, Parameter.empty),
1552 )
1553 for field_name, annotation in annotations.items()
1554 ],
1555 metadata={'pydantic_js_prefer_positional_arguments': True},
1556 )
1557 schema = core_schema.call_schema(arguments_schema, namedtuple_cls, ref=namedtuple_ref)
1558 return self.defs.create_definition_reference_schema(schema)
1559
1560 def _generate_parameter_schema(
1561 self,
1562 name: str,
1563 annotation: type[Any],
1564 source: AnnotationSource,
1565 default: Any = Parameter.empty,
1566 mode: Literal['positional_only', 'positional_or_keyword', 'keyword_only'] | None = None,
1567 ) -> core_schema.ArgumentsParameter:
1568 """Generate the definition of a field in a namedtuple or a parameter in a function signature.
1569
1570 This definition is meant to be used for the `'arguments'` core schema, which will be replaced
1571 in V3 by the `'arguments-v3`'.
1572 """
1573 FieldInfo = import_cached_field_info()
1574
1575 if default is Parameter.empty:
1576 field = FieldInfo.from_annotation(annotation, _source=source)
1577 else:
1578 field = FieldInfo.from_annotated_attribute(annotation, default, _source=source)
1579
1580 assert field.annotation is not None, 'field.annotation should not be None when generating a schema'
1581 update_field_from_config(self._config_wrapper, name, field)
1582
1583 with self.field_name_stack.push(name):
1584 schema = self._apply_annotations(
1585 field.annotation,
1586 [field],
1587 # Because we pass `field` as metadata above (required for attributes relevant for
1588 # JSON Scheme generation), we need to ignore the potential warnings about `FieldInfo`
1589 # attributes that will not be used:
1590 check_unsupported_field_info_attributes=False,
1591 )
1592
1593 if not field.is_required():
1594 schema = wrap_default(field, schema)
1595
1596 parameter_schema = core_schema.arguments_parameter(
1597 name,
1598 schema,
1599 mode=mode,
1600 alias=_convert_to_aliases(field.validation_alias),
1601 )
1602
1603 return parameter_schema
1604
1605 def _generate_parameter_v3_schema(
1606 self,
1607 name: str,
1608 annotation: Any,
1609 source: AnnotationSource,
1610 mode: Literal[
1611 'positional_only',
1612 'positional_or_keyword',
1613 'keyword_only',
1614 'var_args',
1615 'var_kwargs_uniform',
1616 'var_kwargs_unpacked_typed_dict',
1617 ],
1618 default: Any = Parameter.empty,
1619 ) -> core_schema.ArgumentsV3Parameter:
1620 """Generate the definition of a parameter in a function signature.
1621
1622 This definition is meant to be used for the `'arguments-v3'` core schema, which will replace
1623 the `'arguments`' schema in V3.
1624 """
1625 FieldInfo = import_cached_field_info()
1626
1627 if default is Parameter.empty:
1628 field = FieldInfo.from_annotation(annotation, _source=source)
1629 else:
1630 field = FieldInfo.from_annotated_attribute(annotation, default, _source=source)
1631 update_field_from_config(self._config_wrapper, name, field)
1632
1633 with self.field_name_stack.push(name):
1634 schema = self._apply_annotations(
1635 field.annotation,
1636 [field],
1637 # Because we pass `field` as metadata above (required for attributes relevant for
1638 # JSON Scheme generation), we need to ignore the potential warnings about `FieldInfo`
1639 # attributes that will not be used:
1640 check_unsupported_field_info_attributes=False,
1641 )
1642
1643 if not field.is_required():
1644 schema = wrap_default(field, schema)
1645
1646 parameter_schema = core_schema.arguments_v3_parameter(
1647 name=name,
1648 schema=schema,
1649 mode=mode,
1650 alias=_convert_to_aliases(field.validation_alias),
1651 )
1652
1653 return parameter_schema
1654
1655 def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema:
1656 """Generate schema for a Tuple, e.g. `tuple[int, str]` or `tuple[int, ...]`."""
1657 # TODO: do we really need to resolve type vars here?
1658 typevars_map = get_standard_typevars_map(tuple_type)
1659 params = self._get_args_resolving_forward_refs(tuple_type)
1660
1661 if typevars_map and params:
1662 params = tuple(replace_types(param, typevars_map) for param in params)
1663
1664 # NOTE: subtle difference: `tuple[()]` gives `params=()`, whereas `typing.Tuple[()]` gives `params=((),)`
1665 # This is only true for <3.11, on Python 3.11+ `typing.Tuple[()]` gives `params=()`
1666 if not params:
1667 if tuple_type in TUPLE_TYPES:
1668 return core_schema.tuple_schema([core_schema.any_schema()], variadic_item_index=0)
1669 else:
1670 # special case for `tuple[()]` which means `tuple[]` - an empty tuple
1671 return core_schema.tuple_schema([])
1672 elif params[-1] is Ellipsis:
1673 if len(params) == 2:
1674 return core_schema.tuple_schema([self.generate_schema(params[0])], variadic_item_index=0)
1675 else:
1676 # TODO: something like https://github.com/pydantic/pydantic/issues/5952
1677 raise ValueError('Variable tuples can only have one type')
1678 elif len(params) == 1 and params[0] == ():
1679 # special case for `tuple[()]` which means `tuple[]` - an empty tuple
1680 # NOTE: This conditional can be removed when we drop support for Python 3.10.
1681 return core_schema.tuple_schema([])
1682 else:
1683 return core_schema.tuple_schema([self.generate_schema(param) for param in params])
1684
1685 def _type_schema(self) -> core_schema.CoreSchema:
1686 return core_schema.custom_error_schema(
1687 core_schema.is_instance_schema(type),
1688 custom_error_type='is_type',
1689 custom_error_message='Input should be a type',
1690 )
1691
1692 def _zoneinfo_schema(self) -> core_schema.CoreSchema:
1693 """Generate schema for a zone_info.ZoneInfo object"""
1694 from ._validators import validate_str_is_valid_iana_tz
1695
1696 metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'zoneinfo'}]}
1697 return core_schema.no_info_plain_validator_function(
1698 validate_str_is_valid_iana_tz,
1699 serialization=core_schema.to_string_ser_schema(),
1700 metadata=metadata,
1701 )
1702
1703 def _union_is_subclass_schema(self, union_type: Any) -> core_schema.CoreSchema:
1704 """Generate schema for `type[Union[X, ...]]`."""
1705 args = self._get_args_resolving_forward_refs(union_type, required=True)
1706 return core_schema.union_schema([self.generate_schema(type[args]) for args in args])
1707
1708 def _subclass_schema(self, type_: Any) -> core_schema.CoreSchema:
1709 """Generate schema for a type, e.g. `type[int]`."""
1710 type_param = self._get_first_arg_or_any(type_)
1711
1712 # Assume `type[Annotated[<typ>, ...]]` is equivalent to `type[<typ>]`:
1713 type_param = _typing_extra.annotated_type(type_param) or type_param
1714
1715 if typing_objects.is_any(type_param):
1716 return self._type_schema()
1717 elif typing_objects.is_typealiastype(type_param):
1718 return self.generate_schema(type[type_param.__value__])
1719 elif typing_objects.is_typevar(type_param):
1720 if type_param.__bound__:
1721 if is_union_origin(get_origin(type_param.__bound__)):
1722 return self._union_is_subclass_schema(type_param.__bound__)
1723 return core_schema.is_subclass_schema(type_param.__bound__)
1724 elif type_param.__constraints__:
1725 return core_schema.union_schema([self.generate_schema(type[c]) for c in type_param.__constraints__])
1726 else:
1727 return self._type_schema()
1728 elif is_union_origin(get_origin(type_param)):
1729 return self._union_is_subclass_schema(type_param)
1730 else:
1731 if typing_objects.is_self(type_param):
1732 type_param = self._resolve_self_type(type_param)
1733 if _typing_extra.is_generic_alias(type_param):
1734 raise PydanticUserError(
1735 'Subscripting `type[]` with an already parametrized type is not supported. '
1736 f'Instead of using type[{type_param!r}], use type[{_repr.display_as_type(get_origin(type_param))}].',
1737 code=None,
1738 )
1739 if not inspect.isclass(type_param):
1740 # when using type[None], this doesn't type convert to type[NoneType], and None isn't a class
1741 # so we handle it manually here
1742 if type_param is None:
1743 return core_schema.is_subclass_schema(_typing_extra.NoneType)
1744 raise TypeError(f'Expected a class, got {type_param!r}')
1745 return core_schema.is_subclass_schema(type_param)
1746
1747 def _sequence_schema(self, items_type: Any) -> core_schema.CoreSchema:
1748 """Generate schema for a Sequence, e.g. `Sequence[int]`."""
1749 from ._serializers import serialize_sequence_via_list
1750
1751 item_type_schema = self.generate_schema(items_type)
1752 list_schema = core_schema.list_schema(item_type_schema)
1753
1754 json_schema = smart_deepcopy(list_schema)
1755 python_schema = core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence')
1756 if not typing_objects.is_any(items_type):
1757 from ._validators import sequence_validator
1758
1759 python_schema = core_schema.chain_schema(
1760 [python_schema, core_schema.no_info_wrap_validator_function(sequence_validator, list_schema)],
1761 )
1762
1763 serialization = core_schema.wrap_serializer_function_ser_schema(
1764 serialize_sequence_via_list, schema=item_type_schema, info_arg=True
1765 )
1766 return core_schema.json_or_python_schema(
1767 json_schema=json_schema, python_schema=python_schema, serialization=serialization
1768 )
1769
1770 def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema:
1771 """Generate a schema for an `Iterable`."""
1772 item_type = self._get_first_arg_or_any(type_)
1773
1774 return core_schema.generator_schema(self.generate_schema(item_type))
1775
1776 def _pattern_schema(self, pattern_type: Any) -> core_schema.CoreSchema:
1777 from . import _validators
1778
1779 metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'regex'}]}
1780 ser = core_schema.plain_serializer_function_ser_schema(
1781 attrgetter('pattern'), when_used='json', return_schema=core_schema.str_schema()
1782 )
1783 if pattern_type is typing.Pattern or pattern_type is re.Pattern:
1784 # bare type
1785 return core_schema.no_info_plain_validator_function(
1786 _validators.pattern_either_validator, serialization=ser, metadata=metadata
1787 )
1788
1789 param = self._get_args_resolving_forward_refs(
1790 pattern_type,
1791 required=True,
1792 )[0]
1793 if param is str:
1794 return core_schema.no_info_plain_validator_function(
1795 _validators.pattern_str_validator, serialization=ser, metadata=metadata
1796 )
1797 elif param is bytes:
1798 return core_schema.no_info_plain_validator_function(
1799 _validators.pattern_bytes_validator, serialization=ser, metadata=metadata
1800 )
1801 else:
1802 raise PydanticSchemaGenerationError(f'Unable to generate pydantic-core schema for {pattern_type!r}.')
1803
1804 def _hashable_schema(self) -> core_schema.CoreSchema:
1805 return core_schema.custom_error_schema(
1806 schema=core_schema.json_or_python_schema(
1807 json_schema=core_schema.chain_schema(
1808 [core_schema.any_schema(), core_schema.is_instance_schema(collections.abc.Hashable)]
1809 ),
1810 python_schema=core_schema.is_instance_schema(collections.abc.Hashable),
1811 ),
1812 custom_error_type='is_hashable',
1813 custom_error_message='Input should be hashable',
1814 )
1815
    def _dataclass_schema(
        self, dataclass: type[StandardDataclass], origin: type[StandardDataclass] | None
    ) -> core_schema.CoreSchema:
        """Generate schema for a dataclass.

        Args:
            dataclass: The dataclass to generate the schema for.
            origin: When not `None`, the class used in place of `dataclass` to collect the fields,
                decorators and config. It is also forwarded as `generic_origin` to the core schema.

        Returns:
            Either an already available schema (from the definitions or from the
            `__pydantic_core_schema__` attribute of the class), or a definition reference
            to the newly built dataclass schema.

        Raises:
            PydanticUndefinedAnnotation: If rebuilding the fields raises a `NameError`.
            PydanticUserError: If a field has `init=False` while the config sets `extra='allow'`.
        """
        with (
            self.model_type_stack.push(dataclass),
            self.defs.get_schema_or_ref(dataclass) as (
                dataclass_ref,
                maybe_schema,
            ),
        ):
            # A schema is already available for this reference, reuse it:
            if maybe_schema is not None:
                return maybe_schema

            # Reuse the schema stored on the class itself (looked up in `__dict__`, so a schema
            # inherited from a parent class is not picked up), unless it is a mock:
            schema = dataclass.__dict__.get('__pydantic_core_schema__')
            if schema is not None and not isinstance(schema, MockCoreSchema):
                if schema['type'] == 'definitions':
                    schema = self.defs.unpack_definitions(schema)
                ref = get_ref(schema)
                if ref:
                    return self.defs.create_definition_reference_schema(schema)
                else:
                    return schema

            # The typevars map is computed from the class as given, *before* it gets
            # replaced by `origin` below:
            typevars_map = get_standard_typevars_map(dataclass)
            if origin is not None:
                dataclass = origin

            # if (plain) dataclass doesn't have config, we use the parent's config, hence a default of `None`
            # (Pydantic dataclasses have an empty dict config by default).
            # see https://github.com/pydantic/pydantic/issues/10917
            config = getattr(dataclass, '__pydantic_config__', None)

            from ..dataclasses import is_pydantic_dataclass

            with self._ns_resolver.push(dataclass), self._config_wrapper_stack.push(config):
                if is_pydantic_dataclass(dataclass):
                    if dataclass.__pydantic_fields_complete__():
                        # Copy the field info instances to avoid mutating the `FieldInfo` instances
                        # of the generic dataclass generic origin (e.g. `apply_typevars_map` below).
                        # Note that we don't apply `deepcopy` on `__pydantic_fields__` because we
                        # don't want to copy the `FieldInfo` attributes:
                        fields = {
                            f_name: copy(field_info) for f_name, field_info in dataclass.__pydantic_fields__.items()
                        }
                        if typevars_map:
                            for field in fields.values():
                                field.apply_typevars_map(typevars_map, *self._types_namespace)
                    else:
                        # The fields are not complete yet, so they are rebuilt:
                        try:
                            fields = rebuild_dataclass_fields(
                                dataclass,
                                config_wrapper=self._config_wrapper,
                                ns_resolver=self._ns_resolver,
                                typevars_map=typevars_map or {},
                            )
                        except NameError as e:
                            raise PydanticUndefinedAnnotation.from_name_error(e) from e
                else:
                    # Not a Pydantic dataclass: the fields are collected from the class directly.
                    fields = collect_dataclass_fields(
                        dataclass,
                        typevars_map=typevars_map,
                        config_wrapper=self._config_wrapper,
                    )

                if self._config_wrapper.extra == 'allow':
                    # disallow combination of init=False on a dataclass field and extra='allow' on a dataclass
                    for field_name, field in fields.items():
                        if field.init is False:
                            raise PydanticUserError(
                                f'Field {field_name} has `init=False` and dataclass has config setting `extra="allow"`. '
                                f'This combination is not allowed.',
                                code='dataclass-init-false-extra-allow',
                            )

                # As for the core schema above, the decorators are looked up in `__dict__`,
                # and are built on the fly if the class doesn't define them:
                decorators = dataclass.__dict__.get('__pydantic_decorators__')
                if decorators is None:
                    decorators = DecoratorInfos.build(dataclass)
                    decorators.update_from_config(self._config_wrapper)
                # Move kw_only=False args to the start of the list, as this is how vanilla dataclasses work.
                # Note that when kw_only is missing or None, it is treated as equivalent to kw_only=True
                args = sorted(
                    (self._generate_dc_field_schema(k, v, decorators) for k, v in fields.items()),
                    key=lambda a: a.get('kw_only') is not False,
                )
                has_post_init = hasattr(dataclass, '__post_init__')
                has_slots = hasattr(dataclass, '__slots__')

                args_schema = core_schema.dataclass_args_schema(
                    dataclass.__name__,
                    args,
                    computed_fields=[
                        self._computed_field_schema(d, decorators.field_serializers)
                        for d in decorators.computed_fields.values()
                    ],
                    collect_init_only=has_post_init,
                )

                # Root validators and 'inner' model validators wrap the args schema, i.e. they
                # are applied inside of the dataclass schema built below:
                inner_schema = apply_validators(args_schema, decorators.root_validators.values())

                model_validators = decorators.model_validators.values()
                inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')

                core_config = self._config_wrapper.core_config(title=dataclass.__name__)

                dc_schema = core_schema.dataclass_schema(
                    dataclass,
                    inner_schema,
                    generic_origin=origin,
                    post_init=has_post_init,
                    ref=dataclass_ref,
                    fields=[field.name for field in dataclasses.fields(dataclass)],
                    slots=has_slots,
                    config=core_config,
                    # we don't use a custom __setattr__ for dataclasses, so we must
                    # pass along the frozen config setting to the pydantic-core schema
                    frozen=self._config_wrapper_stack.tail.frozen,
                )
                # Model serializers and 'outer' model validators wrap the dataclass schema itself:
                schema = self._apply_model_serializers(dc_schema, decorators.model_serializers.values())
                schema = apply_model_validators(schema, model_validators, 'outer')
                return self.defs.create_definition_reference_schema(schema)
1937
1938 def _call_schema(self, function: ValidateCallSupportedTypes) -> core_schema.CallSchema:
1939 """Generate schema for a Callable.
1940
1941 TODO support functional validators once we support them in Config
1942 """
1943 arguments_schema = self._arguments_schema(function)
1944
1945 return_schema: core_schema.CoreSchema | None = None
1946 config_wrapper = self._config_wrapper
1947 if config_wrapper.validate_return:
1948 sig = signature(function)
1949 return_hint = sig.return_annotation
1950 if return_hint is not sig.empty:
1951 globalns, localns = self._types_namespace
1952 type_hints = _typing_extra.get_function_type_hints(
1953 function, globalns=globalns, localns=localns, include_keys={'return'}
1954 )
1955 return_schema = self.generate_schema(type_hints['return'])
1956
1957 return core_schema.call_schema(
1958 arguments_schema,
1959 function,
1960 return_schema=return_schema,
1961 )
1962
1963 def _arguments_schema(
1964 self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
1965 ) -> core_schema.ArgumentsSchema:
1966 """Generate schema for a Signature."""
1967 mode_lookup: dict[_ParameterKind, Literal['positional_only', 'positional_or_keyword', 'keyword_only']] = {
1968 Parameter.POSITIONAL_ONLY: 'positional_only',
1969 Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
1970 Parameter.KEYWORD_ONLY: 'keyword_only',
1971 }
1972
1973 sig = signature(function)
1974 globalns, localns = self._types_namespace
1975 type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)
1976
1977 arguments_list: list[core_schema.ArgumentsParameter] = []
1978 var_args_schema: core_schema.CoreSchema | None = None
1979 var_kwargs_schema: core_schema.CoreSchema | None = None
1980 var_kwargs_mode: core_schema.VarKwargsMode | None = None
1981
1982 for i, (name, p) in enumerate(sig.parameters.items()):
1983 if p.annotation is sig.empty:
1984 annotation = typing.cast(Any, Any)
1985 else:
1986 annotation = type_hints[name]
1987
1988 if parameters_callback is not None:
1989 result = parameters_callback(i, name, annotation)
1990 if result == 'skip':
1991 continue
1992
1993 parameter_mode = mode_lookup.get(p.kind)
1994 if parameter_mode is not None:
1995 arg_schema = self._generate_parameter_schema(
1996 name, annotation, AnnotationSource.FUNCTION, p.default, parameter_mode
1997 )
1998 arguments_list.append(arg_schema)
1999 elif p.kind == Parameter.VAR_POSITIONAL:
2000 var_args_schema = self.generate_schema(annotation)
2001 else:
2002 assert p.kind == Parameter.VAR_KEYWORD, p.kind
2003
2004 unpack_type = _typing_extra.unpack_type(annotation)
2005 if unpack_type is not None:
2006 origin = get_origin(unpack_type) or unpack_type
2007 if not is_typeddict(origin):
2008 raise PydanticUserError(
2009 f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
2010 code='unpack-typed-dict',
2011 )
2012 non_pos_only_param_names = {
2013 name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
2014 }
2015 overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
2016 if overlapping_params:
2017 raise PydanticUserError(
2018 f'Typed dictionary {origin.__name__!r} overlaps with parameter'
2019 f'{"s" if len(overlapping_params) >= 2 else ""} '
2020 f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
2021 code='overlapping-unpack-typed-dict',
2022 )
2023
2024 var_kwargs_mode = 'unpacked-typed-dict'
2025 var_kwargs_schema = self._typed_dict_schema(unpack_type, get_origin(unpack_type))
2026 else:
2027 var_kwargs_mode = 'uniform'
2028 var_kwargs_schema = self.generate_schema(annotation)
2029
2030 return core_schema.arguments_schema(
2031 arguments_list,
2032 var_args_schema=var_args_schema,
2033 var_kwargs_mode=var_kwargs_mode,
2034 var_kwargs_schema=var_kwargs_schema,
2035 validate_by_name=self._config_wrapper.validate_by_name,
2036 )
2037
2038 def _arguments_v3_schema(
2039 self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
2040 ) -> core_schema.ArgumentsV3Schema:
2041 mode_lookup: dict[
2042 _ParameterKind, Literal['positional_only', 'positional_or_keyword', 'var_args', 'keyword_only']
2043 ] = {
2044 Parameter.POSITIONAL_ONLY: 'positional_only',
2045 Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
2046 Parameter.VAR_POSITIONAL: 'var_args',
2047 Parameter.KEYWORD_ONLY: 'keyword_only',
2048 }
2049
2050 sig = signature(function)
2051 globalns, localns = self._types_namespace
2052 type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)
2053
2054 parameters_list: list[core_schema.ArgumentsV3Parameter] = []
2055
2056 for i, (name, p) in enumerate(sig.parameters.items()):
2057 if parameters_callback is not None:
2058 result = parameters_callback(i, name, p.annotation)
2059 if result == 'skip':
2060 continue
2061
2062 if p.annotation is Parameter.empty:
2063 annotation = typing.cast(Any, Any)
2064 else:
2065 annotation = type_hints[name]
2066
2067 parameter_mode = mode_lookup.get(p.kind)
2068 if parameter_mode is None:
2069 assert p.kind == Parameter.VAR_KEYWORD, p.kind
2070
2071 unpack_type = _typing_extra.unpack_type(annotation)
2072 if unpack_type is not None:
2073 origin = get_origin(unpack_type) or unpack_type
2074 if not is_typeddict(origin):
2075 raise PydanticUserError(
2076 f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
2077 code='unpack-typed-dict',
2078 )
2079 non_pos_only_param_names = {
2080 name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
2081 }
2082 overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
2083 if overlapping_params:
2084 raise PydanticUserError(
2085 f'Typed dictionary {origin.__name__!r} overlaps with parameter'
2086 f'{"s" if len(overlapping_params) >= 2 else ""} '
2087 f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
2088 code='overlapping-unpack-typed-dict',
2089 )
2090 parameter_mode = 'var_kwargs_unpacked_typed_dict'
2091 annotation = unpack_type
2092 else:
2093 parameter_mode = 'var_kwargs_uniform'
2094
2095 parameters_list.append(
2096 self._generate_parameter_v3_schema(
2097 name, annotation, AnnotationSource.FUNCTION, parameter_mode, default=p.default
2098 )
2099 )
2100
2101 return core_schema.arguments_v3_schema(
2102 parameters_list,
2103 validate_by_name=self._config_wrapper.validate_by_name,
2104 )
2105
2106 def _unsubstituted_typevar_schema(self, typevar: typing.TypeVar) -> core_schema.CoreSchema:
2107 try:
2108 has_default = typevar.has_default() # pyright: ignore[reportAttributeAccessIssue]
2109 except AttributeError:
2110 # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13
2111 pass
2112 else:
2113 if has_default:
2114 return self.generate_schema(typevar.__default__) # pyright: ignore[reportAttributeAccessIssue]
2115
2116 if constraints := typevar.__constraints__:
2117 return self._union_schema(typing.Union[constraints])
2118
2119 if bound := typevar.__bound__:
2120 schema = self.generate_schema(bound)
2121 schema['serialization'] = core_schema.simple_ser_schema('any')
2122 return schema
2123
2124 return core_schema.any_schema()
2125
2126 def _computed_field_schema(
2127 self,
2128 d: Decorator[ComputedFieldInfo],
2129 field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]],
2130 ) -> core_schema.ComputedField:
2131 if d.info.return_type is not PydanticUndefined:
2132 return_type = d.info.return_type
2133 else:
2134 try:
2135 # Do not pass in globals as the function could be defined in a different module.
2136 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2137 # in locals that may contain a parent/rebuild namespace:
2138 return_type = _decorators.get_callable_return_type(d.func, localns=self._types_namespace.locals)
2139 except NameError as e:
2140 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2141 if return_type is PydanticUndefined:
2142 raise PydanticUserError(
2143 'Computed field is missing return type annotation or specifying `return_type`'
2144 ' to the `@computed_field` decorator (e.g. `@computed_field(return_type=int | str)`)',
2145 code='model-field-missing-annotation',
2146 )
2147
2148 return_type = replace_types(return_type, self._typevars_map)
2149 # Create a new ComputedFieldInfo so that different type parametrizations of the same
2150 # generic model's computed field can have different return types.
2151 d.info = dataclasses.replace(d.info, return_type=return_type)
2152 return_type_schema = self.generate_schema(return_type)
2153 # Apply serializers to computed field if there exist
2154 return_type_schema = self._apply_field_serializers(
2155 return_type_schema,
2156 filter_field_decorator_info_by_field(field_serializers.values(), d.cls_var_name),
2157 )
2158
2159 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(d.info)
2160 core_metadata: dict[str, Any] = {}
2161 update_core_metadata(
2162 core_metadata,
2163 pydantic_js_updates={'readOnly': True, **(pydantic_js_updates if pydantic_js_updates else {})},
2164 pydantic_js_extra=pydantic_js_extra,
2165 )
2166 return core_schema.computed_field(
2167 d.cls_var_name, return_schema=return_type_schema, alias=d.info.alias, metadata=core_metadata
2168 )
2169
2170 def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema:
2171 """Generate schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`."""
2172 FieldInfo = import_cached_field_info()
2173 source_type, *annotations = self._get_args_resolving_forward_refs(
2174 annotated_type,
2175 required=True,
2176 )
2177 schema = self._apply_annotations(source_type, annotations)
2178 # put the default validator last so that TypeAdapter.get_default_value() works
2179 # even if there are function validators involved
2180 for annotation in annotations:
2181 if isinstance(annotation, FieldInfo):
2182 schema = wrap_default(annotation, schema)
2183 return schema
2184
2185 def _apply_annotations(
2186 self,
2187 source_type: Any,
2188 annotations: list[Any],
2189 transform_inner_schema: Callable[[CoreSchema], CoreSchema] = lambda x: x,
2190 check_unsupported_field_info_attributes: bool = True,
2191 ) -> CoreSchema:
2192 """Apply arguments from `Annotated` or from `FieldInfo` to a schema.
2193
2194 This gets called by `GenerateSchema._annotated_schema` but differs from it in that it does
2195 not expect `source_type` to be an `Annotated` object, it expects it to be the first argument of that
2196 (in other words, `GenerateSchema._annotated_schema` just unpacks `Annotated`, this process it).
2197 """
2198 annotations = list(_known_annotated_metadata.expand_grouped_metadata(annotations))
2199
2200 pydantic_js_annotation_functions: list[GetJsonSchemaFunction] = []
2201
2202 def inner_handler(obj: Any) -> CoreSchema:
2203 schema = self._generate_schema_from_get_schema_method(obj, source_type)
2204
2205 if schema is None:
2206 schema = self._generate_schema_inner(obj)
2207
2208 metadata_js_function = _extract_get_pydantic_json_schema(obj)
2209 if metadata_js_function is not None:
2210 metadata_schema = resolve_original_schema(schema, self.defs)
2211 if metadata_schema is not None:
2212 self._add_js_function(metadata_schema, metadata_js_function)
2213 return transform_inner_schema(schema)
2214
2215 get_inner_schema = CallbackGetCoreSchemaHandler(inner_handler, self)
2216
2217 for annotation in annotations:
2218 if annotation is None:
2219 continue
2220 get_inner_schema = self._get_wrapped_inner_schema(
2221 get_inner_schema,
2222 annotation,
2223 pydantic_js_annotation_functions,
2224 check_unsupported_field_info_attributes=check_unsupported_field_info_attributes,
2225 )
2226
2227 schema = get_inner_schema(source_type)
2228 if pydantic_js_annotation_functions:
2229 core_metadata = schema.setdefault('metadata', {})
2230 update_core_metadata(core_metadata, pydantic_js_annotation_functions=pydantic_js_annotation_functions)
2231 return _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, source_type, schema)
2232
2233 def _apply_single_annotation(
2234 self,
2235 schema: core_schema.CoreSchema,
2236 metadata: Any,
2237 check_unsupported_field_info_attributes: bool = True,
2238 ) -> core_schema.CoreSchema:
2239 FieldInfo = import_cached_field_info()
2240
2241 if isinstance(metadata, FieldInfo):
2242 if (
2243 check_unsupported_field_info_attributes
2244 # HACK: we don't want to emit the warning for `FieldInfo` subclasses, because FastAPI does weird manipulations
2245 # with its subclasses and their annotations:
2246 and type(metadata) is FieldInfo
2247 ):
2248 for attr, value in (unsupported_attributes := self._get_unsupported_field_info_attributes(metadata)):
2249 warnings.warn(
2250 f'The {attr!r} attribute with value {value!r} was provided to the `Field()` function, '
2251 f'which has no effect in the context it was used. {attr!r} is field-specific metadata, '
2252 'and can only be attached to a model field using `Annotated` metadata or by assignment. '
2253 'This may have happened because an `Annotated` type alias using the `type` statement was '
2254 'used, or if the `Field()` function was attached to a single member of a union type.',
2255 category=UnsupportedFieldAttributeWarning,
2256 )
2257
2258 if (
2259 metadata.default_factory_takes_validated_data
2260 and self.model_type_stack.get() is None
2261 and 'defaut_factory' not in unsupported_attributes
2262 ):
2263 warnings.warn(
2264 "A 'default_factory' taking validated data as an argument was provided to the `Field()` function, "
2265 'but no validated data is available in the context it was used.',
2266 category=UnsupportedFieldAttributeWarning,
2267 )
2268
2269 for field_metadata in metadata.metadata:
2270 schema = self._apply_single_annotation(schema, field_metadata)
2271
2272 if metadata.discriminator is not None:
2273 schema = self._apply_discriminator_to_union(schema, metadata.discriminator)
2274 return schema
2275
2276 if schema['type'] == 'nullable':
2277 # for nullable schemas, metadata is automatically applied to the inner schema
2278 inner = schema.get('schema', core_schema.any_schema())
2279 inner = self._apply_single_annotation(inner, metadata)
2280 if inner:
2281 schema['schema'] = inner
2282 return schema
2283
2284 original_schema = schema
2285 ref = schema.get('ref')
2286 if ref is not None:
2287 schema = schema.copy()
2288 new_ref = ref + f'_{repr(metadata)}'
2289 if (existing := self.defs.get_schema_from_ref(new_ref)) is not None:
2290 return existing
2291 schema['ref'] = new_ref # pyright: ignore[reportGeneralTypeIssues]
2292 elif schema['type'] == 'definition-ref':
2293 ref = schema['schema_ref']
2294 if (referenced_schema := self.defs.get_schema_from_ref(ref)) is not None:
2295 schema = referenced_schema.copy()
2296 new_ref = ref + f'_{repr(metadata)}'
2297 if (existing := self.defs.get_schema_from_ref(new_ref)) is not None:
2298 return existing
2299 schema['ref'] = new_ref # pyright: ignore[reportGeneralTypeIssues]
2300
2301 maybe_updated_schema = _known_annotated_metadata.apply_known_metadata(metadata, schema)
2302
2303 if maybe_updated_schema is not None:
2304 return maybe_updated_schema
2305 return original_schema
2306
2307 def _apply_single_annotation_json_schema(
2308 self, schema: core_schema.CoreSchema, metadata: Any
2309 ) -> core_schema.CoreSchema:
2310 FieldInfo = import_cached_field_info()
2311
2312 if isinstance(metadata, FieldInfo):
2313 for field_metadata in metadata.metadata:
2314 schema = self._apply_single_annotation_json_schema(schema, field_metadata)
2315
2316 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(metadata)
2317 core_metadata = schema.setdefault('metadata', {})
2318 update_core_metadata(
2319 core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
2320 )
2321 return schema
2322
2323 def _get_unsupported_field_info_attributes(self, field_info: FieldInfo) -> list[tuple[str, Any]]:
2324 """Get the list of unsupported `FieldInfo` attributes when not directly used in `Annotated` for field annotations."""
2325 unused_metadata: list[tuple[str, Any]] = []
2326 for unused_metadata_name, unset_value in UNSUPPORTED_STANDALONE_FIELDINFO_ATTRIBUTES:
2327 if (
2328 (unused_metadata_value := getattr(field_info, unused_metadata_name)) is not unset_value
2329 # `default` and `default_factory` can still be used with a type adapter, so only include them
2330 # if used with a model-like class:
2331 and (
2332 unused_metadata_name not in ('default', 'default_factory')
2333 or self.model_type_stack.get() is not None
2334 )
2335 # Setting `alias` will set `validation/serialization_alias` as well, so we want to avoid duplicate warnings:
2336 and (
2337 unused_metadata_name not in ('validation_alias', 'serialization_alias')
2338 or 'alias' not in field_info._attributes_set
2339 )
2340 ):
2341 unused_metadata.append((unused_metadata_name, unused_metadata_value))
2342
2343 return unused_metadata
2344
2345 def _get_wrapped_inner_schema(
2346 self,
2347 get_inner_schema: GetCoreSchemaHandler,
2348 annotation: Any,
2349 pydantic_js_annotation_functions: list[GetJsonSchemaFunction],
2350 check_unsupported_field_info_attributes: bool = False,
2351 ) -> CallbackGetCoreSchemaHandler:
2352 annotation_get_schema: GetCoreSchemaFunction | None = getattr(annotation, '__get_pydantic_core_schema__', None)
2353
2354 def new_handler(source: Any) -> core_schema.CoreSchema:
2355 if annotation_get_schema is not None:
2356 schema = annotation_get_schema(source, get_inner_schema)
2357 else:
2358 schema = get_inner_schema(source)
2359 schema = self._apply_single_annotation(
2360 schema,
2361 annotation,
2362 check_unsupported_field_info_attributes=check_unsupported_field_info_attributes,
2363 )
2364 schema = self._apply_single_annotation_json_schema(schema, annotation)
2365
2366 metadata_js_function = _extract_get_pydantic_json_schema(annotation)
2367 if metadata_js_function is not None:
2368 pydantic_js_annotation_functions.append(metadata_js_function)
2369 return schema
2370
2371 return CallbackGetCoreSchemaHandler(new_handler, self)
2372
2373 def _apply_field_serializers(
2374 self,
2375 schema: core_schema.CoreSchema,
2376 serializers: list[Decorator[FieldSerializerDecoratorInfo]],
2377 ) -> core_schema.CoreSchema:
2378 """Apply field serializers to a schema."""
2379 if serializers:
2380 schema = copy(schema)
2381 if schema['type'] == 'definitions':
2382 inner_schema = schema['schema']
2383 schema['schema'] = self._apply_field_serializers(inner_schema, serializers)
2384 return schema
2385 elif 'ref' in schema:
2386 schema = self.defs.create_definition_reference_schema(schema)
2387
2388 # use the last serializer to make it easy to override a serializer set on a parent model
2389 serializer = serializers[-1]
2390 is_field_serializer, info_arg = inspect_field_serializer(serializer.func, serializer.info.mode)
2391
2392 if serializer.info.return_type is not PydanticUndefined:
2393 return_type = serializer.info.return_type
2394 else:
2395 try:
2396 # Do not pass in globals as the function could be defined in a different module.
2397 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2398 # in locals that may contain a parent/rebuild namespace:
2399 return_type = _decorators.get_callable_return_type(
2400 serializer.func, localns=self._types_namespace.locals
2401 )
2402 except NameError as e:
2403 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2404
2405 if return_type is PydanticUndefined:
2406 return_schema = None
2407 else:
2408 return_schema = self.generate_schema(return_type)
2409
2410 if serializer.info.mode == 'wrap':
2411 schema['serialization'] = core_schema.wrap_serializer_function_ser_schema(
2412 serializer.func,
2413 is_field_serializer=is_field_serializer,
2414 info_arg=info_arg,
2415 return_schema=return_schema,
2416 when_used=serializer.info.when_used,
2417 )
2418 else:
2419 assert serializer.info.mode == 'plain'
2420 schema['serialization'] = core_schema.plain_serializer_function_ser_schema(
2421 serializer.func,
2422 is_field_serializer=is_field_serializer,
2423 info_arg=info_arg,
2424 return_schema=return_schema,
2425 when_used=serializer.info.when_used,
2426 )
2427 return schema
2428
2429 def _apply_model_serializers(
2430 self, schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]]
2431 ) -> core_schema.CoreSchema:
2432 """Apply model serializers to a schema."""
2433 ref: str | None = schema.pop('ref', None) # type: ignore
2434 if serializers:
2435 serializer = list(serializers)[-1]
2436 info_arg = inspect_model_serializer(serializer.func, serializer.info.mode)
2437
2438 if serializer.info.return_type is not PydanticUndefined:
2439 return_type = serializer.info.return_type
2440 else:
2441 try:
2442 # Do not pass in globals as the function could be defined in a different module.
2443 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2444 # in locals that may contain a parent/rebuild namespace:
2445 return_type = _decorators.get_callable_return_type(
2446 serializer.func, localns=self._types_namespace.locals
2447 )
2448 except NameError as e:
2449 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2450
2451 if return_type is PydanticUndefined:
2452 return_schema = None
2453 else:
2454 return_schema = self.generate_schema(return_type)
2455
2456 if serializer.info.mode == 'wrap':
2457 ser_schema: core_schema.SerSchema = core_schema.wrap_serializer_function_ser_schema(
2458 serializer.func,
2459 info_arg=info_arg,
2460 return_schema=return_schema,
2461 when_used=serializer.info.when_used,
2462 )
2463 else:
2464 # plain
2465 ser_schema = core_schema.plain_serializer_function_ser_schema(
2466 serializer.func,
2467 info_arg=info_arg,
2468 return_schema=return_schema,
2469 when_used=serializer.info.when_used,
2470 )
2471 schema['serialization'] = ser_schema
2472 if ref:
2473 schema['ref'] = ref # type: ignore
2474 return schema
2475
2476
2477_VALIDATOR_F_MATCH: Mapping[
2478 tuple[FieldValidatorModes, Literal['no-info', 'with-info']],
2479 Callable[[Callable[..., Any], core_schema.CoreSchema], core_schema.CoreSchema],
2480] = {
2481 ('before', 'no-info'): lambda f, schema: core_schema.no_info_before_validator_function(f, schema),
2482 ('after', 'no-info'): lambda f, schema: core_schema.no_info_after_validator_function(f, schema),
2483 ('plain', 'no-info'): lambda f, _: core_schema.no_info_plain_validator_function(f),
2484 ('wrap', 'no-info'): lambda f, schema: core_schema.no_info_wrap_validator_function(f, schema),
2485 ('before', 'with-info'): lambda f, schema: core_schema.with_info_before_validator_function(f, schema),
2486 ('after', 'with-info'): lambda f, schema: core_schema.with_info_after_validator_function(f, schema),
2487 ('plain', 'with-info'): lambda f, _: core_schema.with_info_plain_validator_function(f),
2488 ('wrap', 'with-info'): lambda f, schema: core_schema.with_info_wrap_validator_function(f, schema),
2489}
2490
2491
2492# TODO V3: this function is only used for deprecated decorators. It should
2493# be removed once we drop support for those.
def apply_validators(
    schema: core_schema.CoreSchema,
    validators: Iterable[Decorator[RootValidatorDecoratorInfo]]
    | Iterable[Decorator[ValidatorDecoratorInfo]]
    | Iterable[Decorator[FieldValidatorDecoratorInfo]],
) -> core_schema.CoreSchema:
    """Wrap a schema with the provided validators, in iteration order.

    Args:
        schema: The schema to apply validators on.
        validators: An iterable of validators.

    Returns:
        The updated schema (the input schema itself if there are no validators).
    """
    for decorator in validators:
        takes_info = inspect_validator(decorator.func, decorator.info.mode)
        lookup_key = (decorator.info.mode, 'with-info' if takes_info else 'no-info')
        build_validator_schema = _VALIDATOR_F_MATCH[lookup_key]
        # Each validator wraps the schema produced so far:
        schema = build_validator_schema(decorator.func, schema)
    return schema
2516
2517
2518def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool:
2519 """In v1, if any of the validators for a field had `always=True`, the default value would be validated.
2520
2521 This serves as an auxiliary function for re-implementing that logic, by looping over a provided
2522 collection of (v1-style) ValidatorDecoratorInfo's and checking if any of them have `always=True`.
2523
2524 We should be able to drop this function and the associated logic calling it once we drop support
2525 for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent
2526 to the v1-validator `always` kwarg to `field_validator`.)
2527 """
2528 for validator in validators:
2529 if validator.info.always:
2530 return True
2531 return False
2532
2533
def _convert_to_aliases(
    alias: str | AliasChoices | AliasPath | None,
) -> str | list[str | int] | list[list[str | int]] | None:
    """Return the result of `convert_to_aliases()` for `AliasChoices`/`AliasPath` instances, else `alias` unchanged."""
    if not isinstance(alias, (AliasChoices, AliasPath)):
        # Plain strings and `None` are passed through:
        return alias
    return alias.convert_to_aliases()
2541
2542
def apply_model_validators(
    schema: core_schema.CoreSchema,
    validators: Iterable[Decorator[ModelValidatorDecoratorInfo]],
    mode: Literal['inner', 'outer', 'all'],
) -> core_schema.CoreSchema:
    """Wrap a schema with the model validators selected by `mode`.

    - `mode == 'inner'`: only "before" validators are applied.
    - `mode == 'outer'`: validators other than "before" are applied.
    - `mode == 'all'`: all validators are applied.

    Args:
        schema: The schema to apply validators on.
        validators: An iterable of validators.
        mode: The validator mode.

    Returns:
        The updated schema.
    """
    # The reference is moved from the provided schema to the schema that is returned:
    ref: str | None = schema.pop('ref', None)  # type: ignore
    for validator in validators:
        validator_mode = validator.info.mode
        is_before = validator_mode == 'before'
        if (mode == 'inner' and not is_before) or (mode == 'outer' and is_before):
            continue

        takes_info = inspect_validator(validator.func, validator_mode)
        if validator_mode == 'wrap':
            build_validator_schema = (
                core_schema.with_info_wrap_validator_function
                if takes_info
                else core_schema.no_info_wrap_validator_function
            )
        elif is_before:
            build_validator_schema = (
                core_schema.with_info_before_validator_function
                if takes_info
                else core_schema.no_info_before_validator_function
            )
        else:
            assert validator_mode == 'after'
            build_validator_schema = (
                core_schema.with_info_after_validator_function
                if takes_info
                else core_schema.no_info_after_validator_function
            )
        schema = build_validator_schema(function=validator.func, schema=schema)
    if ref:
        schema['ref'] = ref  # type: ignore
    return schema
2588
2589
def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema:
    """Wrap the schema in a default schema when `field_info` provides a default.

    The `default_factory` takes precedence over the `default` value.

    Args:
        field_info: The field info object.
        schema: The schema to apply default on.

    Returns:
        The schema wrapped with the default value or `default_factory`, or `schema` itself
        if neither is available.
    """
    factory = field_info.default_factory
    if factory:
        return core_schema.with_default_schema(
            schema,
            default_factory=factory,
            default_factory_takes_data=takes_validated_data_argument(factory),
            validate_default=field_info.validate_default,
        )

    if field_info.default is PydanticUndefined:
        # No default at all, nothing to wrap:
        return schema

    return core_schema.with_default_schema(
        schema, default=field_info.default, validate_default=field_info.validate_default
    )
2613
2614
2615def _extract_get_pydantic_json_schema(tp: Any) -> GetJsonSchemaFunction | None:
2616 """Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`."""
2617 js_modify_function = getattr(tp, '__get_pydantic_json_schema__', None)
2618
2619 if hasattr(tp, '__modify_schema__'):
2620 BaseModel = import_cached_base_model()
2621
2622 has_custom_v2_modify_js_func = (
2623 js_modify_function is not None
2624 and BaseModel.__get_pydantic_json_schema__.__func__ # type: ignore
2625 not in (js_modify_function, getattr(js_modify_function, '__func__', None))
2626 )
2627
2628 if not has_custom_v2_modify_js_func:
2629 cls_name = getattr(tp, '__name__', None)
2630 raise PydanticUserError(
2631 f'The `__modify_schema__` method is not supported in Pydantic v2. '
2632 f'Use `__get_pydantic_json_schema__` instead{f" in class `{cls_name}`" if cls_name else ""}.',
2633 code='custom-json-schema',
2634 )
2635
2636 if (origin := get_origin(tp)) is not None:
2637 # Generic aliases proxy attribute access to the origin, *except* dunder attributes,
2638 # such as `__get_pydantic_json_schema__`, hence the explicit check.
2639 return _extract_get_pydantic_json_schema(origin)
2640
2641 if js_modify_function is None:
2642 return None
2643
2644 return js_modify_function
2645
2646
def resolve_original_schema(schema: CoreSchema, definitions: _Definitions) -> CoreSchema | None:
    """Return the schema that `schema` stands for.

    - `'definition-ref'`: the schema looked up from `definitions` (may be `None`).
    - `'definitions'`: the inner schema.
    - anything else: `schema` itself.
    """
    schema_type = schema['type']
    if schema_type == 'definition-ref':
        return definitions.get_schema_from_ref(schema['schema_ref'])
    if schema_type == 'definitions':
        return schema['schema']
    return schema
2654
2655
2656def _inlining_behavior(
2657 def_ref: core_schema.DefinitionReferenceSchema,
2658) -> Literal['inline', 'keep', 'preserve_metadata']:
2659 """Determine the inlining behavior of the `'definition-ref'` schema.
2660
2661 - If no `'serialization'` schema and no metadata is attached, the schema can safely be inlined.
2662 - If it has metadata but only related to the deferred discriminator application, it can be inlined
2663 provided that such metadata is kept.
2664 - Otherwise, the schema should not be inlined. Doing so would remove the `'serialization'` schema or metadata.
2665 """
2666 if 'serialization' in def_ref:
2667 return 'keep'
2668 metadata = def_ref.get('metadata')
2669 if not metadata:
2670 return 'inline'
2671 if len(metadata) == 1 and 'pydantic_internal_union_discriminator' in metadata:
2672 return 'preserve_metadata'
2673 return 'keep'
2674
2675
class _Definitions:
    """Keeps track of references and definitions.

    References currently being computed are tracked in `_recursively_seen`, and the schemas
    stored by reference are kept in `_definitions`.
    """

    _recursively_seen: set[str]
    """A set of recursively seen references.

    When a referenceable type is encountered, the `get_schema_or_ref` context manager is
    entered to compute the reference. If the type references itself by some way (e.g. for
    a dataclass a Pydantic model, the class can be referenced as a field annotation),
    entering the context manager again will yield a `'definition-ref'` schema that should
    short-circuit the normal generation process, as the reference was already in this set.
    """

    _definitions: dict[str, core_schema.CoreSchema]
    """A mapping of references to their corresponding schema.

    When a schema for a referenceable type is generated, it is stored in this mapping. If the
    same type is encountered again, the reference is yielded by the `get_schema_or_ref` context
    manager.
    """

    def __init__(self) -> None:
        """Initialize the instance with no seen reference and no stored definition."""
        self._recursively_seen = set()
        self._definitions = {}

    @contextmanager
    def get_schema_or_ref(self, tp: Any, /) -> Generator[tuple[str, core_schema.DefinitionReferenceSchema | None]]:
        """Get a definition for `tp` if one exists.

        If a definition exists, a tuple of `(ref_string, CoreSchema)` is yielded.
        If no definition exists yet, a tuple of `(ref_string, None)` is yielded. In this case,
        the reference is marked as being recursively seen for the duration of the `with` block.

        Note that the yielded `CoreSchema` will always be a `DefinitionReferenceSchema`,
        not the actual definition itself.

        This should be called for any type that can be identified by reference.
        This includes any recursive types.

        At present the following types can be named/recursive:

        - Pydantic model
        - Pydantic and stdlib dataclasses
        - Typed dictionaries
        - Named tuples
        - `TypeAliasType` instances
        - Enums
        """
        ref = get_type_ref(tp)
        # yield the reference if we're either (1) in a cycle or (2) the reference was already encountered:
        if ref in self._recursively_seen or ref in self._definitions:
            yield (ref, core_schema.definition_reference_schema(ref))
        else:
            self._recursively_seen.add(ref)
            try:
                yield (ref, None)
            finally:
                # Always unmark the reference, even if the `with` block raised:
                self._recursively_seen.discard(ref)

    def get_schema_from_ref(self, ref: str) -> CoreSchema | None:
        """Resolve the schema from the given reference.

        Returns `None` if no definition is stored for `ref`.
        """
        return self._definitions.get(ref)

    def create_definition_reference_schema(self, schema: CoreSchema) -> core_schema.DefinitionReferenceSchema:
        """Store the schema as a definition and return a `'definition-reference'` schema pointing to it.

        The schema must have a reference attached to it (a `KeyError` is raised otherwise).
        """
        ref = schema['ref']  # pyright: ignore
        self._definitions[ref] = schema
        return core_schema.definition_reference_schema(ref)

    def unpack_definitions(self, schema: core_schema.DefinitionsSchema) -> CoreSchema:
        """Store the definitions of the `'definitions'` core schema and return the inner core schema."""
        for def_schema in schema['definitions']:
            self._definitions[def_schema['ref']] = def_schema  # pyright: ignore
        return schema['schema']

    def finalize_schema(self, schema: CoreSchema) -> CoreSchema:
        """Finalize the core schema.

        This traverses the core schema and referenced definitions, replaces `'definition-ref'` schemas
        by the referenced definition if possible, and applies deferred discriminators.

        Args:
            schema: The core schema to finalize. Schemas reachable from it may be mutated in place.

        Returns:
            The finalized schema, wrapped in a `'definitions'` schema if some definitions
            could not be inlined.

        Raises:
            InvalidSchemaError: If gathering the schemas raises a `MissingDefinitionError`.
        """
        definitions = self._definitions
        try:
            gather_result = gather_schemas_for_cleaning(
                schema,
                definitions=definitions,
            )
        except MissingDefinitionError as e:
            raise InvalidSchemaError from e

        # Definitions that are not inlined, and thus need to be kept in the final schema:
        remaining_defs: dict[str, CoreSchema] = {}

        # Note: this logic doesn't play well when core schemas with deferred discriminator metadata
        # and references are encountered. See the `test_deferred_discriminated_union_and_references()` test.
        for ref, inlinable_def_ref in gather_result['collected_references'].items():
            if inlinable_def_ref is not None and (inlining_behavior := _inlining_behavior(inlinable_def_ref)) != 'keep':
                if inlining_behavior == 'inline':
                    # `ref` was encountered, and only once:
                    #  - `inlinable_def_ref` is a `'definition-ref'` schema and is guaranteed to be
                    #    the only one. Transform it into the definition it points to.
                    #  - Do not store the definition in the `remaining_defs`.
                    inlinable_def_ref.clear()  # pyright: ignore[reportAttributeAccessIssue]
                    inlinable_def_ref.update(self._resolve_definition(ref, definitions))  # pyright: ignore
                elif inlining_behavior == 'preserve_metadata':
                    # `ref` was encountered, and only once, but contains discriminator metadata.
                    # We will do the same thing as if `inlining_behavior` was `'inline'`, but make
                    # sure to keep the metadata for the deferred discriminator application logic below.
                    meta = inlinable_def_ref.pop('metadata')
                    inlinable_def_ref.clear()  # pyright: ignore[reportAttributeAccessIssue]
                    inlinable_def_ref.update(self._resolve_definition(ref, definitions))  # pyright: ignore
                    inlinable_def_ref['metadata'] = meta
            else:
                # `ref` was encountered, at least two times (or only once, but with metadata or a serialization schema):
                #  - Do not inline the `'definition-ref'` schemas (they are not provided in the gather result anyway).
                #  - Store the definition in the `remaining_defs`
                remaining_defs[ref] = self._resolve_definition(ref, definitions)

        for cs in gather_result['deferred_discriminator_schemas']:
            discriminator: str | None = cs['metadata'].pop('pydantic_internal_union_discriminator', None)  # pyright: ignore[reportTypedDictNotRequiredAccess]
            if discriminator is None:
                # This can happen in rare scenarios, when a deferred schema is present multiple times in the
                # gather result (e.g. when using the `Sequence` type -- see `test_sequence_discriminated_union()`).
                # In this case, a previous loop iteration applied the discriminator and so we can just skip it here.
                continue
            applied = _discriminated_union.apply_discriminator(cs.copy(), discriminator, remaining_defs)
            # Mutate the schema directly to have the discriminator applied
            cs.clear()  # pyright: ignore[reportAttributeAccessIssue]
            cs.update(applied)  # pyright: ignore

        if remaining_defs:
            schema = core_schema.definitions_schema(schema=schema, definitions=[*remaining_defs.values()])
        return schema

    def _resolve_definition(self, ref: str, definitions: dict[str, CoreSchema]) -> CoreSchema:
        """Return the definition stored under `ref`, following intermediate `'definition-ref'` schemas.

        If the stored definition is not a `'definition-ref'` schema, it is returned as is. Otherwise,
        references are followed for as long as the current definition is a `'definition-ref'` schema
        whose inlining behavior is `'inline'`, and a copy of the resulting definition with its
        `'ref'` set to `ref` is returned.

        Raises:
            PydanticUserError: If the same reference is visited twice while following references.
        """
        definition = definitions[ref]
        if definition['type'] != 'definition-ref':
            return definition

        # Some `'definition-ref'` schemas might act as "intermediate" references (e.g. when using
        # a PEP 695 type alias (which is referenceable) that references another PEP 695 type alias):
        visited: set[str] = set()
        while definition['type'] == 'definition-ref' and _inlining_behavior(definition) == 'inline':
            schema_ref = definition['schema_ref']
            if schema_ref in visited:
                raise PydanticUserError(
                    f'{ref} contains a circular reference to itself.', code='circular-reference-schema'
                )
            visited.add(schema_ref)
            definition = definitions[schema_ref]
        # Build a new dict so that the stored definition is left untouched:
        return {**definition, 'ref': ref}  # pyright: ignore[reportReturnType]
2828
2829
2830class _FieldNameStack:
2831 __slots__ = ('_stack',)
2832
2833 def __init__(self) -> None:
2834 self._stack: list[str] = []
2835
2836 @contextmanager
2837 def push(self, field_name: str) -> Iterator[None]:
2838 self._stack.append(field_name)
2839 yield
2840 self._stack.pop()
2841
2842 def get(self) -> str | None:
2843 if self._stack:
2844 return self._stack[-1]
2845 else:
2846 return None
2847
2848
2849class _ModelTypeStack:
2850 __slots__ = ('_stack',)
2851
2852 def __init__(self) -> None:
2853 self._stack: list[type] = []
2854
2855 @contextmanager
2856 def push(self, type_obj: type) -> Iterator[None]:
2857 self._stack.append(type_obj)
2858 yield
2859 self._stack.pop()
2860
2861 def get(self) -> type | None:
2862 if self._stack:
2863 return self._stack[-1]
2864 else:
2865 return None