1"""Convert python types to pydantic-core schema."""
2
3from __future__ import annotations as _annotations
4
5import collections.abc
6import dataclasses
7import datetime
8import inspect
9import os
10import pathlib
11import re
12import sys
13import typing
14import warnings
15from collections.abc import Generator, Iterable, Iterator, Mapping
16from contextlib import contextmanager
17from copy import copy
18from decimal import Decimal
19from enum import Enum
20from fractions import Fraction
21from functools import partial
22from inspect import Parameter, _ParameterKind
23from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
24from itertools import chain
25from operator import attrgetter
26from types import FunctionType, GenericAlias, LambdaType, MethodType
27from typing import (
28 TYPE_CHECKING,
29 Any,
30 Callable,
31 Final,
32 ForwardRef,
33 Literal,
34 TypeVar,
35 Union,
36 cast,
37 overload,
38)
39from uuid import UUID
40from zoneinfo import ZoneInfo
41
42import typing_extensions
43from pydantic_core import (
44 MISSING,
45 CoreSchema,
46 MultiHostUrl,
47 PydanticCustomError,
48 PydanticSerializationUnexpectedValue,
49 PydanticUndefined,
50 Url,
51 core_schema,
52 to_jsonable_python,
53)
54from typing_extensions import TypeAlias, TypeAliasType, get_args, get_origin, is_typeddict
55from typing_inspection import typing_objects
56from typing_inspection.introspection import AnnotationSource, get_literal_values, is_union_origin
57
58from ..aliases import AliasChoices, AliasPath
59from ..annotated_handlers import GetCoreSchemaHandler, GetJsonSchemaHandler
60from ..config import ConfigDict, JsonDict, JsonEncoder, JsonSchemaExtraCallable
61from ..errors import (
62 PydanticForbiddenQualifier,
63 PydanticInvalidForJsonSchema,
64 PydanticSchemaGenerationError,
65 PydanticUndefinedAnnotation,
66 PydanticUserError,
67)
68from ..functional_validators import AfterValidator, BeforeValidator, FieldValidatorModes, PlainValidator, WrapValidator
69from ..json_schema import JsonSchemaValue
70from ..version import version_short
71from ..warnings import (
72 ArbitraryTypeWarning,
73 PydanticDeprecatedSince20,
74 TypedDictExtraConfigWarning,
75 UnsupportedFieldAttributeWarning,
76)
77from . import _decorators, _discriminated_union, _known_annotated_metadata, _repr, _typing_extra
78from ._config import ConfigWrapper, ConfigWrapperStack
79from ._core_metadata import CoreMetadata, update_core_metadata
80from ._core_utils import (
81 get_ref,
82 get_type_ref,
83 is_list_like_schema_with_items_schema,
84)
85from ._decorators import (
86 Decorator,
87 DecoratorInfos,
88 FieldSerializerDecoratorInfo,
89 FieldValidatorDecoratorInfo,
90 ModelSerializerDecoratorInfo,
91 ModelValidatorDecoratorInfo,
92 RootValidatorDecoratorInfo,
93 ValidatorDecoratorInfo,
94 get_attribute_from_bases,
95 inspect_field_serializer,
96 inspect_model_serializer,
97 inspect_validator,
98)
99from ._docs_extraction import extract_docstrings_from_cls
100from ._fields import (
101 collect_dataclass_fields,
102 rebuild_dataclass_fields,
103 rebuild_model_fields,
104 takes_validated_data_argument,
105 update_field_from_config,
106)
107from ._forward_ref import PydanticRecursiveRef
108from ._generics import get_standard_typevars_map, replace_types
109from ._import_utils import import_cached_base_model, import_cached_field_info
110from ._mock_val_ser import MockCoreSchema
111from ._namespace_utils import NamespacesTuple, NsResolver
112from ._schema_gather import MissingDefinitionError, gather_schemas_for_cleaning
113from ._schema_generation_shared import CallbackGetCoreSchemaHandler
114from ._utils import lenient_issubclass, smart_deepcopy
115
116if TYPE_CHECKING:
117 from ..fields import ComputedFieldInfo, FieldInfo
118 from ..main import BaseModel
119 from ..types import Discriminator
120 from ._dataclasses import StandardDataclass
121 from ._schema_generation_shared import GetJsonSchemaFunction
122
123_SUPPORTS_TYPEDDICT = sys.version_info >= (3, 12)
124
125FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo]
126FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo)
127AnyFieldDecorator = Union[
128 Decorator[ValidatorDecoratorInfo],
129 Decorator[FieldValidatorDecoratorInfo],
130 Decorator[FieldSerializerDecoratorInfo],
131]
132
133ModifyCoreSchemaWrapHandler: TypeAlias = GetCoreSchemaHandler
134GetCoreSchemaFunction: TypeAlias = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema]
135ParametersCallback: TypeAlias = "Callable[[int, str, Any], Literal['skip'] | None]"
136
137TUPLE_TYPES: list[type] = [typing.Tuple, tuple] # noqa: UP006
138LIST_TYPES: list[type] = [typing.List, list, collections.abc.MutableSequence] # noqa: UP006
139SET_TYPES: list[type] = [typing.Set, set, collections.abc.MutableSet] # noqa: UP006
140FROZEN_SET_TYPES: list[type] = [typing.FrozenSet, frozenset, collections.abc.Set] # noqa: UP006
141DICT_TYPES: list[type] = [typing.Dict, dict] # noqa: UP006
142IP_TYPES: list[type] = [IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network]
143SEQUENCE_TYPES: list[type] = [typing.Sequence, collections.abc.Sequence]
144ITERABLE_TYPES: list[type] = [typing.Iterable, collections.abc.Iterable, typing.Generator, collections.abc.Generator]
145TYPE_TYPES: list[type] = [typing.Type, type] # noqa: UP006
146PATTERN_TYPES: list[type] = [typing.Pattern, re.Pattern]
147PATH_TYPES: list[type] = [
148 os.PathLike,
149 pathlib.Path,
150 pathlib.PurePath,
151 pathlib.PosixPath,
152 pathlib.PurePosixPath,
153 pathlib.PureWindowsPath,
154]
155MAPPING_TYPES = [
156 typing.Mapping,
157 typing.MutableMapping,
158 collections.abc.Mapping,
159 collections.abc.MutableMapping,
160 collections.OrderedDict,
161 typing_extensions.OrderedDict,
162 typing.DefaultDict, # noqa: UP006
163 collections.defaultdict,
164]
165COUNTER_TYPES = [collections.Counter, typing.Counter]
166DEQUE_TYPES: list[type] = [collections.deque, typing.Deque] # noqa: UP006
167
168# Note: This does not play very well with type checkers. For example,
169# `a: LambdaType = lambda x: x` will raise a type error by Pyright.
170ValidateCallSupportedTypes = Union[
171 LambdaType,
172 FunctionType,
173 MethodType,
174 partial,
175]
176
177VALIDATE_CALL_SUPPORTED_TYPES = get_args(ValidateCallSupportedTypes)
178UNSUPPORTED_STANDALONE_FIELDINFO_ATTRIBUTES: list[tuple[str, Any]] = [
179 ('alias', None),
180 ('validation_alias', None),
181 ('serialization_alias', None),
182 # will be set if any alias is set, so disable it to avoid double warnings:
183 # 'alias_priority',
184 ('default', PydanticUndefined),
185 ('default_factory', None),
186 ('exclude', None),
187 ('deprecated', None),
188 ('repr', True),
189 ('validate_default', None),
190 ('frozen', None),
191 ('init', None),
192 ('init_var', None),
193 ('kw_only', None),
194]
195"""`FieldInfo` attributes (and their default value) that can't be used outside of a model (e.g. in a type adapter or a PEP 695 type alias)."""
196
197_mode_to_validator: dict[
198 FieldValidatorModes, type[BeforeValidator | AfterValidator | PlainValidator | WrapValidator]
199] = {'before': BeforeValidator, 'after': AfterValidator, 'plain': PlainValidator, 'wrap': WrapValidator}
200
201
202def check_validator_fields_against_field_name(
203 info: FieldDecoratorInfo,
204 field: str,
205) -> bool:
206 """Check if field name is in validator fields.
207
208 Args:
209 info: The field info.
210 field: The field name to check.
211
212 Returns:
213 `True` if field name is in validator fields, `False` otherwise.
214 """
215 fields = info.fields
216 return '*' in fields or field in fields
217
218
219def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None:
220 """Check if the defined fields in decorators exist in `fields` param.
221
222 It ignores the check for a decorator if the decorator has `*` as field or `check_fields=False`.
223
224 Args:
225 decorators: An iterable of decorators.
226 fields: An iterable of fields name.
227
228 Raises:
229 PydanticUserError: If one of the field names does not exist in `fields` param.
230 """
231 fields = set(fields)
232 for dec in decorators:
233 if '*' in dec.info.fields:
234 continue
235 if dec.info.check_fields is False:
236 continue
237 for field in dec.info.fields:
238 if field not in fields:
239 raise PydanticUserError(
240 f'Decorators defined with incorrect fields: {dec.cls_ref}.{dec.cls_var_name}'
241 " (use check_fields=False if you're inheriting from the model and intended this)",
242 code='decorator-missing-field',
243 )
244
245
246def filter_field_decorator_info_by_field(
247 validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str
248) -> list[Decorator[FieldDecoratorInfoType]]:
249 return [dec for dec in validator_functions if check_validator_fields_against_field_name(dec.info, field)]
250
251
252def apply_each_item_validators(
253 schema: core_schema.CoreSchema,
254 each_item_validators: list[Decorator[ValidatorDecoratorInfo]],
255) -> core_schema.CoreSchema:
256 # This V1 compatibility shim should eventually be removed
257
258 # fail early if each_item_validators is empty
259 if not each_item_validators:
260 return schema
261
262 # push down any `each_item=True` validators
263 # note that this won't work for any Annotated types that get wrapped by a function validator
264 # but that's okay because that didn't exist in V1
265 if schema['type'] == 'nullable':
266 schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators)
267 return schema
268 elif schema['type'] == 'tuple':
269 if (variadic_item_index := schema.get('variadic_item_index')) is not None:
270 schema['items_schema'][variadic_item_index] = apply_validators(
271 schema['items_schema'][variadic_item_index],
272 each_item_validators,
273 )
274 elif is_list_like_schema_with_items_schema(schema):
275 inner_schema = schema.get('items_schema', core_schema.any_schema())
276 schema['items_schema'] = apply_validators(inner_schema, each_item_validators)
277 elif schema['type'] == 'dict':
278 inner_schema = schema.get('values_schema', core_schema.any_schema())
279 schema['values_schema'] = apply_validators(inner_schema, each_item_validators)
280 else:
281 raise TypeError(
282 f'`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema["type"]}'
283 )
284 return schema
285
286
287def _extract_json_schema_info_from_field_info(
288 info: FieldInfo | ComputedFieldInfo,
289) -> tuple[JsonDict | None, JsonDict | JsonSchemaExtraCallable | None]:
290 json_schema_updates = {
291 'title': info.title,
292 'description': info.description,
293 'deprecated': bool(info.deprecated) or info.deprecated == '' or None,
294 'examples': to_jsonable_python(info.examples),
295 }
296 json_schema_updates = {k: v for k, v in json_schema_updates.items() if v is not None}
297 return (json_schema_updates or None, info.json_schema_extra)
298
299
300JsonEncoders = dict[type[Any], JsonEncoder]
301
302
303def _add_custom_serialization_from_json_encoders(
304 json_encoders: JsonEncoders | None, tp: Any, schema: CoreSchema
305) -> CoreSchema:
306 """Iterate over the json_encoders and add the first matching encoder to the schema.
307
308 Args:
309 json_encoders: A dictionary of types and their encoder functions.
310 tp: The type to check for a matching encoder.
311 schema: The schema to add the encoder to.
312 """
313 if not json_encoders:
314 return schema
315 if 'serialization' in schema:
316 return schema
317 # Check the class type and its superclasses for a matching encoder
318 # Decimal.__class__.__mro__ (and probably other cases) doesn't include Decimal itself
319 # if the type is a GenericAlias (e.g. from list[int]) we need to use __class__ instead of .__mro__
320 for base in (tp, *getattr(tp, '__mro__', tp.__class__.__mro__)[:-1]):
321 encoder = json_encoders.get(base)
322 if encoder is None:
323 continue
324
325 warnings.warn(
326 f'`json_encoders` is deprecated. See https://docs.pydantic.dev/{version_short()}/concepts/serialization/#custom-serializers for alternatives',
327 PydanticDeprecatedSince20,
328 )
329
330 # TODO: in theory we should check that the schema accepts a serialization key
331 schema['serialization'] = core_schema.plain_serializer_function_ser_schema(encoder, when_used='json')
332 return schema
333
334 return schema
335
336
337GENERATE_SCHEMA_ERRORS = (
338 PydanticForbiddenQualifier,
339 PydanticInvalidForJsonSchema,
340 PydanticSchemaGenerationError,
341 PydanticUndefinedAnnotation,
342)
343"""Errors raised during core schema generation. This does *not* include `InvalidSchemaError`, which is raised during schema cleaning."""
344
345
346class InvalidSchemaError(Exception):
347 """The core schema is invalid."""
348
349
350class GenerateSchema:
351 """Generate core schema for a Pydantic model, dataclass and types like `str`, `datetime`, ... ."""
352
353 __slots__ = (
354 '_config_wrapper_stack',
355 '_ns_resolver',
356 '_typevars_map',
357 'field_name_stack',
358 'model_type_stack',
359 'defs',
360 )
361
362 def __init__(
363 self,
364 config_wrapper: ConfigWrapper,
365 ns_resolver: NsResolver | None = None,
366 typevars_map: Mapping[TypeVar, Any] | None = None,
367 ) -> None:
368 # we need a stack for recursing into nested models
369 self._config_wrapper_stack = ConfigWrapperStack(config_wrapper)
370 self._ns_resolver = ns_resolver or NsResolver()
371 self._typevars_map = typevars_map
372 self.field_name_stack = _FieldNameStack()
373 self.model_type_stack = _ModelTypeStack()
374 self.defs = _Definitions()
375
376 def __init_subclass__(cls) -> None:
377 super().__init_subclass__()
378 warnings.warn(
379 'Subclassing `GenerateSchema` is not supported. The API is highly subject to change in minor versions.',
380 UserWarning,
381 stacklevel=2,
382 )
383
384 @property
385 def _config_wrapper(self) -> ConfigWrapper:
386 return self._config_wrapper_stack.tail
387
388 @property
389 def _types_namespace(self) -> NamespacesTuple:
390 return self._ns_resolver.types_namespace
391
392 @property
393 def _arbitrary_types(self) -> bool:
394 return self._config_wrapper.arbitrary_types_allowed
395
396 # the following methods can be overridden but should be considered
397 # unstable / private APIs
398 def _list_schema(self, items_type: Any) -> CoreSchema:
399 return core_schema.list_schema(self.generate_schema(items_type))
400
401 def _dict_schema(self, keys_type: Any, values_type: Any) -> CoreSchema:
402 return core_schema.dict_schema(self.generate_schema(keys_type), self.generate_schema(values_type))
403
404 def _set_schema(self, items_type: Any) -> CoreSchema:
405 return core_schema.set_schema(self.generate_schema(items_type))
406
407 def _frozenset_schema(self, items_type: Any) -> CoreSchema:
408 return core_schema.frozenset_schema(self.generate_schema(items_type))
409
410 def _enum_schema(self, enum_type: type[Enum]) -> CoreSchema:
411 cases: list[Any] = list(enum_type.__members__.values())
412
413 enum_ref = get_type_ref(enum_type)
414 description = None if not enum_type.__doc__ else inspect.cleandoc(enum_type.__doc__)
415 if (
416 description == 'An enumeration.'
417 ): # This is the default value provided by enum.EnumMeta.__new__; don't use it
418 description = None
419 js_updates = {'title': enum_type.__name__, 'description': description}
420 js_updates = {k: v for k, v in js_updates.items() if v is not None}
421
422 sub_type: Literal['str', 'int', 'float'] | None = None
423 if issubclass(enum_type, int):
424 sub_type = 'int'
425 value_ser_type: core_schema.SerSchema = core_schema.simple_ser_schema('int')
426 elif issubclass(enum_type, str):
427 # this handles `StrEnum` (3.11 only), and also `Foobar(str, Enum)`
428 sub_type = 'str'
429 value_ser_type = core_schema.simple_ser_schema('str')
430 elif issubclass(enum_type, float):
431 sub_type = 'float'
432 value_ser_type = core_schema.simple_ser_schema('float')
433 else:
434 # TODO this is an ugly hack, how do we trigger an Any schema for serialization?
435 value_ser_type = core_schema.plain_serializer_function_ser_schema(lambda x: x)
436
437 if cases:
438
439 def get_json_schema(schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
440 json_schema = handler(schema)
441 original_schema = handler.resolve_ref_schema(json_schema)
442 original_schema.update(js_updates)
443 return json_schema
444
445 # we don't want to add the missing to the schema if it's the default one
446 default_missing = getattr(enum_type._missing_, '__func__', None) is Enum._missing_.__func__ # pyright: ignore[reportFunctionMemberAccess]
447 enum_schema = core_schema.enum_schema(
448 enum_type,
449 cases,
450 sub_type=sub_type,
451 missing=None if default_missing else enum_type._missing_,
452 ref=enum_ref,
453 metadata={'pydantic_js_functions': [get_json_schema]},
454 )
455
456 if self._config_wrapper.use_enum_values:
457 enum_schema = core_schema.no_info_after_validator_function(
458 attrgetter('value'), enum_schema, serialization=value_ser_type
459 )
460
461 return enum_schema
462
463 else:
464
465 def get_json_schema_no_cases(_, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
466 json_schema = handler(core_schema.enum_schema(enum_type, cases, sub_type=sub_type, ref=enum_ref))
467 original_schema = handler.resolve_ref_schema(json_schema)
468 original_schema.update(js_updates)
469 return json_schema
470
471 # Use an isinstance check for enums with no cases.
472 # The most important use case for this is creating TypeVar bounds for generics that should
473 # be restricted to enums. This is more consistent than it might seem at first, since you can only
474 # subclass enum.Enum (or subclasses of enum.Enum) if all parent classes have no cases.
475 # We use the get_json_schema function when an Enum subclass has been declared with no cases
476 # so that we can still generate a valid json schema.
477 return core_schema.is_instance_schema(
478 enum_type,
479 metadata={'pydantic_js_functions': [get_json_schema_no_cases]},
480 )
481
482 def _ip_schema(self, tp: Any) -> CoreSchema:
483 from ._validators import IP_VALIDATOR_LOOKUP, IpType
484
485 ip_type_json_schema_format: dict[type[IpType], str] = {
486 IPv4Address: 'ipv4',
487 IPv4Network: 'ipv4network',
488 IPv4Interface: 'ipv4interface',
489 IPv6Address: 'ipv6',
490 IPv6Network: 'ipv6network',
491 IPv6Interface: 'ipv6interface',
492 }
493
494 def ser_ip(ip: Any, info: core_schema.SerializationInfo) -> str | IpType:
495 if not isinstance(ip, (tp, str)):
496 raise PydanticSerializationUnexpectedValue(
497 f"Expected `{tp}` but got `{type(ip)}` with value `'{ip}'` - serialized value may not be as expected."
498 )
499 if info.mode == 'python':
500 return ip
501 return str(ip)
502
503 return core_schema.lax_or_strict_schema(
504 lax_schema=core_schema.no_info_plain_validator_function(IP_VALIDATOR_LOOKUP[tp]),
505 strict_schema=core_schema.json_or_python_schema(
506 json_schema=core_schema.no_info_after_validator_function(tp, core_schema.str_schema()),
507 python_schema=core_schema.is_instance_schema(tp),
508 ),
509 serialization=core_schema.plain_serializer_function_ser_schema(ser_ip, info_arg=True, when_used='always'),
510 metadata={
511 'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': ip_type_json_schema_format[tp]}]
512 },
513 )
514
515 def _path_schema(self, tp: Any, path_type: Any) -> CoreSchema:
516 if tp is os.PathLike and (path_type not in {str, bytes} and not typing_objects.is_any(path_type)):
517 raise PydanticUserError(
518 '`os.PathLike` can only be used with `str`, `bytes` or `Any`', code='schema-for-unknown-type'
519 )
520
521 path_constructor = pathlib.PurePath if tp is os.PathLike else tp
522 strict_inner_schema = (
523 core_schema.bytes_schema(strict=True) if (path_type is bytes) else core_schema.str_schema(strict=True)
524 )
525 lax_inner_schema = core_schema.bytes_schema() if (path_type is bytes) else core_schema.str_schema()
526
527 def path_validator(input_value: str | bytes) -> os.PathLike[Any]: # type: ignore
528 try:
529 if path_type is bytes:
530 if isinstance(input_value, bytes):
531 try:
532 input_value = input_value.decode()
533 except UnicodeDecodeError as e:
534 raise PydanticCustomError('bytes_type', 'Input must be valid bytes') from e
535 else:
536 raise PydanticCustomError('bytes_type', 'Input must be bytes')
537 elif not isinstance(input_value, str):
538 raise PydanticCustomError('path_type', 'Input is not a valid path')
539
540 return path_constructor(input_value) # type: ignore
541 except TypeError as e:
542 raise PydanticCustomError('path_type', 'Input is not a valid path') from e
543
544 def ser_path(path: Any, info: core_schema.SerializationInfo) -> str | os.PathLike[Any]:
545 if not isinstance(path, (tp, str)):
546 raise PydanticSerializationUnexpectedValue(
547 f"Expected `{tp}` but got `{type(path)}` with value `'{path}'` - serialized value may not be as expected."
548 )
549 if info.mode == 'python':
550 return path
551 return str(path)
552
553 instance_schema = core_schema.json_or_python_schema(
554 json_schema=core_schema.no_info_after_validator_function(path_validator, lax_inner_schema),
555 python_schema=core_schema.is_instance_schema(tp),
556 )
557
558 schema = core_schema.lax_or_strict_schema(
559 lax_schema=core_schema.union_schema(
560 [
561 instance_schema,
562 core_schema.no_info_after_validator_function(path_validator, strict_inner_schema),
563 ],
564 custom_error_type='path_type',
565 custom_error_message=f'Input is not a valid path for {tp}',
566 ),
567 strict_schema=instance_schema,
568 serialization=core_schema.plain_serializer_function_ser_schema(ser_path, info_arg=True, when_used='always'),
569 metadata={'pydantic_js_functions': [lambda source, handler: {**handler(source), 'format': 'path'}]},
570 )
571 return schema
572
573 def _deque_schema(self, items_type: Any) -> CoreSchema:
574 from ._serializers import serialize_sequence_via_list
575 from ._validators import deque_validator
576
577 item_type_schema = self.generate_schema(items_type)
578
579 # we have to use a lax list schema here, because we need to validate the deque's
580 # items via a list schema, but it's ok if the deque itself is not a list
581 list_schema = core_schema.list_schema(item_type_schema, strict=False)
582
583 check_instance = core_schema.json_or_python_schema(
584 json_schema=list_schema,
585 python_schema=core_schema.is_instance_schema(collections.deque, cls_repr='Deque'),
586 )
587
588 lax_schema = core_schema.no_info_wrap_validator_function(deque_validator, list_schema)
589
590 return core_schema.lax_or_strict_schema(
591 lax_schema=lax_schema,
592 strict_schema=core_schema.chain_schema([check_instance, lax_schema]),
593 serialization=core_schema.wrap_serializer_function_ser_schema(
594 serialize_sequence_via_list, schema=item_type_schema, info_arg=True
595 ),
596 )
597
598 def _mapping_schema(self, tp: Any, keys_type: Any, values_type: Any) -> CoreSchema:
599 from ._validators import MAPPING_ORIGIN_MAP, defaultdict_validator, get_defaultdict_default_default_factory
600
601 mapped_origin = MAPPING_ORIGIN_MAP[tp]
602 keys_schema = self.generate_schema(keys_type)
603 with warnings.catch_warnings():
604 # We kind of abused `Field()` default factories to be able to specify
605 # the `defaultdict`'s `default_factory`. As a consequence, we get warnings
606 # as normally `FieldInfo.default_factory` is unsupported in the context where
607 # `Field()` is used and our only solution is to ignore them (note that this might
608 # wrongfully ignore valid warnings, e.g. if the `value_type` is a PEP 695 type alias
609 # with unsupported metadata).
610 warnings.simplefilter('ignore', category=UnsupportedFieldAttributeWarning)
611 values_schema = self.generate_schema(values_type)
612 dict_schema = core_schema.dict_schema(keys_schema, values_schema, strict=False)
613
614 if mapped_origin is dict:
615 schema = dict_schema
616 else:
617 check_instance = core_schema.json_or_python_schema(
618 json_schema=dict_schema,
619 python_schema=core_schema.is_instance_schema(mapped_origin),
620 )
621
622 if tp is collections.defaultdict:
623 default_default_factory = get_defaultdict_default_default_factory(values_type)
624 coerce_instance_wrap = partial(
625 core_schema.no_info_wrap_validator_function,
626 partial(defaultdict_validator, default_default_factory=default_default_factory),
627 )
628 else:
629 coerce_instance_wrap = partial(core_schema.no_info_after_validator_function, mapped_origin)
630
631 lax_schema = coerce_instance_wrap(dict_schema)
632 strict_schema = core_schema.chain_schema([check_instance, lax_schema])
633
634 schema = core_schema.lax_or_strict_schema(
635 lax_schema=lax_schema,
636 strict_schema=strict_schema,
637 serialization=core_schema.wrap_serializer_function_ser_schema(
638 lambda v, h: h(v), schema=dict_schema, info_arg=False
639 ),
640 )
641
642 return schema
643
644 def _fraction_schema(self) -> CoreSchema:
645 """Support for [`fractions.Fraction`][fractions.Fraction]."""
646 from ._validators import fraction_validator
647
648 # TODO: note, this is a fairly common pattern, re lax / strict for attempted type coercion,
649 # can we use a helper function to reduce boilerplate?
650 return core_schema.lax_or_strict_schema(
651 lax_schema=core_schema.no_info_plain_validator_function(fraction_validator),
652 strict_schema=core_schema.json_or_python_schema(
653 json_schema=core_schema.no_info_plain_validator_function(fraction_validator),
654 python_schema=core_schema.is_instance_schema(Fraction),
655 ),
656 # use str serialization to guarantee round trip behavior
657 serialization=core_schema.to_string_ser_schema(when_used='always'),
658 metadata={'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'fraction'}]},
659 )
660
661 def _arbitrary_type_schema(self, tp: Any) -> CoreSchema:
662 if not isinstance(tp, type):
663 warnings.warn(
664 f'{tp!r} is not a Python type (it may be an instance of an object),'
665 ' Pydantic will allow any object with no validation since we cannot even'
666 ' enforce that the input is an instance of the given type.'
667 ' To get rid of this error wrap the type with `pydantic.SkipValidation`.',
668 ArbitraryTypeWarning,
669 )
670 return core_schema.any_schema()
671 return core_schema.is_instance_schema(tp)
672
673 def _unknown_type_schema(self, obj: Any) -> CoreSchema:
674 raise PydanticSchemaGenerationError(
675 f'Unable to generate pydantic-core schema for {obj!r}. '
676 'Set `arbitrary_types_allowed=True` in the model_config to ignore this error'
677 ' or implement `__get_pydantic_core_schema__` on your type to fully support it.'
678 '\n\nIf you got this error by calling handler(<some type>) within'
679 ' `__get_pydantic_core_schema__` then you likely need to call'
680 ' `handler.generate_schema(<some type>)` since we do not call'
681 ' `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.'
682 )
683
684 def _apply_discriminator_to_union(
685 self, schema: CoreSchema, discriminator: str | Discriminator | None
686 ) -> CoreSchema:
687 if discriminator is None:
688 return schema
689 try:
690 return _discriminated_union.apply_discriminator(
691 schema,
692 discriminator,
693 self.defs._definitions,
694 )
695 except _discriminated_union.MissingDefinitionForUnionRef:
696 # defer until defs are resolved
697 _discriminated_union.set_discriminator_in_metadata(
698 schema,
699 discriminator,
700 )
701 return schema
702
703 def clean_schema(self, schema: CoreSchema) -> CoreSchema:
704 return self.defs.finalize_schema(schema)
705
706 def _add_js_function(self, metadata_schema: CoreSchema, js_function: Callable[..., Any]) -> None:
707 metadata = metadata_schema.get('metadata', {})
708 pydantic_js_functions = metadata.setdefault('pydantic_js_functions', [])
709 # because of how we generate core schemas for nested generic models
710 # we can end up adding `BaseModel.__get_pydantic_json_schema__` multiple times
711 # this check may fail to catch duplicates if the function is a `functools.partial`
712 # or something like that, but if it does it'll fail by inserting the duplicate
713 if js_function not in pydantic_js_functions:
714 pydantic_js_functions.append(js_function)
715 metadata_schema['metadata'] = metadata
716
717 def generate_schema(
718 self,
719 obj: Any,
720 ) -> core_schema.CoreSchema:
721 """Generate core schema.
722
723 Args:
724 obj: The object to generate core schema for.
725
726 Returns:
727 The generated core schema.
728
729 Raises:
730 PydanticUndefinedAnnotation:
731 If it is not possible to evaluate forward reference.
732 PydanticSchemaGenerationError:
733 If it is not possible to generate pydantic-core schema.
734 TypeError:
735 - If `alias_generator` returns a disallowed type (must be str, AliasPath or AliasChoices).
736 - If V1 style validator with `each_item=True` applied on a wrong field.
737 PydanticUserError:
738 - If `typing.TypedDict` is used instead of `typing_extensions.TypedDict` on Python < 3.12.
739 - If `__modify_schema__` method is used instead of `__get_pydantic_json_schema__`.
740 """
741 schema = self._generate_schema_from_get_schema_method(obj, obj)
742
743 if schema is None:
744 schema = self._generate_schema_inner(obj)
745
746 metadata_js_function = _extract_get_pydantic_json_schema(obj)
747 if metadata_js_function is not None:
748 metadata_schema = resolve_original_schema(schema, self.defs)
749 if metadata_schema:
750 self._add_js_function(metadata_schema, metadata_js_function)
751
752 schema = _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, obj, schema)
753
754 return schema
755
756 def _model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema:
757 """Generate schema for a Pydantic model."""
758 BaseModel_ = import_cached_base_model()
759
760 with self.defs.get_schema_or_ref(cls) as (model_ref, maybe_schema):
761 if maybe_schema is not None:
762 return maybe_schema
763
764 schema = cls.__dict__.get('__pydantic_core_schema__')
765 if schema is not None and not isinstance(schema, MockCoreSchema):
766 if schema['type'] == 'definitions':
767 schema = self.defs.unpack_definitions(schema)
768 ref = get_ref(schema)
769 if ref:
770 return self.defs.create_definition_reference_schema(schema)
771 else:
772 return schema
773
774 config_wrapper = ConfigWrapper(cls.model_config, check=False)
775
776 with self._config_wrapper_stack.push(config_wrapper), self._ns_resolver.push(cls):
777 core_config = self._config_wrapper.core_config(title=cls.__name__)
778
779 if cls.__pydantic_fields_complete__ or cls is BaseModel_:
780 fields = getattr(cls, '__pydantic_fields__', {})
781 extra_info = getattr(cls, '__pydantic_extra_info__', None)
782 else:
783 if '__pydantic_fields__' not in cls.__dict__:
784 # This happens when we have a loop in the schema generation:
785 # class Base[T](BaseModel):
786 # t: T
787 #
788 # class Other(BaseModel):
789 # b: 'Base[Other]'
790 # When we build fields for `Other`, we evaluate the forward annotation.
791 # At this point, `Other` doesn't have the model fields set. We create
792 # `Base[Other]`; model fields are successfully built, and we try to generate
793 # a schema for `t: Other`. As `Other.__pydantic_fields__` aren't set, we abort.
794 raise PydanticUndefinedAnnotation(
795 name=cls.__name__,
796 message=f'Class {cls.__name__!r} is not defined',
797 )
798 try:
799 fields, extra_info = rebuild_model_fields(
800 cls,
801 config_wrapper=self._config_wrapper,
802 ns_resolver=self._ns_resolver,
803 typevars_map=self._typevars_map or {},
804 )
805 except NameError as e:
806 raise PydanticUndefinedAnnotation.from_name_error(e) from e
807
808 decorators = cls.__pydantic_decorators__
809 computed_fields = decorators.computed_fields
810 check_decorator_fields_exist(
811 chain(
812 decorators.field_validators.values(),
813 decorators.field_serializers.values(),
814 decorators.validators.values(),
815 ),
816 {*fields.keys(), *computed_fields.keys()},
817 )
818
819 model_validators = decorators.model_validators.values()
820
821 extras_schema = None
822 extras_keys_schema = None
823 if core_config.get('extra_fields_behavior') == 'allow' and extra_info is not None:
824 tp = get_origin(extra_info.annotation)
825 if tp not in DICT_TYPES:
826 raise PydanticSchemaGenerationError(
827 'The type annotation for `__pydantic_extra__` must be `dict[str, ...]`'
828 )
829 # See the comments in `_get_args_resolving_forward_refs()` for why we need
830 # to re-evaluate the annotation:
831 extra_keys_type, extra_items_type = self._get_args_resolving_forward_refs(
832 extra_info.annotation,
833 required=True,
834 )
835 if extra_keys_type is not str:
836 extras_keys_schema = self.generate_schema(extra_keys_type)
837 if not typing_objects.is_any(extra_items_type):
838 extras_schema = self.generate_schema(extra_items_type)
839
840 generic_origin: type[BaseModel] | None = getattr(cls, '__pydantic_generic_metadata__', {}).get('origin')
841
842 if cls.__pydantic_root_model__:
843 inner_schema, metadata = self._common_field_schema('root', fields['root'], decorators)
844 if cls.__doc__ and metadata.get('pydantic_js_updates', {}).get('description'):
845 # This is a bit of a leaky abstraction, but as the model docstring takes priority
846 # over the root field's description, we need to override it here. This can't be done
847 # in the JSON Schema generation logic because the metadata's `pydantic_js_updates` are
848 # applied last, and overrides any value previously set (so the description set from the
849 # docstring in `GenerateJsonSchema._update_class_schema()` is overridden):
850 update_core_metadata(
851 metadata, pydantic_js_updates={'description': inspect.cleandoc(cls.__doc__)}
852 )
853
854 inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
855 model_schema = core_schema.model_schema(
856 cls,
857 inner_schema,
858 generic_origin=generic_origin,
859 custom_init=getattr(cls, '__pydantic_custom_init__', None),
860 root_model=True,
861 post_init=getattr(cls, '__pydantic_post_init__', None),
862 config=core_config,
863 ref=model_ref,
864 metadata=metadata,
865 )
866 else:
867 fields_schema: core_schema.CoreSchema = core_schema.model_fields_schema(
868 {k: self._generate_md_field_schema(k, v, decorators) for k, v in fields.items()},
869 computed_fields=[
870 self._computed_field_schema(d, decorators.field_serializers)
871 for d in computed_fields.values()
872 ],
873 extras_schema=extras_schema,
874 extras_keys_schema=extras_keys_schema,
875 model_name=cls.__name__,
876 )
877 inner_schema = apply_validators(fields_schema, decorators.root_validators.values())
878 inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
879
880 model_schema = core_schema.model_schema(
881 cls,
882 inner_schema,
883 generic_origin=generic_origin,
884 custom_init=getattr(cls, '__pydantic_custom_init__', None),
885 root_model=False,
886 post_init=getattr(cls, '__pydantic_post_init__', None),
887 config=core_config,
888 ref=model_ref,
889 )
890
891 schema = self._apply_model_serializers(model_schema, decorators.model_serializers.values())
892 schema = apply_model_validators(schema, model_validators, 'outer')
893 return self.defs.create_definition_reference_schema(schema)
894
895 def _resolve_self_type(self, obj: Any) -> Any:
896 obj = self.model_type_stack.get()
897 if obj is None:
898 raise PydanticUserError('`typing.Self` is invalid in this context', code='invalid-self-type')
899 return obj
900
901 def _generate_schema_from_get_schema_method(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
902 BaseModel_ = import_cached_base_model()
903
904 get_schema = getattr(obj, '__get_pydantic_core_schema__', None)
905 is_base_model_get_schema = (
906 getattr(get_schema, '__func__', None) is BaseModel_.__get_pydantic_core_schema__.__func__ # pyright: ignore[reportFunctionMemberAccess]
907 )
908
909 if (
910 get_schema is not None
911 # BaseModel.__get_pydantic_core_schema__ is defined for backwards compatibility,
912 # to allow existing code to call `super().__get_pydantic_core_schema__` in Pydantic
913 # model that overrides `__get_pydantic_core_schema__`. However, it raises a deprecation
914 # warning stating that the method will be removed, and during the core schema gen we actually
915 # don't call the method:
916 and not is_base_model_get_schema
917 ):
918 # Some referenceable types might have a `__get_pydantic_core_schema__` method
919 # defined on it by users (e.g. on a dataclass). This generally doesn't play well
920 # as these types are already recognized by the `GenerateSchema` class and isn't ideal
921 # as we might end up calling `get_schema_or_ref` (expensive) on types that are actually
922 # not referenceable:
923 with self.defs.get_schema_or_ref(obj) as (_, maybe_schema):
924 if maybe_schema is not None:
925 return maybe_schema
926
927 if obj is source:
928 ref_mode = 'unpack'
929 else:
930 ref_mode = 'to-def'
931 schema = get_schema(
932 source, CallbackGetCoreSchemaHandler(self._generate_schema_inner, self, ref_mode=ref_mode)
933 )
934 if schema['type'] == 'definitions':
935 schema = self.defs.unpack_definitions(schema)
936
937 ref = get_ref(schema)
938 if ref:
939 return self.defs.create_definition_reference_schema(schema)
940
941 # Note: if schema is of type `'definition-ref'`, we might want to copy it as a
942 # safety measure (because these are inlined in place -- i.e. mutated directly)
943 return schema
944
945 if get_schema is None and (validators := getattr(obj, '__get_validators__', None)) is not None:
946 from pydantic.v1 import BaseModel as BaseModelV1
947
948 if issubclass(obj, BaseModelV1):
949 warnings.warn(
950 f'Mixing V1 models and V2 models (or constructs, like `TypeAdapter`) is not supported. Please upgrade `{obj.__name__}` to V2.',
951 UserWarning,
952 )
953 else:
954 warnings.warn(
955 '`__get_validators__` is deprecated and will be removed, use `__get_pydantic_core_schema__` instead.',
956 PydanticDeprecatedSince20,
957 )
958 return core_schema.chain_schema([core_schema.with_info_plain_validator_function(v) for v in validators()])
959
960 def _resolve_forward_ref(self, obj: Any) -> Any:
961 # we assume that types_namespace has the target of forward references in its scope,
962 # but this could fail, for example, if calling Validator on an imported type which contains
963 # forward references to other types only defined in the module from which it was imported
964 # `Validator(SomeImportedTypeAliasWithAForwardReference)`
965 # or the equivalent for BaseModel
966 # class Model(BaseModel):
967 # x: SomeImportedTypeAliasWithAForwardReference
968 try:
969 obj = _typing_extra.eval_type_backport(obj, *self._types_namespace)
970 except NameError as e:
971 raise PydanticUndefinedAnnotation.from_name_error(e) from e
972
973 # if obj is still a ForwardRef, it means we can't evaluate it, raise PydanticUndefinedAnnotation
974 if isinstance(obj, ForwardRef):
975 raise PydanticUndefinedAnnotation(obj.__forward_arg__, f'Unable to evaluate forward reference {obj}')
976
977 if self._typevars_map:
978 obj = replace_types(obj, self._typevars_map)
979
980 return obj
981
982 @overload
983 def _get_args_resolving_forward_refs(self, obj: Any, required: Literal[True]) -> tuple[Any, ...]: ...
984
985 @overload
986 def _get_args_resolving_forward_refs(self, obj: Any) -> tuple[Any, ...] | None: ...
987
988 def _get_args_resolving_forward_refs(self, obj: Any, required: bool = False) -> tuple[Any, ...] | None:
989 args = get_args(obj)
990 if args:
991 if isinstance(obj, GenericAlias):
992 # PEP 585 generic aliases don't convert args to ForwardRefs, unlike `typing.List/Dict` etc.
993 # This was fixed in https://github.com/python/cpython/pull/30900 (Python 3.11).
994 # TODO: this shouldn't be necessary (probably even this `_get_args_resolving_forward_refs()` function)
995 # once we drop support for Python 3.10 *or* if we implement our own `typing._eval_type()` implementation.
996 args = (_typing_extra._make_forward_ref(a) if isinstance(a, str) else a for a in args)
997 args = tuple(self._resolve_forward_ref(a) if isinstance(a, ForwardRef) else a for a in args)
998 elif required: # pragma: no cover
999 raise TypeError(f'Expected {obj} to have generic parameters but it had none')
1000 return args
1001
1002 def _get_first_arg_or_any(self, obj: Any) -> Any:
1003 args = self._get_args_resolving_forward_refs(obj)
1004 if not args:
1005 return Any
1006 return args[0]
1007
1008 def _get_first_two_args_or_any(self, obj: Any) -> tuple[Any, Any]:
1009 args = self._get_args_resolving_forward_refs(obj)
1010 if not args:
1011 return (Any, Any)
1012 if len(args) < 2:
1013 origin = get_origin(obj)
1014 raise TypeError(f'Expected two type arguments for {origin}, got 1')
1015 return args[0], args[1]
1016
1017 def _generate_schema_inner(self, obj: Any) -> core_schema.CoreSchema:
1018 if typing_objects.is_self(obj):
1019 obj = self._resolve_self_type(obj)
1020
1021 if typing_objects.is_annotated(get_origin(obj)):
1022 return self._annotated_schema(obj)
1023
1024 if isinstance(obj, dict):
1025 # we assume this is already a valid schema
1026 return obj # type: ignore[return-value]
1027
1028 if isinstance(obj, str):
1029 obj = ForwardRef(obj)
1030
1031 if isinstance(obj, ForwardRef):
1032 return self.generate_schema(self._resolve_forward_ref(obj))
1033
1034 BaseModel = import_cached_base_model()
1035
1036 if lenient_issubclass(obj, BaseModel):
1037 with self.model_type_stack.push(obj):
1038 return self._model_schema(obj)
1039
1040 if isinstance(obj, PydanticRecursiveRef):
1041 return core_schema.definition_reference_schema(schema_ref=obj.type_ref)
1042
1043 return self.match_type(obj)
1044
1045 def match_type(self, obj: Any) -> core_schema.CoreSchema: # noqa: C901
1046 """Main mapping of types to schemas.
1047
1048 The general structure is a series of if statements starting with the simple cases
1049 (non-generic primitive types) and then handling generics and other more complex cases.
1050
1051 Each case either generates a schema directly, calls into a public user-overridable method
1052 (like `GenerateSchema.tuple_variable_schema`) or calls into a private method that handles some
1053 boilerplate before calling into the user-facing method (e.g. `GenerateSchema._tuple_schema`).
1054
1055 The idea is that we'll evolve this into adding more and more user facing methods over time
1056 as they get requested and we figure out what the right API for them is.
1057 """
1058 if obj is str:
1059 return core_schema.str_schema()
1060 elif obj is bytes:
1061 return core_schema.bytes_schema()
1062 elif obj is int:
1063 return core_schema.int_schema()
1064 elif obj is float:
1065 return core_schema.float_schema()
1066 elif obj is bool:
1067 return core_schema.bool_schema()
1068 elif obj is complex:
1069 return core_schema.complex_schema()
1070 elif typing_objects.is_any(obj) or obj is object:
1071 return core_schema.any_schema()
1072 elif obj is datetime.date:
1073 return core_schema.date_schema()
1074 elif obj is datetime.datetime:
1075 return core_schema.datetime_schema()
1076 elif obj is datetime.time:
1077 return core_schema.time_schema()
1078 elif obj is datetime.timedelta:
1079 return core_schema.timedelta_schema()
1080 elif obj is Decimal:
1081 return core_schema.decimal_schema()
1082 elif obj is UUID:
1083 return core_schema.uuid_schema()
1084 elif obj is Url:
1085 return core_schema.url_schema()
1086 elif obj is Fraction:
1087 return self._fraction_schema()
1088 elif obj is MultiHostUrl:
1089 return core_schema.multi_host_url_schema()
1090 elif obj is None or obj is _typing_extra.NoneType:
1091 return core_schema.none_schema()
1092 if obj is MISSING:
1093 return core_schema.missing_sentinel_schema()
1094 elif obj in IP_TYPES:
1095 return self._ip_schema(obj)
1096 elif obj in TUPLE_TYPES:
1097 return self._tuple_schema(obj)
1098 elif obj in LIST_TYPES:
1099 return self._list_schema(Any)
1100 elif obj in SET_TYPES:
1101 return self._set_schema(Any)
1102 elif obj in FROZEN_SET_TYPES:
1103 return self._frozenset_schema(Any)
1104 elif obj in SEQUENCE_TYPES:
1105 return self._sequence_schema(Any)
1106 elif obj in ITERABLE_TYPES:
1107 return self._iterable_schema(obj)
1108 elif obj in DICT_TYPES:
1109 return self._dict_schema(Any, Any)
1110 elif obj in PATH_TYPES:
1111 return self._path_schema(obj, Any)
1112 elif obj in DEQUE_TYPES:
1113 return self._deque_schema(Any)
1114 elif obj in MAPPING_TYPES:
1115 return self._mapping_schema(obj, Any, Any)
1116 elif obj in COUNTER_TYPES:
1117 return self._mapping_schema(obj, Any, int)
1118 elif typing_objects.is_typealiastype(obj):
1119 return self._type_alias_type_schema(obj)
1120 elif obj is type:
1121 return self._type_schema()
1122 elif _typing_extra.is_callable(obj):
1123 return core_schema.callable_schema()
1124 elif typing_objects.is_literal(get_origin(obj)):
1125 return self._literal_schema(obj)
1126 elif is_typeddict(obj):
1127 return self._typed_dict_schema(obj, None)
1128 elif inspect.isclass(obj) and issubclass(obj, Enum):
1129 # NOTE: this must come before the `is_namedtuple()` check as enums values
1130 # can be namedtuples:
1131 return self._enum_schema(obj)
1132 elif _typing_extra.is_namedtuple(obj):
1133 return self._namedtuple_schema(obj, None)
1134 elif typing_objects.is_newtype(obj):
1135 # NewType, can't use isinstance because it fails <3.10
1136 return self.generate_schema(obj.__supertype__)
1137 elif obj in PATTERN_TYPES:
1138 return self._pattern_schema(obj)
1139 elif _typing_extra.is_hashable(obj):
1140 return self._hashable_schema()
1141 elif isinstance(obj, typing.TypeVar):
1142 return self._unsubstituted_typevar_schema(obj)
1143 elif _typing_extra.is_finalvar(obj):
1144 if obj is Final:
1145 return core_schema.any_schema()
1146 return self.generate_schema(
1147 self._get_first_arg_or_any(obj),
1148 )
1149 elif isinstance(obj, VALIDATE_CALL_SUPPORTED_TYPES):
1150 return self._call_schema(obj) # pyright: ignore[reportArgumentType]
1151 elif obj is ZoneInfo:
1152 return self._zoneinfo_schema()
1153
1154 # dataclasses.is_dataclass coerces dc instances to types, but we only handle
1155 # the case of a dc type here
1156 if dataclasses.is_dataclass(obj):
1157 return self._dataclass_schema(obj, None) # pyright: ignore[reportArgumentType]
1158
1159 origin = get_origin(obj)
1160 if origin is not None:
1161 return self._match_generic_type(obj, origin)
1162
1163 if self._arbitrary_types:
1164 return self._arbitrary_type_schema(obj)
1165 return self._unknown_type_schema(obj)
1166
1167 def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema: # noqa: C901
1168 # Need to handle generic dataclasses before looking for the schema properties because attribute accesses
1169 # on _GenericAlias delegate to the origin type, so lose the information about the concrete parametrization
1170 # As a result, currently, there is no way to cache the schema for generic dataclasses. This may be possible
1171 # to resolve by modifying the value returned by `Generic.__class_getitem__`, but that is a dangerous game.
1172 if dataclasses.is_dataclass(origin):
1173 return self._dataclass_schema(obj, origin) # pyright: ignore[reportArgumentType]
1174 if _typing_extra.is_namedtuple(origin):
1175 return self._namedtuple_schema(obj, origin)
1176
1177 schema = self._generate_schema_from_get_schema_method(origin, obj)
1178 if schema is not None:
1179 return schema
1180
1181 if typing_objects.is_typealiastype(origin):
1182 return self._type_alias_type_schema(obj)
1183 elif is_union_origin(origin):
1184 return self._union_schema(obj)
1185 elif origin in TUPLE_TYPES:
1186 return self._tuple_schema(obj)
1187 elif origin in LIST_TYPES:
1188 return self._list_schema(self._get_first_arg_or_any(obj))
1189 elif origin in SET_TYPES:
1190 return self._set_schema(self._get_first_arg_or_any(obj))
1191 elif origin in FROZEN_SET_TYPES:
1192 return self._frozenset_schema(self._get_first_arg_or_any(obj))
1193 elif origin in DICT_TYPES:
1194 return self._dict_schema(*self._get_first_two_args_or_any(obj))
1195 elif origin in PATH_TYPES:
1196 return self._path_schema(origin, self._get_first_arg_or_any(obj))
1197 elif origin in DEQUE_TYPES:
1198 return self._deque_schema(self._get_first_arg_or_any(obj))
1199 elif origin in MAPPING_TYPES:
1200 return self._mapping_schema(origin, *self._get_first_two_args_or_any(obj))
1201 elif origin in COUNTER_TYPES:
1202 return self._mapping_schema(origin, self._get_first_arg_or_any(obj), int)
1203 elif is_typeddict(origin):
1204 return self._typed_dict_schema(obj, origin)
1205 elif origin in TYPE_TYPES:
1206 return self._subclass_schema(obj)
1207 elif origin in SEQUENCE_TYPES:
1208 return self._sequence_schema(self._get_first_arg_or_any(obj))
1209 elif origin in ITERABLE_TYPES:
1210 return self._iterable_schema(obj)
1211 elif origin in PATTERN_TYPES:
1212 return self._pattern_schema(obj)
1213
1214 if self._arbitrary_types:
1215 return self._arbitrary_type_schema(origin)
1216 return self._unknown_type_schema(obj)
1217
1218 def _generate_td_field_schema(
1219 self,
1220 name: str,
1221 field_info: FieldInfo,
1222 decorators: DecoratorInfos,
1223 *,
1224 required: bool = True,
1225 ) -> core_schema.TypedDictField:
1226 """Prepare a TypedDictField to represent a model or typeddict field."""
1227 schema, metadata = self._common_field_schema(name, field_info, decorators)
1228 return core_schema.typed_dict_field(
1229 schema,
1230 required=False if not field_info.is_required() else required,
1231 serialization_exclude=field_info.exclude,
1232 validation_alias=_convert_to_aliases(field_info.validation_alias),
1233 serialization_alias=field_info.serialization_alias,
1234 serialization_exclude_if=field_info.exclude_if,
1235 metadata=metadata,
1236 )
1237
1238 def _generate_md_field_schema(
1239 self,
1240 name: str,
1241 field_info: FieldInfo,
1242 decorators: DecoratorInfos,
1243 ) -> core_schema.ModelField:
1244 """Prepare a ModelField to represent a model field."""
1245 schema, metadata = self._common_field_schema(name, field_info, decorators)
1246 return core_schema.model_field(
1247 schema,
1248 serialization_exclude=field_info.exclude,
1249 validation_alias=_convert_to_aliases(field_info.validation_alias),
1250 serialization_alias=field_info.serialization_alias,
1251 serialization_exclude_if=field_info.exclude_if,
1252 frozen=field_info.frozen,
1253 metadata=metadata,
1254 )
1255
1256 def _generate_dc_field_schema(
1257 self,
1258 name: str,
1259 field_info: FieldInfo,
1260 decorators: DecoratorInfos,
1261 ) -> core_schema.DataclassField:
1262 """Prepare a DataclassField to represent the parameter/field, of a dataclass."""
1263 schema, metadata = self._common_field_schema(name, field_info, decorators)
1264 return core_schema.dataclass_field(
1265 name,
1266 schema,
1267 init=field_info.init,
1268 init_only=field_info.init_var or None,
1269 kw_only=None if field_info.kw_only else False,
1270 serialization_exclude=field_info.exclude,
1271 validation_alias=_convert_to_aliases(field_info.validation_alias),
1272 serialization_alias=field_info.serialization_alias,
1273 serialization_exclude_if=field_info.exclude_if,
1274 frozen=field_info.frozen,
1275 metadata=metadata,
1276 )
1277
1278 def _common_field_schema( # C901
1279 self, name: str, field_info: FieldInfo, decorators: DecoratorInfos
1280 ) -> tuple[CoreSchema, dict[str, Any]]:
1281 source_type, annotations = field_info.annotation, field_info.metadata
1282
1283 def set_discriminator(schema: CoreSchema) -> CoreSchema:
1284 schema = self._apply_discriminator_to_union(schema, field_info.discriminator)
1285 return schema
1286
1287 # Convert `@field_validator` decorators to `Before/After/Plain/WrapValidator` instances:
1288 validators_from_decorators = [
1289 _mode_to_validator[decorator.info.mode]._from_decorator(decorator)
1290 for decorator in filter_field_decorator_info_by_field(decorators.field_validators.values(), name)
1291 ]
1292
1293 with self.field_name_stack.push(name):
1294 if field_info.discriminator is not None:
1295 schema = self._apply_annotations(
1296 source_type, annotations + validators_from_decorators, transform_inner_schema=set_discriminator
1297 )
1298 else:
1299 schema = self._apply_annotations(
1300 source_type,
1301 annotations + validators_from_decorators,
1302 )
1303
1304 # This V1 compatibility shim should eventually be removed
1305 # push down any `each_item=True` validators
1306 # note that this won't work for any Annotated types that get wrapped by a function validator
1307 # but that's okay because that didn't exist in V1
1308 this_field_validators = filter_field_decorator_info_by_field(decorators.validators.values(), name)
1309 if _validators_require_validate_default(this_field_validators):
1310 field_info.validate_default = True
1311 each_item_validators = [v for v in this_field_validators if v.info.each_item is True]
1312 this_field_validators = [v for v in this_field_validators if v not in each_item_validators]
1313 schema = apply_each_item_validators(schema, each_item_validators)
1314
1315 schema = apply_validators(schema, this_field_validators)
1316
1317 # the default validator needs to go outside of any other validators
1318 # so that it is the topmost validator for the field validator
1319 # which uses it to check if the field has a default value or not
1320 if not field_info.is_required():
1321 schema = wrap_default(field_info, schema)
1322
1323 schema = self._apply_field_serializers(
1324 schema, filter_field_decorator_info_by_field(decorators.field_serializers.values(), name)
1325 )
1326
1327 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(field_info)
1328 core_metadata: dict[str, Any] = {}
1329 update_core_metadata(
1330 core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
1331 )
1332
1333 return schema, core_metadata
1334
1335 def _union_schema(self, union_type: Any) -> core_schema.CoreSchema:
1336 """Generate schema for a Union."""
1337 args = self._get_args_resolving_forward_refs(union_type, required=True)
1338 choices: list[CoreSchema] = []
1339 nullable = False
1340 for arg in args:
1341 if arg is None or arg is _typing_extra.NoneType:
1342 nullable = True
1343 else:
1344 choices.append(self.generate_schema(arg))
1345
1346 if len(choices) == 1:
1347 s = choices[0]
1348 else:
1349 choices_with_tags: list[CoreSchema | tuple[CoreSchema, str]] = []
1350 for choice in choices:
1351 tag = cast(CoreMetadata, choice.get('metadata', {})).get('pydantic_internal_union_tag_key')
1352 if tag is not None:
1353 choices_with_tags.append((choice, tag))
1354 else:
1355 choices_with_tags.append(choice)
1356 s = core_schema.union_schema(choices_with_tags)
1357
1358 if nullable:
1359 s = core_schema.nullable_schema(s)
1360 return s
1361
1362 def _type_alias_type_schema(self, obj: TypeAliasType) -> CoreSchema:
1363 with self.defs.get_schema_or_ref(obj) as (ref, maybe_schema):
1364 if maybe_schema is not None:
1365 return maybe_schema
1366
1367 origin: TypeAliasType = get_origin(obj) or obj
1368 typevars_map = get_standard_typevars_map(obj)
1369
1370 with self._ns_resolver.push(origin):
1371 try:
1372 annotation = _typing_extra.eval_type(origin.__value__, *self._types_namespace)
1373 except NameError as e:
1374 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1375 annotation = replace_types(annotation, typevars_map)
1376 schema = self.generate_schema(annotation)
1377 assert schema['type'] != 'definitions'
1378 schema['ref'] = ref # type: ignore
1379 return self.defs.create_definition_reference_schema(schema)
1380
1381 def _literal_schema(self, literal_type: Any) -> CoreSchema:
1382 """Generate schema for a Literal."""
1383 expected = list(get_literal_values(literal_type, type_check=False, unpack_type_aliases='eager'))
1384 assert expected, f'literal "expected" cannot be empty, obj={literal_type}'
1385 schema = core_schema.literal_schema(expected)
1386
1387 if self._config_wrapper.use_enum_values and any(isinstance(v, Enum) for v in expected):
1388 schema = core_schema.no_info_after_validator_function(
1389 lambda v: v.value if isinstance(v, Enum) else v, schema
1390 )
1391
1392 return schema
1393
1394 def _typed_dict_schema(self, typed_dict_cls: Any, origin: Any) -> core_schema.CoreSchema:
1395 """Generate a core schema for a `TypedDict` class.
1396
1397 To be able to build a `DecoratorInfos` instance for the `TypedDict` class (which will include
1398 validators, serializers, etc.), we need to have access to the original bases of the class
1399 (see https://docs.python.org/3/library/types.html#types.get_original_bases).
1400 However, the `__orig_bases__` attribute was only added in 3.12 (https://github.com/python/cpython/pull/103698).
1401
1402 For this reason, we require Python 3.12 (or using the `typing_extensions` backport).
1403 """
1404 FieldInfo = import_cached_field_info()
1405
1406 with (
1407 self.model_type_stack.push(typed_dict_cls),
1408 self.defs.get_schema_or_ref(typed_dict_cls) as (
1409 typed_dict_ref,
1410 maybe_schema,
1411 ),
1412 ):
1413 if maybe_schema is not None:
1414 return maybe_schema
1415
1416 typevars_map = get_standard_typevars_map(typed_dict_cls)
1417 if origin is not None:
1418 typed_dict_cls = origin
1419
1420 if not _SUPPORTS_TYPEDDICT and type(typed_dict_cls).__module__ == 'typing':
1421 raise PydanticUserError(
1422 'Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.12.',
1423 code='typed-dict-version',
1424 )
1425
1426 try:
1427 # if a typed dictionary class doesn't have config, we use the parent's config, hence a default of `None`
1428 # see https://github.com/pydantic/pydantic/issues/10917
1429 config: ConfigDict | None = get_attribute_from_bases(typed_dict_cls, '__pydantic_config__')
1430 except AttributeError:
1431 config = None
1432
1433 with self._config_wrapper_stack.push(config):
1434 core_config = self._config_wrapper.core_config(title=typed_dict_cls.__name__)
1435
1436 required_keys: frozenset[str] = typed_dict_cls.__required_keys__
1437
1438 fields: dict[str, core_schema.TypedDictField] = {}
1439
1440 decorators = DecoratorInfos.build(typed_dict_cls, replace_wrapped_methods=False)
1441 decorators.update_from_config(self._config_wrapper)
1442
1443 if self._config_wrapper.use_attribute_docstrings:
1444 field_docstrings = extract_docstrings_from_cls(typed_dict_cls, use_inspect=True)
1445 else:
1446 field_docstrings = None
1447
1448 try:
1449 annotations = _typing_extra.get_cls_type_hints(typed_dict_cls, ns_resolver=self._ns_resolver)
1450 except NameError as e:
1451 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1452
1453 readonly_fields: list[str] = []
1454
1455 for field_name, annotation in annotations.items():
1456 field_info = FieldInfo.from_annotation(annotation, _source=AnnotationSource.TYPED_DICT)
1457 field_info.annotation = replace_types(field_info.annotation, typevars_map)
1458
1459 required = (
1460 field_name in required_keys or 'required' in field_info._qualifiers
1461 ) and 'not_required' not in field_info._qualifiers
1462 if 'read_only' in field_info._qualifiers:
1463 readonly_fields.append(field_name)
1464
1465 if (
1466 field_docstrings is not None
1467 and field_info.description is None
1468 and field_name in field_docstrings
1469 ):
1470 field_info.description = field_docstrings[field_name]
1471 update_field_from_config(self._config_wrapper, field_name, field_info)
1472
1473 fields[field_name] = self._generate_td_field_schema(
1474 field_name, field_info, decorators, required=required
1475 )
1476
1477 if readonly_fields:
1478 fields_repr = ', '.join(repr(f) for f in readonly_fields)
1479 plural = len(readonly_fields) >= 2
1480 warnings.warn(
1481 f'Item{"s" if plural else ""} {fields_repr} on TypedDict class {typed_dict_cls.__name__!r} '
1482 f'{"are" if plural else "is"} using the `ReadOnly` qualifier. Pydantic will not protect items '
1483 'from any mutation on dictionary instances.',
1484 UserWarning,
1485 )
1486
1487 extra_behavior: core_schema.ExtraBehavior = 'ignore'
1488 extras_schema: CoreSchema | None = None # For 'allow', equivalent to `Any` - no validation performed.
1489
1490 # `__closed__` is `None` when not specified (equivalent to `False`):
1491 is_closed = bool(getattr(typed_dict_cls, '__closed__', False))
1492 extra_items = getattr(typed_dict_cls, '__extra_items__', typing_extensions.NoExtraItems)
1493 if is_closed:
1494 extra_behavior = 'forbid'
1495 extras_schema = None
1496 elif not typing_objects.is_noextraitems(extra_items):
1497 extra_behavior = 'allow'
1498 extras_schema = self.generate_schema(replace_types(extra_items, typevars_map))
1499
1500 if (config_extra := self._config_wrapper.extra) in ('allow', 'forbid'):
1501 if is_closed and config_extra == 'allow':
1502 warnings.warn(
1503 f"TypedDict class {typed_dict_cls.__qualname__!r} is closed, but 'extra' configuration "
1504 "is set to `'allow'`. The 'extra' configuration value will be ignored.",
1505 category=TypedDictExtraConfigWarning,
1506 )
1507 elif not typing_objects.is_noextraitems(extra_items) and config_extra == 'forbid':
1508 warnings.warn(
1509 f"TypedDict class {typed_dict_cls.__qualname__!r} allows extra items, but 'extra' configuration "
1510 "is set to `'forbid'`. The 'extra' configuration value will be ignored.",
1511 category=TypedDictExtraConfigWarning,
1512 )
1513 else:
1514 extra_behavior = config_extra
1515
1516 td_schema = core_schema.typed_dict_schema(
1517 fields,
1518 cls=typed_dict_cls,
1519 computed_fields=[
1520 self._computed_field_schema(d, decorators.field_serializers)
1521 for d in decorators.computed_fields.values()
1522 ],
1523 extra_behavior=extra_behavior,
1524 extras_schema=extras_schema,
1525 ref=typed_dict_ref,
1526 config=core_config,
1527 )
1528
1529 schema = self._apply_model_serializers(td_schema, decorators.model_serializers.values())
1530 schema = apply_model_validators(schema, decorators.model_validators.values(), 'all')
1531 return self.defs.create_definition_reference_schema(schema)
1532
1533 def _namedtuple_schema(self, namedtuple_cls: Any, origin: Any) -> core_schema.CoreSchema:
1534 """Generate schema for a NamedTuple."""
1535 with (
1536 self.model_type_stack.push(namedtuple_cls),
1537 self.defs.get_schema_or_ref(namedtuple_cls) as (
1538 namedtuple_ref,
1539 maybe_schema,
1540 ),
1541 ):
1542 if maybe_schema is not None:
1543 return maybe_schema
1544 typevars_map = get_standard_typevars_map(namedtuple_cls)
1545 if origin is not None:
1546 namedtuple_cls = origin
1547
1548 try:
1549 annotations = _typing_extra.get_cls_type_hints(namedtuple_cls, ns_resolver=self._ns_resolver)
1550 except NameError as e:
1551 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1552
1553 # Filter annotations to only include fields that are actually in the NamedTuple
1554 # (as subclassing an existing NamedTuple is not supported yet - see https://github.com/python/typing/issues/427)
1555 # and use `Any` if no annotation exist (i.e. when using `collections.namedtuple()`).
1556 annotations = {field_name: annotations.get(field_name, Any) for field_name in namedtuple_cls._fields}
1557
1558 if typevars_map:
1559 annotations = {
1560 field_name: replace_types(annotation, typevars_map)
1561 for field_name, annotation in annotations.items()
1562 }
1563
1564 arguments_schema = core_schema.arguments_schema(
1565 [
1566 self._generate_parameter_schema(
1567 field_name,
1568 annotation,
1569 source=AnnotationSource.NAMED_TUPLE,
1570 default=namedtuple_cls._field_defaults.get(field_name, Parameter.empty),
1571 )
1572 for field_name, annotation in annotations.items()
1573 ],
1574 metadata={'pydantic_js_prefer_positional_arguments': True},
1575 )
1576 schema = core_schema.call_schema(arguments_schema, namedtuple_cls, ref=namedtuple_ref)
1577 return self.defs.create_definition_reference_schema(schema)
1578
1579 def _generate_parameter_schema(
1580 self,
1581 name: str,
1582 annotation: type[Any],
1583 source: AnnotationSource,
1584 default: Any = Parameter.empty,
1585 mode: Literal['positional_only', 'positional_or_keyword', 'keyword_only'] | None = None,
1586 ) -> core_schema.ArgumentsParameter:
1587 """Generate the definition of a field in a namedtuple or a parameter in a function signature.
1588
1589 This definition is meant to be used for the `'arguments'` core schema, which will be replaced
1590 in V3 by the `'arguments-v3`'.
1591 """
1592 FieldInfo = import_cached_field_info()
1593
1594 if default is Parameter.empty:
1595 field = FieldInfo.from_annotation(annotation, _source=source)
1596 else:
1597 field = FieldInfo.from_annotated_attribute(annotation, default, _source=source)
1598
1599 assert field.annotation is not None, 'field.annotation should not be None when generating a schema'
1600 update_field_from_config(self._config_wrapper, name, field)
1601
1602 with self.field_name_stack.push(name):
1603 schema = self._apply_annotations(
1604 field.annotation,
1605 [field],
1606 # Because we pass `field` as metadata above (required for attributes relevant for
1607 # JSON Scheme generation), we need to ignore the potential warnings about `FieldInfo`
1608 # attributes that will not be used:
1609 check_unsupported_field_info_attributes=False,
1610 )
1611
1612 if not field.is_required():
1613 schema = wrap_default(field, schema)
1614
1615 parameter_schema = core_schema.arguments_parameter(
1616 name,
1617 schema,
1618 mode=mode,
1619 alias=_convert_to_aliases(field.validation_alias),
1620 )
1621
1622 return parameter_schema
1623
1624 def _generate_parameter_v3_schema(
1625 self,
1626 name: str,
1627 annotation: Any,
1628 source: AnnotationSource,
1629 mode: Literal[
1630 'positional_only',
1631 'positional_or_keyword',
1632 'keyword_only',
1633 'var_args',
1634 'var_kwargs_uniform',
1635 'var_kwargs_unpacked_typed_dict',
1636 ],
1637 default: Any = Parameter.empty,
1638 ) -> core_schema.ArgumentsV3Parameter:
1639 """Generate the definition of a parameter in a function signature.
1640
1641 This definition is meant to be used for the `'arguments-v3'` core schema, which will replace
1642 the `'arguments`' schema in V3.
1643 """
1644 FieldInfo = import_cached_field_info()
1645
1646 if default is Parameter.empty:
1647 field = FieldInfo.from_annotation(annotation, _source=source)
1648 else:
1649 field = FieldInfo.from_annotated_attribute(annotation, default, _source=source)
1650 update_field_from_config(self._config_wrapper, name, field)
1651
1652 with self.field_name_stack.push(name):
1653 schema = self._apply_annotations(
1654 field.annotation,
1655 [field],
1656 # Because we pass `field` as metadata above (required for attributes relevant for
1657 # JSON Scheme generation), we need to ignore the potential warnings about `FieldInfo`
1658 # attributes that will not be used:
1659 check_unsupported_field_info_attributes=False,
1660 )
1661
1662 if not field.is_required():
1663 schema = wrap_default(field, schema)
1664
1665 parameter_schema = core_schema.arguments_v3_parameter(
1666 name=name,
1667 schema=schema,
1668 mode=mode,
1669 alias=_convert_to_aliases(field.validation_alias),
1670 )
1671
1672 return parameter_schema
1673
1674 def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema:
1675 """Generate schema for a Tuple, e.g. `tuple[int, str]` or `tuple[int, ...]`."""
1676 # TODO: do we really need to resolve type vars here?
1677 typevars_map = get_standard_typevars_map(tuple_type)
1678 params = self._get_args_resolving_forward_refs(tuple_type)
1679
1680 if typevars_map and params:
1681 params = tuple(replace_types(param, typevars_map) for param in params)
1682
1683 # NOTE: subtle difference: `tuple[()]` gives `params=()`, whereas `typing.Tuple[()]` gives `params=((),)`
1684 # This is only true for <3.11, on Python 3.11+ `typing.Tuple[()]` gives `params=()`
1685 if not params:
1686 if tuple_type in TUPLE_TYPES:
1687 return core_schema.tuple_schema([core_schema.any_schema()], variadic_item_index=0)
1688 else:
1689 # special case for `tuple[()]` which means `tuple[]` - an empty tuple
1690 return core_schema.tuple_schema([])
1691 elif params[-1] is Ellipsis:
1692 if len(params) == 2:
1693 return core_schema.tuple_schema([self.generate_schema(params[0])], variadic_item_index=0)
1694 else:
1695 # TODO: something like https://github.com/pydantic/pydantic/issues/5952
1696 raise ValueError('Variable tuples can only have one type')
1697 elif len(params) == 1 and params[0] == ():
1698 # special case for `tuple[()]` which means `tuple[]` - an empty tuple
1699 # NOTE: This conditional can be removed when we drop support for Python 3.10.
1700 return core_schema.tuple_schema([])
1701 else:
1702 return core_schema.tuple_schema([self.generate_schema(param) for param in params])
1703
1704 def _type_schema(self) -> core_schema.CoreSchema:
1705 return core_schema.custom_error_schema(
1706 core_schema.is_instance_schema(type),
1707 custom_error_type='is_type',
1708 custom_error_message='Input should be a type',
1709 )
1710
1711 def _zoneinfo_schema(self) -> core_schema.CoreSchema:
1712 """Generate schema for a zone_info.ZoneInfo object"""
1713 from ._validators import validate_str_is_valid_iana_tz
1714
1715 metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'zoneinfo'}]}
1716 return core_schema.no_info_plain_validator_function(
1717 validate_str_is_valid_iana_tz,
1718 serialization=core_schema.to_string_ser_schema(),
1719 metadata=metadata,
1720 )
1721
1722 def _union_is_subclass_schema(self, union_type: Any) -> core_schema.CoreSchema:
1723 """Generate schema for `type[Union[X, ...]]`."""
1724 args = self._get_args_resolving_forward_refs(union_type, required=True)
1725 return core_schema.union_schema([self.generate_schema(type[args]) for args in args])
1726
1727 def _subclass_schema(self, type_: Any) -> core_schema.CoreSchema:
1728 """Generate schema for a type, e.g. `type[int]`."""
1729 type_param = self._get_first_arg_or_any(type_)
1730
1731 # Assume `type[Annotated[<typ>, ...]]` is equivalent to `type[<typ>]`:
1732 type_param = _typing_extra.annotated_type(type_param) or type_param
1733
1734 if typing_objects.is_any(type_param):
1735 return self._type_schema()
1736 elif typing_objects.is_typealiastype(type_param):
1737 return self.generate_schema(type[type_param.__value__])
1738 elif typing_objects.is_typevar(type_param):
1739 if type_param.__bound__:
1740 if is_union_origin(get_origin(type_param.__bound__)):
1741 return self._union_is_subclass_schema(type_param.__bound__)
1742 return core_schema.is_subclass_schema(type_param.__bound__)
1743 elif type_param.__constraints__:
1744 return core_schema.union_schema([self.generate_schema(type[c]) for c in type_param.__constraints__])
1745 else:
1746 return self._type_schema()
1747 elif is_union_origin(get_origin(type_param)):
1748 return self._union_is_subclass_schema(type_param)
1749 else:
1750 if typing_objects.is_self(type_param):
1751 type_param = self._resolve_self_type(type_param)
1752 if _typing_extra.is_generic_alias(type_param):
1753 raise PydanticUserError(
1754 'Subscripting `type[]` with an already parametrized type is not supported. '
1755 f'Instead of using type[{type_param!r}], use type[{_repr.display_as_type(get_origin(type_param))}].',
1756 code=None,
1757 )
1758 if not inspect.isclass(type_param):
1759 # when using type[None], this doesn't type convert to type[NoneType], and None isn't a class
1760 # so we handle it manually here
1761 if type_param is None:
1762 return core_schema.is_subclass_schema(_typing_extra.NoneType)
1763 raise TypeError(f'Expected a class, got {type_param!r}')
1764 return core_schema.is_subclass_schema(type_param)
1765
1766 def _sequence_schema(self, items_type: Any) -> core_schema.CoreSchema:
1767 """Generate schema for a Sequence, e.g. `Sequence[int]`."""
1768 from ._serializers import serialize_sequence_via_list
1769
1770 item_type_schema = self.generate_schema(items_type)
1771 list_schema = core_schema.list_schema(item_type_schema)
1772
1773 json_schema = smart_deepcopy(list_schema)
1774 python_schema = core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence')
1775 if not typing_objects.is_any(items_type):
1776 from ._validators import sequence_validator
1777
1778 python_schema = core_schema.chain_schema(
1779 [python_schema, core_schema.no_info_wrap_validator_function(sequence_validator, list_schema)],
1780 )
1781
1782 serialization = core_schema.wrap_serializer_function_ser_schema(
1783 serialize_sequence_via_list, schema=item_type_schema, info_arg=True
1784 )
1785 return core_schema.json_or_python_schema(
1786 json_schema=json_schema, python_schema=python_schema, serialization=serialization
1787 )
1788
1789 def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema:
1790 """Generate a schema for an `Iterable`."""
1791 item_type = self._get_first_arg_or_any(type_)
1792
1793 return core_schema.generator_schema(self.generate_schema(item_type))
1794
1795 def _pattern_schema(self, pattern_type: Any) -> core_schema.CoreSchema:
1796 from . import _validators
1797
1798 metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'regex'}]}
1799 ser = core_schema.plain_serializer_function_ser_schema(
1800 attrgetter('pattern'), when_used='json', return_schema=core_schema.str_schema()
1801 )
1802 if pattern_type is typing.Pattern or pattern_type is re.Pattern:
1803 # bare type
1804 return core_schema.no_info_plain_validator_function(
1805 _validators.pattern_either_validator, serialization=ser, metadata=metadata
1806 )
1807
1808 param = self._get_args_resolving_forward_refs(
1809 pattern_type,
1810 required=True,
1811 )[0]
1812 if param is str:
1813 return core_schema.no_info_plain_validator_function(
1814 _validators.pattern_str_validator, serialization=ser, metadata=metadata
1815 )
1816 elif param is bytes:
1817 return core_schema.no_info_plain_validator_function(
1818 _validators.pattern_bytes_validator, serialization=ser, metadata=metadata
1819 )
1820 else:
1821 raise PydanticSchemaGenerationError(f'Unable to generate pydantic-core schema for {pattern_type!r}.')
1822
1823 def _hashable_schema(self) -> core_schema.CoreSchema:
1824 return core_schema.custom_error_schema(
1825 schema=core_schema.json_or_python_schema(
1826 json_schema=core_schema.chain_schema(
1827 [core_schema.any_schema(), core_schema.is_instance_schema(collections.abc.Hashable)]
1828 ),
1829 python_schema=core_schema.is_instance_schema(collections.abc.Hashable),
1830 ),
1831 custom_error_type='is_hashable',
1832 custom_error_message='Input should be hashable',
1833 )
1834
1835 def _dataclass_schema(
1836 self, dataclass: type[StandardDataclass], origin: type[StandardDataclass] | None
1837 ) -> core_schema.CoreSchema:
1838 """Generate schema for a dataclass."""
1839 with (
1840 self.model_type_stack.push(dataclass),
1841 self.defs.get_schema_or_ref(dataclass) as (
1842 dataclass_ref,
1843 maybe_schema,
1844 ),
1845 ):
1846 if maybe_schema is not None:
1847 return maybe_schema
1848
1849 schema = dataclass.__dict__.get('__pydantic_core_schema__')
1850 if schema is not None and not isinstance(schema, MockCoreSchema):
1851 if schema['type'] == 'definitions':
1852 schema = self.defs.unpack_definitions(schema)
1853 ref = get_ref(schema)
1854 if ref:
1855 return self.defs.create_definition_reference_schema(schema)
1856 else:
1857 return schema
1858
1859 typevars_map = get_standard_typevars_map(dataclass)
1860 if origin is not None:
1861 dataclass = origin
1862
1863 # if (plain) dataclass doesn't have config, we use the parent's config, hence a default of `None`
1864 # (Pydantic dataclasses have an empty dict config by default).
1865 # see https://github.com/pydantic/pydantic/issues/10917
1866 config = getattr(dataclass, '__pydantic_config__', None)
1867
1868 from ..dataclasses import is_pydantic_dataclass
1869
1870 with self._ns_resolver.push(dataclass), self._config_wrapper_stack.push(config):
1871 if is_pydantic_dataclass(dataclass):
1872 if dataclass.__pydantic_fields_complete__():
1873 # Copy the field info instances to avoid mutating the `FieldInfo` instances
1874 # of the generic dataclass generic origin (e.g. `apply_typevars_map` below).
1875 # Note that we don't apply `deepcopy` on `__pydantic_fields__` because we
1876 # don't want to copy the `FieldInfo` attributes:
1877 fields = {
1878 f_name: copy(field_info) for f_name, field_info in dataclass.__pydantic_fields__.items()
1879 }
1880 if typevars_map:
1881 for field in fields.values():
1882 field.apply_typevars_map(typevars_map, *self._types_namespace)
1883 else:
1884 try:
1885 fields = rebuild_dataclass_fields(
1886 dataclass,
1887 config_wrapper=self._config_wrapper,
1888 ns_resolver=self._ns_resolver,
1889 typevars_map=typevars_map or {},
1890 )
1891 except NameError as e:
1892 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1893 else:
1894 fields = collect_dataclass_fields(
1895 dataclass,
1896 typevars_map=typevars_map,
1897 config_wrapper=self._config_wrapper,
1898 )
1899
1900 if self._config_wrapper.extra == 'allow':
1901 # disallow combination of init=False on a dataclass field and extra='allow' on a dataclass
1902 for field_name, field in fields.items():
1903 if field.init is False:
1904 raise PydanticUserError(
1905 f'Field {field_name} has `init=False` and dataclass has config setting `extra="allow"`. '
1906 f'This combination is not allowed.',
1907 code='dataclass-init-false-extra-allow',
1908 )
1909
1910 decorators = dataclass.__dict__.get('__pydantic_decorators__')
1911 if decorators is None:
1912 decorators = DecoratorInfos.build(dataclass, replace_wrapped_methods=False)
1913 decorators.update_from_config(self._config_wrapper)
1914 # Move kw_only=False args to the start of the list, as this is how vanilla dataclasses work.
1915 # Note that when kw_only is missing or None, it is treated as equivalent to kw_only=True
1916 args = sorted(
1917 (self._generate_dc_field_schema(k, v, decorators) for k, v in fields.items()),
1918 key=lambda a: a.get('kw_only') is not False,
1919 )
1920 has_post_init = hasattr(dataclass, '__post_init__')
1921 has_slots = hasattr(dataclass, '__slots__')
1922
1923 args_schema = core_schema.dataclass_args_schema(
1924 dataclass.__name__,
1925 args,
1926 computed_fields=[
1927 self._computed_field_schema(d, decorators.field_serializers)
1928 for d in decorators.computed_fields.values()
1929 ],
1930 collect_init_only=has_post_init,
1931 )
1932
1933 inner_schema = apply_validators(args_schema, decorators.root_validators.values())
1934
1935 model_validators = decorators.model_validators.values()
1936 inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
1937
1938 core_config = self._config_wrapper.core_config(title=dataclass.__name__)
1939
1940 dc_schema = core_schema.dataclass_schema(
1941 dataclass,
1942 inner_schema,
1943 generic_origin=origin,
1944 post_init=has_post_init,
1945 ref=dataclass_ref,
1946 fields=[field.name for field in dataclasses.fields(dataclass)],
1947 slots=has_slots,
1948 config=core_config,
1949 # we don't use a custom __setattr__ for dataclasses, so we must
1950 # pass along the frozen config setting to the pydantic-core schema
1951 frozen=self._config_wrapper_stack.tail.frozen,
1952 )
1953 schema = self._apply_model_serializers(dc_schema, decorators.model_serializers.values())
1954 schema = apply_model_validators(schema, model_validators, 'outer')
1955 return self.defs.create_definition_reference_schema(schema)
1956
1957 def _call_schema(self, function: ValidateCallSupportedTypes) -> core_schema.CallSchema:
1958 """Generate schema for a Callable.
1959
1960 TODO support functional validators once we support them in Config
1961 """
1962 arguments_schema = self._arguments_schema(function)
1963
1964 return_schema: core_schema.CoreSchema | None = None
1965 config_wrapper = self._config_wrapper
1966 if config_wrapper.validate_return:
1967 sig = _typing_extra.signature_no_eval(function)
1968 return_hint = sig.return_annotation
1969 if return_hint is not sig.empty:
1970 globalns, localns = self._types_namespace
1971 type_hints = _typing_extra.get_function_type_hints(
1972 function, globalns=globalns, localns=localns, include_keys={'return'}
1973 )
1974 return_schema = self.generate_schema(type_hints['return'])
1975
1976 return core_schema.call_schema(
1977 arguments_schema,
1978 function,
1979 return_schema=return_schema,
1980 )
1981
1982 def _arguments_schema(
1983 self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
1984 ) -> core_schema.ArgumentsSchema:
1985 """Generate schema for a Signature."""
1986 mode_lookup: dict[_ParameterKind, Literal['positional_only', 'positional_or_keyword', 'keyword_only']] = {
1987 Parameter.POSITIONAL_ONLY: 'positional_only',
1988 Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
1989 Parameter.KEYWORD_ONLY: 'keyword_only',
1990 }
1991
1992 sig = _typing_extra.signature_no_eval(function)
1993 globalns, localns = self._types_namespace
1994 type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)
1995
1996 arguments_list: list[core_schema.ArgumentsParameter] = []
1997 var_args_schema: core_schema.CoreSchema | None = None
1998 var_kwargs_schema: core_schema.CoreSchema | None = None
1999 var_kwargs_mode: core_schema.VarKwargsMode | None = None
2000
2001 for i, (name, p) in enumerate(sig.parameters.items()):
2002 if p.annotation is sig.empty:
2003 annotation = typing.cast(Any, Any)
2004 else:
2005 annotation = type_hints[name]
2006
2007 if parameters_callback is not None:
2008 result = parameters_callback(i, name, annotation)
2009 if result == 'skip':
2010 continue
2011
2012 parameter_mode = mode_lookup.get(p.kind)
2013 if parameter_mode is not None:
2014 arg_schema = self._generate_parameter_schema(
2015 name, annotation, AnnotationSource.FUNCTION, p.default, parameter_mode
2016 )
2017 arguments_list.append(arg_schema)
2018 elif p.kind == Parameter.VAR_POSITIONAL:
2019 var_args_schema = self.generate_schema(annotation)
2020 else:
2021 assert p.kind == Parameter.VAR_KEYWORD, p.kind
2022
2023 unpack_type = _typing_extra.unpack_type(annotation)
2024 if unpack_type is not None:
2025 origin = get_origin(unpack_type) or unpack_type
2026 if not is_typeddict(origin):
2027 raise PydanticUserError(
2028 f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
2029 code='unpack-typed-dict',
2030 )
2031 non_pos_only_param_names = {
2032 name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
2033 }
2034 overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
2035 if overlapping_params:
2036 raise PydanticUserError(
2037 f'Typed dictionary {origin.__name__!r} overlaps with parameter'
2038 f'{"s" if len(overlapping_params) >= 2 else ""} '
2039 f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
2040 code='overlapping-unpack-typed-dict',
2041 )
2042
2043 var_kwargs_mode = 'unpacked-typed-dict'
2044 var_kwargs_schema = self._typed_dict_schema(unpack_type, get_origin(unpack_type))
2045 else:
2046 var_kwargs_mode = 'uniform'
2047 var_kwargs_schema = self.generate_schema(annotation)
2048
2049 return core_schema.arguments_schema(
2050 arguments_list,
2051 var_args_schema=var_args_schema,
2052 var_kwargs_mode=var_kwargs_mode,
2053 var_kwargs_schema=var_kwargs_schema,
2054 validate_by_name=self._config_wrapper.validate_by_name,
2055 )
2056
2057 def _arguments_v3_schema(
2058 self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
2059 ) -> core_schema.ArgumentsV3Schema:
2060 mode_lookup: dict[
2061 _ParameterKind, Literal['positional_only', 'positional_or_keyword', 'var_args', 'keyword_only']
2062 ] = {
2063 Parameter.POSITIONAL_ONLY: 'positional_only',
2064 Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
2065 Parameter.VAR_POSITIONAL: 'var_args',
2066 Parameter.KEYWORD_ONLY: 'keyword_only',
2067 }
2068
2069 sig = _typing_extra.signature_no_eval(function)
2070 globalns, localns = self._types_namespace
2071 type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)
2072
2073 parameters_list: list[core_schema.ArgumentsV3Parameter] = []
2074
2075 for i, (name, p) in enumerate(sig.parameters.items()):
2076 if parameters_callback is not None:
2077 result = parameters_callback(i, name, p.annotation)
2078 if result == 'skip':
2079 continue
2080
2081 if p.annotation is Parameter.empty:
2082 annotation = typing.cast(Any, Any)
2083 else:
2084 annotation = type_hints[name]
2085
2086 parameter_mode = mode_lookup.get(p.kind)
2087 if parameter_mode is None:
2088 assert p.kind == Parameter.VAR_KEYWORD, p.kind
2089
2090 unpack_type = _typing_extra.unpack_type(annotation)
2091 if unpack_type is not None:
2092 origin = get_origin(unpack_type) or unpack_type
2093 if not is_typeddict(origin):
2094 raise PydanticUserError(
2095 f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
2096 code='unpack-typed-dict',
2097 )
2098 non_pos_only_param_names = {
2099 name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
2100 }
2101 overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
2102 if overlapping_params:
2103 raise PydanticUserError(
2104 f'Typed dictionary {origin.__name__!r} overlaps with parameter'
2105 f'{"s" if len(overlapping_params) >= 2 else ""} '
2106 f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
2107 code='overlapping-unpack-typed-dict',
2108 )
2109 parameter_mode = 'var_kwargs_unpacked_typed_dict'
2110 annotation = unpack_type
2111 else:
2112 parameter_mode = 'var_kwargs_uniform'
2113
2114 parameters_list.append(
2115 self._generate_parameter_v3_schema(
2116 name, annotation, AnnotationSource.FUNCTION, parameter_mode, default=p.default
2117 )
2118 )
2119
2120 return core_schema.arguments_v3_schema(
2121 parameters_list,
2122 validate_by_name=self._config_wrapper.validate_by_name,
2123 )
2124
2125 def _unsubstituted_typevar_schema(self, typevar: typing.TypeVar) -> core_schema.CoreSchema:
2126 try:
2127 has_default = typevar.has_default() # pyright: ignore[reportAttributeAccessIssue]
2128 except AttributeError:
2129 # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13
2130 pass
2131 else:
2132 if has_default:
2133 return self.generate_schema(typevar.__default__) # pyright: ignore[reportAttributeAccessIssue]
2134
2135 if constraints := typevar.__constraints__:
2136 return self._union_schema(typing.Union[constraints])
2137
2138 if bound := typevar.__bound__:
2139 schema = self.generate_schema(bound)
2140 schema['serialization'] = core_schema.simple_ser_schema('any')
2141 return schema
2142
2143 return core_schema.any_schema()
2144
2145 def _computed_field_schema(
2146 self,
2147 d: Decorator[ComputedFieldInfo],
2148 field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]],
2149 ) -> core_schema.ComputedField:
2150 if d.info.return_type is not PydanticUndefined:
2151 return_type = d.info.return_type
2152 else:
2153 try:
2154 # Do not pass in globals as the function could be defined in a different module.
2155 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2156 # in locals that may contain a parent/rebuild namespace:
2157 return_type = _decorators.get_callable_return_type(d.func, localns=self._types_namespace.locals)
2158 except NameError as e:
2159 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2160 if return_type is PydanticUndefined:
2161 raise PydanticUserError(
2162 'Computed field is missing return type annotation or specifying `return_type`'
2163 ' to the `@computed_field` decorator (e.g. `@computed_field(return_type=int | str)`)',
2164 code='model-field-missing-annotation',
2165 )
2166
2167 return_type = replace_types(return_type, self._typevars_map)
2168 # Create a new ComputedFieldInfo so that different type parametrizations of the same
2169 # generic model's computed field can have different return types.
2170 d.info = dataclasses.replace(d.info, return_type=return_type)
2171 return_type_schema = self.generate_schema(return_type)
2172 # Apply serializers to computed field if there exist
2173 return_type_schema = self._apply_field_serializers(
2174 return_type_schema,
2175 filter_field_decorator_info_by_field(field_serializers.values(), d.cls_var_name),
2176 )
2177
2178 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(d.info)
2179 core_metadata: dict[str, Any] = {}
2180 update_core_metadata(
2181 core_metadata,
2182 pydantic_js_updates={'readOnly': True, **(pydantic_js_updates if pydantic_js_updates else {})},
2183 pydantic_js_extra=pydantic_js_extra,
2184 )
2185 exclude_if = d.info.exclude_if
2186 # TODO: Should we support exclude_if from annotations?
2187 return core_schema.computed_field(
2188 d.cls_var_name,
2189 return_schema=return_type_schema,
2190 alias=d.info.alias,
2191 serialization_exclude_if=exclude_if,
2192 metadata=core_metadata,
2193 )
2194
2195 def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema:
2196 """Generate schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`."""
2197 FieldInfo = import_cached_field_info()
2198 source_type, *annotations = self._get_args_resolving_forward_refs(
2199 annotated_type,
2200 required=True,
2201 )
2202 schema = self._apply_annotations(source_type, annotations)
2203 # put the default validator last so that TypeAdapter.get_default_value() works
2204 # even if there are function validators involved
2205 for annotation in annotations:
2206 if isinstance(annotation, FieldInfo):
2207 schema = wrap_default(annotation, schema)
2208 return schema
2209
2210 def _apply_annotations(
2211 self,
2212 source_type: Any,
2213 annotations: list[Any],
2214 transform_inner_schema: Callable[[CoreSchema], CoreSchema] = lambda x: x,
2215 check_unsupported_field_info_attributes: bool = True,
2216 ) -> CoreSchema:
2217 """Apply arguments from `Annotated` or from `FieldInfo` to a schema.
2218
2219 This gets called by `GenerateSchema._annotated_schema` but differs from it in that it does
2220 not expect `source_type` to be an `Annotated` object, it expects it to be the first argument of that
2221 (in other words, `GenerateSchema._annotated_schema` just unpacks `Annotated`, this process it).
2222 """
2223 annotations = list(_known_annotated_metadata.expand_grouped_metadata(annotations))
2224
2225 pydantic_js_annotation_functions: list[GetJsonSchemaFunction] = []
2226
2227 def inner_handler(obj: Any) -> CoreSchema:
2228 schema = self._generate_schema_from_get_schema_method(obj, source_type)
2229
2230 if schema is None:
2231 schema = self._generate_schema_inner(obj)
2232
2233 metadata_js_function = _extract_get_pydantic_json_schema(obj)
2234 if metadata_js_function is not None:
2235 metadata_schema = resolve_original_schema(schema, self.defs)
2236 if metadata_schema is not None:
2237 self._add_js_function(metadata_schema, metadata_js_function)
2238 return transform_inner_schema(schema)
2239
2240 get_inner_schema = CallbackGetCoreSchemaHandler(inner_handler, self)
2241
2242 for annotation in annotations:
2243 if annotation is None:
2244 continue
2245 get_inner_schema = self._get_wrapped_inner_schema(
2246 get_inner_schema,
2247 annotation,
2248 pydantic_js_annotation_functions,
2249 check_unsupported_field_info_attributes=check_unsupported_field_info_attributes,
2250 )
2251
2252 schema = get_inner_schema(source_type)
2253 if pydantic_js_annotation_functions:
2254 core_metadata = schema.setdefault('metadata', {})
2255 update_core_metadata(core_metadata, pydantic_js_annotation_functions=pydantic_js_annotation_functions)
2256 return _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, source_type, schema)
2257
2258 def _apply_single_annotation(
2259 self,
2260 schema: core_schema.CoreSchema,
2261 metadata: Any,
2262 check_unsupported_field_info_attributes: bool = True,
2263 ) -> core_schema.CoreSchema:
2264 FieldInfo = import_cached_field_info()
2265
2266 if isinstance(metadata, FieldInfo):
2267 if (
2268 check_unsupported_field_info_attributes
2269 # HACK: we don't want to emit the warning for `FieldInfo` subclasses, because FastAPI does weird manipulations
2270 # with its subclasses and their annotations:
2271 and type(metadata) is FieldInfo
2272 ):
2273 for attr, value in (unsupported_attributes := self._get_unsupported_field_info_attributes(metadata)):
2274 warnings.warn(
2275 f'The {attr!r} attribute with value {value!r} was provided to the `Field()` function, '
2276 f'which has no effect in the context it was used. {attr!r} is field-specific metadata, '
2277 'and can only be attached to a model field using `Annotated` metadata or by assignment. '
2278 'This may have happened because an `Annotated` type alias using the `type` statement was '
2279 'used, or if the `Field()` function was attached to a single member of a union type.',
2280 category=UnsupportedFieldAttributeWarning,
2281 )
2282
2283 if (
2284 metadata.default_factory_takes_validated_data
2285 and self.model_type_stack.get() is None
2286 and 'defaut_factory' not in unsupported_attributes
2287 ):
2288 warnings.warn(
2289 "A 'default_factory' taking validated data as an argument was provided to the `Field()` function, "
2290 'but no validated data is available in the context it was used.',
2291 category=UnsupportedFieldAttributeWarning,
2292 )
2293
2294 for field_metadata in metadata.metadata:
2295 schema = self._apply_single_annotation(schema, field_metadata)
2296
2297 if metadata.discriminator is not None:
2298 schema = self._apply_discriminator_to_union(schema, metadata.discriminator)
2299 return schema
2300
2301 if schema['type'] == 'nullable':
2302 # for nullable schemas, metadata is automatically applied to the inner schema
2303 inner = schema.get('schema', core_schema.any_schema())
2304 inner = self._apply_single_annotation(inner, metadata)
2305 if inner:
2306 schema['schema'] = inner
2307 return schema
2308
2309 if schema['type'] == 'union' and any(
2310 choice['type'] == 'missing-sentinel' for choice in core_schema.iter_union_choices(schema)
2311 ):
2312 # Same behavior as for nullable schemas. This is a bit gross, but we have to support the same pattern
2313 filtered_choices = [
2314 choice
2315 for choice in schema['choices']
2316 if (choice[0] if isinstance(choice, tuple) else choice)['type'] != 'missing-sentinel'
2317 ]
2318 if len(filtered_choices) >= 2:
2319 # e.g. `Annotated[int | str | MISSING, Constraint(...)]`. We apply `Constraint(...)` to `int | str`,
2320 # and create a new union semantically equivalent to `Annotated[int | str, Constraint(...)] | MISSING`:
2321 filtered_union = core_schema.union_schema(filtered_choices)
2322 filtered_union = self._apply_single_annotation(filtered_union, metadata)
2323 new_union = schema.copy()
2324 new_union['choices'] = [
2325 filtered_union,
2326 next(
2327 choice
2328 for choice in schema['choices']
2329 if (choice[0] if isinstance(choice, tuple) else choice)['type'] == 'missing-sentinel'
2330 ),
2331 ]
2332 return new_union
2333 elif len(filtered_choices) == 1:
2334 # e.g. `Annotated[int | MISSING, Constraint(...)]`. We apply `Constraint(...)` to `int`, and reconstruct
2335 # a new union preserving the order.
2336 inner = filtered_choices[0][0] if isinstance(filtered_choices[0], tuple) else filtered_choices[0]
2337 inner = self._apply_single_annotation(inner, metadata)
2338
2339 # Create a new union schema, preserving the order of the union:
2340 new_union = schema.copy()
2341 new_union['choices'] = [
2342 (inner, choice[1])
2343 if isinstance(choice, tuple) and choice[0]['type'] != 'missing-sentinel'
2344 else inner
2345 if not isinstance(choice, tuple) and choice['type'] != 'missing-sentinel'
2346 else choice
2347 for choice in schema['choices']
2348 ]
2349 return new_union
2350
2351 original_schema = schema
2352 ref = schema.get('ref')
2353 if ref is not None:
2354 schema = schema.copy()
2355 new_ref = ref + f'_{repr(metadata)}'
2356 if (existing := self.defs.get_schema_from_ref(new_ref)) is not None:
2357 return existing
2358 schema['ref'] = new_ref # pyright: ignore[reportGeneralTypeIssues]
2359 elif schema['type'] == 'definition-ref':
2360 ref = schema['schema_ref']
2361 if (referenced_schema := self.defs.get_schema_from_ref(ref)) is not None:
2362 schema = referenced_schema.copy()
2363 new_ref = ref + f'_{repr(metadata)}'
2364 if (existing := self.defs.get_schema_from_ref(new_ref)) is not None:
2365 return existing
2366 schema['ref'] = new_ref # pyright: ignore[reportGeneralTypeIssues]
2367
2368 maybe_updated_schema = _known_annotated_metadata.apply_known_metadata(metadata, schema)
2369
2370 if maybe_updated_schema is not None:
2371 return maybe_updated_schema
2372 return original_schema
2373
2374 def _apply_single_annotation_json_schema(
2375 self, schema: core_schema.CoreSchema, metadata: Any
2376 ) -> core_schema.CoreSchema:
2377 FieldInfo = import_cached_field_info()
2378
2379 if isinstance(metadata, FieldInfo):
2380 for field_metadata in metadata.metadata:
2381 schema = self._apply_single_annotation_json_schema(schema, field_metadata)
2382
2383 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(metadata)
2384 core_metadata = schema.setdefault('metadata', {})
2385 update_core_metadata(
2386 core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
2387 )
2388 return schema
2389
2390 def _get_unsupported_field_info_attributes(self, field_info: FieldInfo) -> list[tuple[str, Any]]:
2391 """Get the list of unsupported `FieldInfo` attributes when not directly used in `Annotated` for field annotations."""
2392 unused_metadata: list[tuple[str, Any]] = []
2393 for unused_metadata_name, unset_value in UNSUPPORTED_STANDALONE_FIELDINFO_ATTRIBUTES:
2394 if (
2395 (unused_metadata_value := getattr(field_info, unused_metadata_name)) is not unset_value
2396 # `default` and `default_factory` can still be used with a type adapter, so only include them
2397 # if used with a model-like class:
2398 and (
2399 unused_metadata_name not in ('default', 'default_factory')
2400 or self.model_type_stack.get() is not None
2401 )
2402 # Setting `alias` will set `validation/serialization_alias` as well, so we want to avoid duplicate warnings:
2403 and (
2404 unused_metadata_name not in ('validation_alias', 'serialization_alias')
2405 or 'alias' not in field_info._attributes_set
2406 )
2407 ):
2408 unused_metadata.append((unused_metadata_name, unused_metadata_value))
2409
2410 return unused_metadata
2411
2412 def _get_wrapped_inner_schema(
2413 self,
2414 get_inner_schema: GetCoreSchemaHandler,
2415 annotation: Any,
2416 pydantic_js_annotation_functions: list[GetJsonSchemaFunction],
2417 check_unsupported_field_info_attributes: bool = False,
2418 ) -> CallbackGetCoreSchemaHandler:
2419 annotation_get_schema: GetCoreSchemaFunction | None = getattr(annotation, '__get_pydantic_core_schema__', None)
2420
2421 def new_handler(source: Any) -> core_schema.CoreSchema:
2422 if annotation_get_schema is not None:
2423 schema = annotation_get_schema(source, get_inner_schema)
2424 else:
2425 schema = get_inner_schema(source)
2426 schema = self._apply_single_annotation(
2427 schema,
2428 annotation,
2429 check_unsupported_field_info_attributes=check_unsupported_field_info_attributes,
2430 )
2431 schema = self._apply_single_annotation_json_schema(schema, annotation)
2432
2433 metadata_js_function = _extract_get_pydantic_json_schema(annotation)
2434 if metadata_js_function is not None:
2435 pydantic_js_annotation_functions.append(metadata_js_function)
2436 return schema
2437
2438 return CallbackGetCoreSchemaHandler(new_handler, self)
2439
2440 def _apply_field_serializers(
2441 self,
2442 schema: core_schema.CoreSchema,
2443 serializers: list[Decorator[FieldSerializerDecoratorInfo]],
2444 ) -> core_schema.CoreSchema:
2445 """Apply field serializers to a schema."""
2446 if serializers:
2447 schema = copy(schema)
2448 if schema['type'] == 'definitions':
2449 inner_schema = schema['schema']
2450 schema['schema'] = self._apply_field_serializers(inner_schema, serializers)
2451 return schema
2452 elif 'ref' in schema:
2453 schema = self.defs.create_definition_reference_schema(schema)
2454
2455 # use the last serializer to make it easy to override a serializer set on a parent model
2456 serializer = serializers[-1]
2457 is_field_serializer, info_arg = inspect_field_serializer(serializer.func, serializer.info.mode)
2458
2459 if serializer.info.return_type is not PydanticUndefined:
2460 return_type = serializer.info.return_type
2461 else:
2462 try:
2463 # Do not pass in globals as the function could be defined in a different module.
2464 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2465 # in locals that may contain a parent/rebuild namespace:
2466 return_type = _decorators.get_callable_return_type(
2467 serializer.func, localns=self._types_namespace.locals
2468 )
2469 except NameError as e:
2470 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2471
2472 if return_type is PydanticUndefined:
2473 return_schema = None
2474 else:
2475 return_schema = self.generate_schema(return_type)
2476
2477 if serializer.info.mode == 'wrap':
2478 schema['serialization'] = core_schema.wrap_serializer_function_ser_schema(
2479 serializer.func,
2480 is_field_serializer=is_field_serializer,
2481 info_arg=info_arg,
2482 return_schema=return_schema,
2483 when_used=serializer.info.when_used,
2484 )
2485 else:
2486 assert serializer.info.mode == 'plain'
2487 schema['serialization'] = core_schema.plain_serializer_function_ser_schema(
2488 serializer.func,
2489 is_field_serializer=is_field_serializer,
2490 info_arg=info_arg,
2491 return_schema=return_schema,
2492 when_used=serializer.info.when_used,
2493 )
2494 return schema
2495
2496 def _apply_model_serializers(
2497 self, schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]]
2498 ) -> core_schema.CoreSchema:
2499 """Apply model serializers to a schema."""
2500 ref: str | None = schema.pop('ref', None) # type: ignore
2501 if serializers:
2502 serializer = list(serializers)[-1]
2503 info_arg = inspect_model_serializer(serializer.func, serializer.info.mode)
2504
2505 if serializer.info.return_type is not PydanticUndefined:
2506 return_type = serializer.info.return_type
2507 else:
2508 try:
2509 # Do not pass in globals as the function could be defined in a different module.
2510 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2511 # in locals that may contain a parent/rebuild namespace:
2512 return_type = _decorators.get_callable_return_type(
2513 serializer.func, localns=self._types_namespace.locals
2514 )
2515 except NameError as e:
2516 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2517
2518 if return_type is PydanticUndefined:
2519 return_schema = None
2520 else:
2521 return_schema = self.generate_schema(return_type)
2522
2523 if serializer.info.mode == 'wrap':
2524 ser_schema: core_schema.SerSchema = core_schema.wrap_serializer_function_ser_schema(
2525 serializer.func,
2526 info_arg=info_arg,
2527 return_schema=return_schema,
2528 when_used=serializer.info.when_used,
2529 )
2530 else:
2531 # plain
2532 ser_schema = core_schema.plain_serializer_function_ser_schema(
2533 serializer.func,
2534 info_arg=info_arg,
2535 return_schema=return_schema,
2536 when_used=serializer.info.when_used,
2537 )
2538 schema['serialization'] = ser_schema
2539 if ref:
2540 schema['ref'] = ref # type: ignore
2541 return schema
2542
2543
2544_VALIDATOR_F_MATCH: Mapping[
2545 tuple[FieldValidatorModes, Literal['no-info', 'with-info']],
2546 Callable[[Callable[..., Any], core_schema.CoreSchema], core_schema.CoreSchema],
2547] = {
2548 ('before', 'no-info'): lambda f, schema: core_schema.no_info_before_validator_function(f, schema),
2549 ('after', 'no-info'): lambda f, schema: core_schema.no_info_after_validator_function(f, schema),
2550 ('plain', 'no-info'): lambda f, _: core_schema.no_info_plain_validator_function(f),
2551 ('wrap', 'no-info'): lambda f, schema: core_schema.no_info_wrap_validator_function(f, schema),
2552 ('before', 'with-info'): lambda f, schema: core_schema.with_info_before_validator_function(f, schema),
2553 ('after', 'with-info'): lambda f, schema: core_schema.with_info_after_validator_function(f, schema),
2554 ('plain', 'with-info'): lambda f, _: core_schema.with_info_plain_validator_function(f),
2555 ('wrap', 'with-info'): lambda f, schema: core_schema.with_info_wrap_validator_function(f, schema),
2556}
2557
2558
2559# TODO V3: this function is only used for deprecated decorators. It should
2560# be removed once we drop support for those.
2561def apply_validators(
2562 schema: core_schema.CoreSchema,
2563 validators: Iterable[Decorator[RootValidatorDecoratorInfo]]
2564 | Iterable[Decorator[ValidatorDecoratorInfo]]
2565 | Iterable[Decorator[FieldValidatorDecoratorInfo]],
2566) -> core_schema.CoreSchema:
2567 """Apply validators to a schema.
2568
2569 Args:
2570 schema: The schema to apply validators on.
2571 validators: An iterable of validators.
2572 field_name: The name of the field if validators are being applied to a model field.
2573
2574 Returns:
2575 The updated schema.
2576 """
2577 for validator in validators:
2578 # Actually, type could be 'field' or 'model', but this is only used for deprecated
2579 # decorators, so let's not worry about it.
2580 info_arg = inspect_validator(validator.func, mode=validator.info.mode, type='field')
2581 val_type = 'with-info' if info_arg else 'no-info'
2582
2583 schema = _VALIDATOR_F_MATCH[(validator.info.mode, val_type)](validator.func, schema)
2584 return schema
2585
2586
2587def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool:
2588 """In v1, if any of the validators for a field had `always=True`, the default value would be validated.
2589
2590 This serves as an auxiliary function for re-implementing that logic, by looping over a provided
2591 collection of (v1-style) ValidatorDecoratorInfo's and checking if any of them have `always=True`.
2592
2593 We should be able to drop this function and the associated logic calling it once we drop support
2594 for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent
2595 to the v1-validator `always` kwarg to `field_validator`.)
2596 """
2597 for validator in validators:
2598 if validator.info.always:
2599 return True
2600 return False
2601
2602
2603def _convert_to_aliases(
2604 alias: str | AliasChoices | AliasPath | None,
2605) -> str | list[str | int] | list[list[str | int]] | None:
2606 if isinstance(alias, (AliasChoices, AliasPath)):
2607 return alias.convert_to_aliases()
2608 else:
2609 return alias
2610
2611
2612def apply_model_validators(
2613 schema: core_schema.CoreSchema,
2614 validators: Iterable[Decorator[ModelValidatorDecoratorInfo]],
2615 mode: Literal['inner', 'outer', 'all'],
2616) -> core_schema.CoreSchema:
2617 """Apply model validators to a schema.
2618
2619 If mode == 'inner', only "before" validators are applied
2620 If mode == 'outer', validators other than "before" are applied
2621 If mode == 'all', all validators are applied
2622
2623 Args:
2624 schema: The schema to apply validators on.
2625 validators: An iterable of validators.
2626 mode: The validator mode.
2627
2628 Returns:
2629 The updated schema.
2630 """
2631 ref: str | None = schema.pop('ref', None) # type: ignore
2632 for validator in validators:
2633 if mode == 'inner' and validator.info.mode != 'before':
2634 continue
2635 if mode == 'outer' and validator.info.mode == 'before':
2636 continue
2637 info_arg = inspect_validator(validator.func, mode=validator.info.mode, type='model')
2638 if validator.info.mode == 'wrap':
2639 if info_arg:
2640 schema = core_schema.with_info_wrap_validator_function(function=validator.func, schema=schema)
2641 else:
2642 schema = core_schema.no_info_wrap_validator_function(function=validator.func, schema=schema)
2643 elif validator.info.mode == 'before':
2644 if info_arg:
2645 schema = core_schema.with_info_before_validator_function(function=validator.func, schema=schema)
2646 else:
2647 schema = core_schema.no_info_before_validator_function(function=validator.func, schema=schema)
2648 else:
2649 assert validator.info.mode == 'after'
2650 if info_arg:
2651 schema = core_schema.with_info_after_validator_function(function=validator.func, schema=schema)
2652 else:
2653 schema = core_schema.no_info_after_validator_function(function=validator.func, schema=schema)
2654 if ref:
2655 schema['ref'] = ref # type: ignore
2656 return schema
2657
2658
2659def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema:
2660 """Wrap schema with default schema if default value or `default_factory` are available.
2661
2662 Args:
2663 field_info: The field info object.
2664 schema: The schema to apply default on.
2665
2666 Returns:
2667 Updated schema by default value or `default_factory`.
2668 """
2669 if field_info.default_factory:
2670 return core_schema.with_default_schema(
2671 schema,
2672 default_factory=field_info.default_factory,
2673 default_factory_takes_data=takes_validated_data_argument(field_info.default_factory),
2674 validate_default=field_info.validate_default,
2675 )
2676 elif field_info.default is not PydanticUndefined:
2677 return core_schema.with_default_schema(
2678 schema, default=field_info.default, validate_default=field_info.validate_default
2679 )
2680 else:
2681 return schema
2682
2683
2684def _extract_get_pydantic_json_schema(tp: Any) -> GetJsonSchemaFunction | None:
2685 """Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`."""
2686 js_modify_function = getattr(tp, '__get_pydantic_json_schema__', None)
2687
2688 if hasattr(tp, '__modify_schema__'):
2689 BaseModel = import_cached_base_model()
2690
2691 has_custom_v2_modify_js_func = (
2692 js_modify_function is not None
2693 and BaseModel.__get_pydantic_json_schema__.__func__ # type: ignore
2694 not in (js_modify_function, getattr(js_modify_function, '__func__', None))
2695 )
2696
2697 if not has_custom_v2_modify_js_func:
2698 cls_name = getattr(tp, '__name__', None)
2699 raise PydanticUserError(
2700 f'The `__modify_schema__` method is not supported in Pydantic v2. '
2701 f'Use `__get_pydantic_json_schema__` instead{f" in class `{cls_name}`" if cls_name else ""}.',
2702 code='custom-json-schema',
2703 )
2704
2705 if (origin := get_origin(tp)) is not None:
2706 # Generic aliases proxy attribute access to the origin, *except* dunder attributes,
2707 # such as `__get_pydantic_json_schema__`, hence the explicit check.
2708 return _extract_get_pydantic_json_schema(origin)
2709
2710 if js_modify_function is None:
2711 return None
2712
2713 return js_modify_function
2714
2715
2716def resolve_original_schema(schema: CoreSchema, definitions: _Definitions) -> CoreSchema | None:
2717 if schema['type'] == 'definition-ref':
2718 return definitions.get_schema_from_ref(schema['schema_ref'])
2719 elif schema['type'] == 'definitions':
2720 return schema['schema']
2721 else:
2722 return schema
2723
2724
2725def _inlining_behavior(
2726 def_ref: core_schema.DefinitionReferenceSchema,
2727) -> Literal['inline', 'keep', 'preserve_metadata']:
2728 """Determine the inlining behavior of the `'definition-ref'` schema.
2729
2730 - If no `'serialization'` schema and no metadata is attached, the schema can safely be inlined.
2731 - If it has metadata but only related to the deferred discriminator application, it can be inlined
2732 provided that such metadata is kept.
2733 - Otherwise, the schema should not be inlined. Doing so would remove the `'serialization'` schema or metadata.
2734 """
2735 if 'serialization' in def_ref:
2736 return 'keep'
2737 metadata = def_ref.get('metadata')
2738 if not metadata:
2739 return 'inline'
2740 if len(metadata) == 1 and 'pydantic_internal_union_discriminator' in metadata:
2741 return 'preserve_metadata'
2742 return 'keep'
2743
2744
2745class _Definitions:
2746 """Keeps track of references and definitions."""
2747
2748 _recursively_seen: set[str]
2749 """A set of recursively seen references.
2750
2751 When a referenceable type is encountered, the `get_schema_or_ref` context manager is
2752 entered to compute the reference. If the type references itself by some way (e.g. for
2753 a dataclass a Pydantic model, the class can be referenced as a field annotation),
2754 entering the context manager again will yield a `'definition-ref'` schema that should
2755 short-circuit the normal generation process, as the reference was already in this set.
2756 """
2757
2758 _definitions: dict[str, core_schema.CoreSchema]
2759 """A mapping of references to their corresponding schema.
2760
2761 When a schema for a referenceable type is generated, it is stored in this mapping. If the
2762 same type is encountered again, the reference is yielded by the `get_schema_or_ref` context
2763 manager.
2764 """
2765
2766 def __init__(self) -> None:
2767 self._recursively_seen = set()
2768 self._definitions = {}
2769
2770 @contextmanager
2771 def get_schema_or_ref(self, tp: Any, /) -> Generator[tuple[str, core_schema.DefinitionReferenceSchema | None]]:
2772 """Get a definition for `tp` if one exists.
2773
2774 If a definition exists, a tuple of `(ref_string, CoreSchema)` is returned.
2775 If no definition exists yet, a tuple of `(ref_string, None)` is returned.
2776
2777 Note that the returned `CoreSchema` will always be a `DefinitionReferenceSchema`,
2778 not the actual definition itself.
2779
2780 This should be called for any type that can be identified by reference.
2781 This includes any recursive types.
2782
2783 At present the following types can be named/recursive:
2784
2785 - Pydantic model
2786 - Pydantic and stdlib dataclasses
2787 - Typed dictionaries
2788 - Named tuples
2789 - `TypeAliasType` instances
2790 - Enums
2791 """
2792 ref = get_type_ref(tp)
2793 # return the reference if we're either (1) in a cycle or (2) it the reference was already encountered:
2794 if ref in self._recursively_seen or ref in self._definitions:
2795 yield (ref, core_schema.definition_reference_schema(ref))
2796 else:
2797 self._recursively_seen.add(ref)
2798 try:
2799 yield (ref, None)
2800 finally:
2801 self._recursively_seen.discard(ref)
2802
2803 def get_schema_from_ref(self, ref: str) -> CoreSchema | None:
2804 """Resolve the schema from the given reference."""
2805 return self._definitions.get(ref)
2806
2807 def create_definition_reference_schema(self, schema: CoreSchema) -> core_schema.DefinitionReferenceSchema:
2808 """Store the schema as a definition and return a `'definition-reference'` schema pointing to it.
2809
2810 The schema must have a reference attached to it.
2811 """
2812 ref = schema['ref'] # pyright: ignore
2813 self._definitions[ref] = schema
2814 return core_schema.definition_reference_schema(ref)
2815
2816 def unpack_definitions(self, schema: core_schema.DefinitionsSchema) -> CoreSchema:
2817 """Store the definitions of the `'definitions'` core schema and return the inner core schema."""
2818 for def_schema in schema['definitions']:
2819 self._definitions[def_schema['ref']] = def_schema # pyright: ignore
2820 return schema['schema']
2821
2822 def finalize_schema(self, schema: CoreSchema) -> CoreSchema:
2823 """Finalize the core schema.
2824
2825 This traverses the core schema and referenced definitions, replaces `'definition-ref'` schemas
2826 by the referenced definition if possible, and applies deferred discriminators.
2827 """
2828 definitions = self._definitions
2829 try:
2830 gather_result = gather_schemas_for_cleaning(
2831 schema,
2832 definitions=definitions,
2833 )
2834 except MissingDefinitionError as e:
2835 raise InvalidSchemaError from e
2836
2837 remaining_defs: dict[str, CoreSchema] = {}
2838
2839 # Note: this logic doesn't play well when core schemas with deferred discriminator metadata
2840 # and references are encountered. See the `test_deferred_discriminated_union_and_references()` test.
2841 for ref, inlinable_def_ref in gather_result['collected_references'].items():
2842 if inlinable_def_ref is not None and (inlining_behavior := _inlining_behavior(inlinable_def_ref)) != 'keep':
2843 if inlining_behavior == 'inline':
2844 # `ref` was encountered, and only once:
2845 # - `inlinable_def_ref` is a `'definition-ref'` schema and is guaranteed to be
2846 # the only one. Transform it into the definition it points to.
2847 # - Do not store the definition in the `remaining_defs`.
2848 inlinable_def_ref.clear() # pyright: ignore[reportAttributeAccessIssue]
2849 inlinable_def_ref.update(self._resolve_definition(ref, definitions)) # pyright: ignore
2850 elif inlining_behavior == 'preserve_metadata':
2851 # `ref` was encountered, and only once, but contains discriminator metadata.
2852 # We will do the same thing as if `inlining_behavior` was `'inline'`, but make
2853 # sure to keep the metadata for the deferred discriminator application logic below.
2854 meta = inlinable_def_ref.pop('metadata')
2855 inlinable_def_ref.clear() # pyright: ignore[reportAttributeAccessIssue]
2856 inlinable_def_ref.update(self._resolve_definition(ref, definitions)) # pyright: ignore
2857 inlinable_def_ref['metadata'] = meta
2858 else:
2859 # `ref` was encountered, at least two times (or only once, but with metadata or a serialization schema):
2860 # - Do not inline the `'definition-ref'` schemas (they are not provided in the gather result anyway).
2861 # - Store the definition in the `remaining_defs`
2862 remaining_defs[ref] = self._resolve_definition(ref, definitions)
2863
2864 for cs in gather_result['deferred_discriminator_schemas']:
2865 discriminator: str | None = cs['metadata'].pop('pydantic_internal_union_discriminator', None) # pyright: ignore[reportTypedDictNotRequiredAccess]
2866 if discriminator is None:
2867 # This can happen in rare scenarios, when a deferred schema is present multiple times in the
2868 # gather result (e.g. when using the `Sequence` type -- see `test_sequence_discriminated_union()`).
2869 # In this case, a previous loop iteration applied the discriminator and so we can just skip it here.
2870 continue
2871 applied = _discriminated_union.apply_discriminator(cs.copy(), discriminator, remaining_defs)
2872 # Mutate the schema directly to have the discriminator applied
2873 cs.clear() # pyright: ignore[reportAttributeAccessIssue]
2874 cs.update(applied) # pyright: ignore
2875
2876 if remaining_defs:
2877 schema = core_schema.definitions_schema(schema=schema, definitions=[*remaining_defs.values()])
2878 return schema
2879
2880 def _resolve_definition(self, ref: str, definitions: dict[str, CoreSchema]) -> CoreSchema:
2881 definition = definitions[ref]
2882 if definition['type'] != 'definition-ref':
2883 return definition
2884
2885 # Some `'definition-ref'` schemas might act as "intermediate" references (e.g. when using
2886 # a PEP 695 type alias (which is referenceable) that references another PEP 695 type alias):
2887 visited: set[str] = set()
2888 while definition['type'] == 'definition-ref' and _inlining_behavior(definition) == 'inline':
2889 schema_ref = definition['schema_ref']
2890 if schema_ref in visited:
2891 raise PydanticUserError(
2892 f'{ref} contains a circular reference to itself.', code='circular-reference-schema'
2893 )
2894 visited.add(schema_ref)
2895 definition = definitions[schema_ref]
2896 return {**definition, 'ref': ref} # pyright: ignore[reportReturnType]
2897
2898
2899class _FieldNameStack:
2900 __slots__ = ('_stack',)
2901
2902 def __init__(self) -> None:
2903 self._stack: list[str] = []
2904
2905 @contextmanager
2906 def push(self, field_name: str) -> Iterator[None]:
2907 self._stack.append(field_name)
2908 yield
2909 self._stack.pop()
2910
2911 def get(self) -> str | None:
2912 if self._stack:
2913 return self._stack[-1]
2914 else:
2915 return None
2916
2917
2918class _ModelTypeStack:
2919 __slots__ = ('_stack',)
2920
2921 def __init__(self) -> None:
2922 self._stack: list[type] = []
2923
2924 @contextmanager
2925 def push(self, type_obj: type) -> Iterator[None]:
2926 self._stack.append(type_obj)
2927 yield
2928 self._stack.pop()
2929
2930 def get(self) -> type | None:
2931 if self._stack:
2932 return self._stack[-1]
2933 else:
2934 return None