1"""Convert python types to pydantic-core schema."""
2
3from __future__ import annotations as _annotations
4
5import collections.abc
6import dataclasses
7import datetime
8import inspect
9import os
10import pathlib
11import re
12import sys
13import typing
14import warnings
15from collections.abc import Generator, Iterable, Iterator, Mapping
16from contextlib import contextmanager
17from copy import copy
18from decimal import Decimal
19from enum import Enum
20from fractions import Fraction
21from functools import partial
22from inspect import Parameter, _ParameterKind
23from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
24from itertools import chain
25from operator import attrgetter
26from types import FunctionType, GenericAlias, LambdaType, MethodType
27from typing import (
28 TYPE_CHECKING,
29 Any,
30 Callable,
31 Final,
32 ForwardRef,
33 Literal,
34 TypeVar,
35 Union,
36 cast,
37 overload,
38)
39from uuid import UUID
40from zoneinfo import ZoneInfo
41
42import typing_extensions
43from pydantic_core import (
44 MISSING,
45 CoreSchema,
46 MultiHostUrl,
47 PydanticCustomError,
48 PydanticSerializationUnexpectedValue,
49 PydanticUndefined,
50 Url,
51 core_schema,
52 to_jsonable_python,
53)
54from typing_extensions import TypeAlias, TypeAliasType, get_args, get_origin, is_typeddict
55from typing_inspection import typing_objects
56from typing_inspection.introspection import AnnotationSource, get_literal_values, is_union_origin
57
58from ..aliases import AliasChoices, AliasPath
59from ..annotated_handlers import GetCoreSchemaHandler, GetJsonSchemaHandler
60from ..config import ConfigDict, JsonDict, JsonEncoder, JsonSchemaExtraCallable
61from ..errors import (
62 PydanticForbiddenQualifier,
63 PydanticInvalidForJsonSchema,
64 PydanticSchemaGenerationError,
65 PydanticUndefinedAnnotation,
66 PydanticUserError,
67)
68from ..functional_validators import AfterValidator, BeforeValidator, FieldValidatorModes, PlainValidator, WrapValidator
69from ..json_schema import JsonSchemaValue
70from ..version import version_short
71from ..warnings import (
72 ArbitraryTypeWarning,
73 PydanticDeprecatedSince20,
74 TypedDictExtraConfigWarning,
75 UnsupportedFieldAttributeWarning,
76)
77from . import _decorators, _discriminated_union, _known_annotated_metadata, _repr, _typing_extra
78from ._config import ConfigWrapper, ConfigWrapperStack
79from ._core_metadata import CoreMetadata, update_core_metadata
80from ._core_utils import (
81 get_ref,
82 get_type_ref,
83 is_list_like_schema_with_items_schema,
84)
85from ._decorators import (
86 Decorator,
87 DecoratorInfos,
88 FieldSerializerDecoratorInfo,
89 FieldValidatorDecoratorInfo,
90 ModelSerializerDecoratorInfo,
91 ModelValidatorDecoratorInfo,
92 RootValidatorDecoratorInfo,
93 ValidatorDecoratorInfo,
94 get_attribute_from_bases,
95 inspect_field_serializer,
96 inspect_model_serializer,
97 inspect_validator,
98)
99from ._docs_extraction import extract_docstrings_from_cls
100from ._fields import (
101 collect_dataclass_fields,
102 rebuild_dataclass_fields,
103 rebuild_model_fields,
104 takes_validated_data_argument,
105 update_field_from_config,
106)
107from ._forward_ref import PydanticRecursiveRef
108from ._generics import get_standard_typevars_map, replace_types
109from ._import_utils import import_cached_base_model, import_cached_field_info
110from ._mock_val_ser import MockCoreSchema
111from ._namespace_utils import NamespacesTuple, NsResolver
112from ._schema_gather import MissingDefinitionError, gather_schemas_for_cleaning
113from ._schema_generation_shared import CallbackGetCoreSchemaHandler
114from ._utils import lenient_issubclass, smart_deepcopy
115
116if TYPE_CHECKING:
117 from ..fields import ComputedFieldInfo, FieldInfo
118 from ..main import BaseModel
119 from ..types import Discriminator
120 from ._dataclasses import StandardDataclass
121 from ._schema_generation_shared import GetJsonSchemaFunction
122
123_SUPPORTS_TYPEDDICT = sys.version_info >= (3, 12)
124
125FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo]
126FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo)
127AnyFieldDecorator = Union[
128 Decorator[ValidatorDecoratorInfo],
129 Decorator[FieldValidatorDecoratorInfo],
130 Decorator[FieldSerializerDecoratorInfo],
131]
132
133ModifyCoreSchemaWrapHandler: TypeAlias = GetCoreSchemaHandler
134GetCoreSchemaFunction: TypeAlias = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema]
135ParametersCallback: TypeAlias = "Callable[[int, str, Any], Literal['skip'] | None]"
136
137TUPLE_TYPES: list[type] = [typing.Tuple, tuple] # noqa: UP006
138LIST_TYPES: list[type] = [typing.List, list, collections.abc.MutableSequence] # noqa: UP006
139SET_TYPES: list[type] = [typing.Set, set, collections.abc.MutableSet] # noqa: UP006
140FROZEN_SET_TYPES: list[type] = [typing.FrozenSet, frozenset, collections.abc.Set] # noqa: UP006
141DICT_TYPES: list[type] = [typing.Dict, dict] # noqa: UP006
142IP_TYPES: list[type] = [IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network]
143SEQUENCE_TYPES: list[type] = [typing.Sequence, collections.abc.Sequence]
144ITERABLE_TYPES: list[type] = [typing.Iterable, collections.abc.Iterable, typing.Generator, collections.abc.Generator]
145TYPE_TYPES: list[type] = [typing.Type, type] # noqa: UP006
146PATTERN_TYPES: list[type] = [typing.Pattern, re.Pattern]
147PATH_TYPES: list[type] = [
148 os.PathLike,
149 pathlib.Path,
150 pathlib.PurePath,
151 pathlib.PosixPath,
152 pathlib.PurePosixPath,
153 pathlib.PureWindowsPath,
154]
155MAPPING_TYPES = [
156 typing.Mapping,
157 typing.MutableMapping,
158 collections.abc.Mapping,
159 collections.abc.MutableMapping,
160 collections.OrderedDict,
161 typing_extensions.OrderedDict,
162 typing.DefaultDict, # noqa: UP006
163 collections.defaultdict,
164]
165COUNTER_TYPES = [collections.Counter, typing.Counter]
166DEQUE_TYPES: list[type] = [collections.deque, typing.Deque] # noqa: UP006
167
168# Note: This does not play very well with type checkers. For example,
169# `a: LambdaType = lambda x: x` will raise a type error by Pyright.
170ValidateCallSupportedTypes = Union[
171 LambdaType,
172 FunctionType,
173 MethodType,
174 partial,
175]
176
177VALIDATE_CALL_SUPPORTED_TYPES = get_args(ValidateCallSupportedTypes)
178UNSUPPORTED_STANDALONE_FIELDINFO_ATTRIBUTES: list[tuple[str, Any]] = [
179 ('alias', None),
180 ('validation_alias', None),
181 ('serialization_alias', None),
182 # will be set if any alias is set, so disable it to avoid double warnings:
183 # 'alias_priority',
184 ('default', PydanticUndefined),
185 ('default_factory', None),
186 ('exclude', None),
187 ('deprecated', None),
188 ('repr', True),
189 ('validate_default', None),
190 ('frozen', None),
191 ('init', None),
192 ('init_var', None),
193 ('kw_only', None),
194]
195"""`FieldInfo` attributes (and their default value) that can't be used outside of a model (e.g. in a type adapter or a PEP 695 type alias)."""
196
197_mode_to_validator: dict[
198 FieldValidatorModes, type[BeforeValidator | AfterValidator | PlainValidator | WrapValidator]
199] = {'before': BeforeValidator, 'after': AfterValidator, 'plain': PlainValidator, 'wrap': WrapValidator}
200
201
202def check_validator_fields_against_field_name(
203 info: FieldDecoratorInfo,
204 field: str,
205) -> bool:
206 """Check if field name is in validator fields.
207
208 Args:
209 info: The field info.
210 field: The field name to check.
211
212 Returns:
213 `True` if field name is in validator fields, `False` otherwise.
214 """
215 fields = info.fields
216 return '*' in fields or field in fields
217
218
219def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None:
220 """Check if the defined fields in decorators exist in `fields` param.
221
222 It ignores the check for a decorator if the decorator has `*` as field or `check_fields=False`.
223
224 Args:
225 decorators: An iterable of decorators.
226 fields: An iterable of fields name.
227
228 Raises:
229 PydanticUserError: If one of the field names does not exist in `fields` param.
230 """
231 fields = set(fields)
232 for dec in decorators:
233 if '*' in dec.info.fields:
234 continue
235 if dec.info.check_fields is False:
236 continue
237 for field in dec.info.fields:
238 if field not in fields:
239 raise PydanticUserError(
240 f'Decorators defined with incorrect fields: {dec.cls_ref}.{dec.cls_var_name}'
241 " (use check_fields=False if you're inheriting from the model and intended this)",
242 code='decorator-missing-field',
243 )
244
245
246def filter_field_decorator_info_by_field(
247 validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str
248) -> list[Decorator[FieldDecoratorInfoType]]:
249 return [dec for dec in validator_functions if check_validator_fields_against_field_name(dec.info, field)]
250
251
252def apply_each_item_validators(
253 schema: core_schema.CoreSchema,
254 each_item_validators: list[Decorator[ValidatorDecoratorInfo]],
255) -> core_schema.CoreSchema:
256 # This V1 compatibility shim should eventually be removed
257
258 # fail early if each_item_validators is empty
259 if not each_item_validators:
260 return schema
261
262 # push down any `each_item=True` validators
263 # note that this won't work for any Annotated types that get wrapped by a function validator
264 # but that's okay because that didn't exist in V1
265 if schema['type'] == 'nullable':
266 schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators)
267 return schema
268 elif schema['type'] == 'tuple':
269 if (variadic_item_index := schema.get('variadic_item_index')) is not None:
270 schema['items_schema'][variadic_item_index] = apply_validators(
271 schema['items_schema'][variadic_item_index],
272 each_item_validators,
273 )
274 elif is_list_like_schema_with_items_schema(schema):
275 inner_schema = schema.get('items_schema', core_schema.any_schema())
276 schema['items_schema'] = apply_validators(inner_schema, each_item_validators)
277 elif schema['type'] == 'dict':
278 inner_schema = schema.get('values_schema', core_schema.any_schema())
279 schema['values_schema'] = apply_validators(inner_schema, each_item_validators)
280 else:
281 raise TypeError(
282 f'`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema["type"]}'
283 )
284 return schema
285
286
287def _extract_json_schema_info_from_field_info(
288 info: FieldInfo | ComputedFieldInfo,
289) -> tuple[JsonDict | None, JsonDict | JsonSchemaExtraCallable | None]:
290 json_schema_updates = {
291 'title': info.title,
292 'description': info.description,
293 'deprecated': bool(info.deprecated) or info.deprecated == '' or None,
294 'examples': to_jsonable_python(info.examples),
295 }
296 json_schema_updates = {k: v for k, v in json_schema_updates.items() if v is not None}
297 return (json_schema_updates or None, info.json_schema_extra)
298
299
300JsonEncoders = dict[type[Any], JsonEncoder]
301
302
303def _add_custom_serialization_from_json_encoders(
304 json_encoders: JsonEncoders | None, tp: Any, schema: CoreSchema
305) -> CoreSchema:
306 """Iterate over the json_encoders and add the first matching encoder to the schema.
307
308 Args:
309 json_encoders: A dictionary of types and their encoder functions.
310 tp: The type to check for a matching encoder.
311 schema: The schema to add the encoder to.
312 """
313 if not json_encoders:
314 return schema
315 if 'serialization' in schema:
316 return schema
317 # Check the class type and its superclasses for a matching encoder
318 # Decimal.__class__.__mro__ (and probably other cases) doesn't include Decimal itself
319 # if the type is a GenericAlias (e.g. from list[int]) we need to use __class__ instead of .__mro__
320 for base in (tp, *getattr(tp, '__mro__', tp.__class__.__mro__)[:-1]):
321 encoder = json_encoders.get(base)
322 if encoder is None:
323 continue
324
325 warnings.warn(
326 f'`json_encoders` is deprecated. See https://docs.pydantic.dev/{version_short()}/concepts/serialization/#custom-serializers for alternatives',
327 PydanticDeprecatedSince20,
328 )
329
330 # TODO: in theory we should check that the schema accepts a serialization key
331 schema['serialization'] = core_schema.plain_serializer_function_ser_schema(encoder, when_used='json')
332 return schema
333
334 return schema
335
336
337GENERATE_SCHEMA_ERRORS = (
338 PydanticForbiddenQualifier,
339 PydanticInvalidForJsonSchema,
340 PydanticSchemaGenerationError,
341 PydanticUndefinedAnnotation,
342)
343"""Errors raised during core schema generation. This does *not* include `InvalidSchemaError`, which is raised during schema cleaning."""
344
345
346class InvalidSchemaError(Exception):
347 """The core schema is invalid."""
348
349
350class GenerateSchema:
351 """Generate core schema for a Pydantic model, dataclass and types like `str`, `datetime`, ... ."""
352
353 __slots__ = (
354 '_config_wrapper_stack',
355 '_ns_resolver',
356 '_typevars_map',
357 'field_name_stack',
358 'model_type_stack',
359 'defs',
360 )
361
362 def __init__(
363 self,
364 config_wrapper: ConfigWrapper,
365 ns_resolver: NsResolver | None = None,
366 typevars_map: Mapping[TypeVar, Any] | None = None,
367 ) -> None:
368 # we need a stack for recursing into nested models
369 self._config_wrapper_stack = ConfigWrapperStack(config_wrapper)
370 self._ns_resolver = ns_resolver or NsResolver()
371 self._typevars_map = typevars_map
372 self.field_name_stack = _FieldNameStack()
373 self.model_type_stack = _ModelTypeStack()
374 self.defs = _Definitions()
375
376 def __init_subclass__(cls) -> None:
377 super().__init_subclass__()
378 warnings.warn(
379 'Subclassing `GenerateSchema` is not supported. The API is highly subject to change in minor versions.',
380 UserWarning,
381 stacklevel=2,
382 )
383
384 @property
385 def _config_wrapper(self) -> ConfigWrapper:
386 return self._config_wrapper_stack.tail
387
388 @property
389 def _types_namespace(self) -> NamespacesTuple:
390 return self._ns_resolver.types_namespace
391
392 @property
393 def _arbitrary_types(self) -> bool:
394 return self._config_wrapper.arbitrary_types_allowed
395
396 # the following methods can be overridden but should be considered
397 # unstable / private APIs
398 def _list_schema(self, items_type: Any) -> CoreSchema:
399 return core_schema.list_schema(self.generate_schema(items_type))
400
401 def _dict_schema(self, keys_type: Any, values_type: Any) -> CoreSchema:
402 return core_schema.dict_schema(self.generate_schema(keys_type), self.generate_schema(values_type))
403
404 def _set_schema(self, items_type: Any) -> CoreSchema:
405 return core_schema.set_schema(self.generate_schema(items_type))
406
407 def _frozenset_schema(self, items_type: Any) -> CoreSchema:
408 return core_schema.frozenset_schema(self.generate_schema(items_type))
409
410 def _enum_schema(self, enum_type: type[Enum]) -> CoreSchema:
411 cases: list[Any] = list(enum_type.__members__.values())
412
413 enum_ref = get_type_ref(enum_type)
414 description = None if not enum_type.__doc__ else inspect.cleandoc(enum_type.__doc__)
415 if (
416 description == 'An enumeration.'
417 ): # This is the default value provided by enum.EnumMeta.__new__; don't use it
418 description = None
419 js_updates = {'title': enum_type.__name__, 'description': description}
420 js_updates = {k: v for k, v in js_updates.items() if v is not None}
421
422 sub_type: Literal['str', 'int', 'float'] | None = None
423 if issubclass(enum_type, int):
424 sub_type = 'int'
425 value_ser_type: core_schema.SerSchema = core_schema.simple_ser_schema('int')
426 elif issubclass(enum_type, str):
427 # this handles `StrEnum` (3.11 only), and also `Foobar(str, Enum)`
428 sub_type = 'str'
429 value_ser_type = core_schema.simple_ser_schema('str')
430 elif issubclass(enum_type, float):
431 sub_type = 'float'
432 value_ser_type = core_schema.simple_ser_schema('float')
433 else:
434 # TODO this is an ugly hack, how do we trigger an Any schema for serialization?
435 value_ser_type = core_schema.plain_serializer_function_ser_schema(lambda x: x)
436
437 if cases:
438
439 def get_json_schema(schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
440 json_schema = handler(schema)
441 original_schema = handler.resolve_ref_schema(json_schema)
442 original_schema.update(js_updates)
443 return json_schema
444
445 # we don't want to add the missing to the schema if it's the default one
446 default_missing = getattr(enum_type._missing_, '__func__', None) is Enum._missing_.__func__ # pyright: ignore[reportFunctionMemberAccess]
447 enum_schema = core_schema.enum_schema(
448 enum_type,
449 cases,
450 sub_type=sub_type,
451 missing=None if default_missing else enum_type._missing_,
452 ref=enum_ref,
453 metadata={'pydantic_js_functions': [get_json_schema]},
454 )
455
456 if self._config_wrapper.use_enum_values:
457 enum_schema = core_schema.no_info_after_validator_function(
458 attrgetter('value'), enum_schema, serialization=value_ser_type
459 )
460
461 return enum_schema
462
463 else:
464
465 def get_json_schema_no_cases(_, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
466 json_schema = handler(core_schema.enum_schema(enum_type, cases, sub_type=sub_type, ref=enum_ref))
467 original_schema = handler.resolve_ref_schema(json_schema)
468 original_schema.update(js_updates)
469 return json_schema
470
471 # Use an isinstance check for enums with no cases.
472 # The most important use case for this is creating TypeVar bounds for generics that should
473 # be restricted to enums. This is more consistent than it might seem at first, since you can only
474 # subclass enum.Enum (or subclasses of enum.Enum) if all parent classes have no cases.
475 # We use the get_json_schema function when an Enum subclass has been declared with no cases
476 # so that we can still generate a valid json schema.
477 return core_schema.is_instance_schema(
478 enum_type,
479 metadata={'pydantic_js_functions': [get_json_schema_no_cases]},
480 )
481
482 def _ip_schema(self, tp: Any) -> CoreSchema:
483 from ._validators import IP_VALIDATOR_LOOKUP, IpType
484
485 ip_type_json_schema_format: dict[type[IpType], str] = {
486 IPv4Address: 'ipv4',
487 IPv4Network: 'ipv4network',
488 IPv4Interface: 'ipv4interface',
489 IPv6Address: 'ipv6',
490 IPv6Network: 'ipv6network',
491 IPv6Interface: 'ipv6interface',
492 }
493
494 def ser_ip(ip: Any, info: core_schema.SerializationInfo) -> str | IpType:
495 if not isinstance(ip, (tp, str)):
496 raise PydanticSerializationUnexpectedValue(
497 f"Expected `{tp}` but got `{type(ip)}` with value `'{ip}'` - serialized value may not be as expected."
498 )
499 if info.mode == 'python':
500 return ip
501 return str(ip)
502
503 return core_schema.lax_or_strict_schema(
504 lax_schema=core_schema.no_info_plain_validator_function(IP_VALIDATOR_LOOKUP[tp]),
505 strict_schema=core_schema.json_or_python_schema(
506 json_schema=core_schema.no_info_after_validator_function(tp, core_schema.str_schema()),
507 python_schema=core_schema.is_instance_schema(tp),
508 ),
509 serialization=core_schema.plain_serializer_function_ser_schema(ser_ip, info_arg=True, when_used='always'),
510 metadata={
511 'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': ip_type_json_schema_format[tp]}]
512 },
513 )
514
515 def _path_schema(self, tp: Any, path_type: Any) -> CoreSchema:
516 if tp is os.PathLike and (path_type not in {str, bytes} and not typing_objects.is_any(path_type)):
517 raise PydanticUserError(
518 '`os.PathLike` can only be used with `str`, `bytes` or `Any`', code='schema-for-unknown-type'
519 )
520
521 path_constructor = pathlib.PurePath if tp is os.PathLike else tp
522 strict_inner_schema = (
523 core_schema.bytes_schema(strict=True) if (path_type is bytes) else core_schema.str_schema(strict=True)
524 )
525 lax_inner_schema = core_schema.bytes_schema() if (path_type is bytes) else core_schema.str_schema()
526
527 def path_validator(input_value: str | bytes) -> os.PathLike[Any]: # type: ignore
528 try:
529 if path_type is bytes:
530 if isinstance(input_value, bytes):
531 try:
532 input_value = input_value.decode()
533 except UnicodeDecodeError as e:
534 raise PydanticCustomError('bytes_type', 'Input must be valid bytes') from e
535 else:
536 raise PydanticCustomError('bytes_type', 'Input must be bytes')
537 elif not isinstance(input_value, str):
538 raise PydanticCustomError('path_type', 'Input is not a valid path')
539
540 return path_constructor(input_value) # type: ignore
541 except TypeError as e:
542 raise PydanticCustomError('path_type', 'Input is not a valid path') from e
543
544 def ser_path(path: Any, info: core_schema.SerializationInfo) -> str | os.PathLike[Any]:
545 if not isinstance(path, (tp, str)):
546 raise PydanticSerializationUnexpectedValue(
547 f"Expected `{tp}` but got `{type(path)}` with value `'{path}'` - serialized value may not be as expected."
548 )
549 if info.mode == 'python':
550 return path
551 return str(path)
552
553 instance_schema = core_schema.json_or_python_schema(
554 json_schema=core_schema.no_info_after_validator_function(path_validator, lax_inner_schema),
555 python_schema=core_schema.is_instance_schema(tp),
556 )
557
558 schema = core_schema.lax_or_strict_schema(
559 lax_schema=core_schema.union_schema(
560 [
561 instance_schema,
562 core_schema.no_info_after_validator_function(path_validator, strict_inner_schema),
563 ],
564 custom_error_type='path_type',
565 custom_error_message=f'Input is not a valid path for {tp}',
566 ),
567 strict_schema=instance_schema,
568 serialization=core_schema.plain_serializer_function_ser_schema(ser_path, info_arg=True, when_used='always'),
569 metadata={'pydantic_js_functions': [lambda source, handler: {**handler(source), 'format': 'path'}]},
570 )
571 return schema
572
573 def _deque_schema(self, items_type: Any) -> CoreSchema:
574 from ._serializers import serialize_sequence_via_list
575 from ._validators import deque_validator
576
577 item_type_schema = self.generate_schema(items_type)
578
579 # we have to use a lax list schema here, because we need to validate the deque's
580 # items via a list schema, but it's ok if the deque itself is not a list
581 list_schema = core_schema.list_schema(item_type_schema, strict=False)
582
583 check_instance = core_schema.json_or_python_schema(
584 json_schema=list_schema,
585 python_schema=core_schema.is_instance_schema(collections.deque, cls_repr='Deque'),
586 )
587
588 lax_schema = core_schema.no_info_wrap_validator_function(deque_validator, list_schema)
589
590 return core_schema.lax_or_strict_schema(
591 lax_schema=lax_schema,
592 strict_schema=core_schema.chain_schema([check_instance, lax_schema]),
593 serialization=core_schema.wrap_serializer_function_ser_schema(
594 serialize_sequence_via_list, schema=item_type_schema, info_arg=True
595 ),
596 )
597
598 def _mapping_schema(self, tp: Any, keys_type: Any, values_type: Any) -> CoreSchema:
599 from ._validators import MAPPING_ORIGIN_MAP, defaultdict_validator, get_defaultdict_default_default_factory
600
601 mapped_origin = MAPPING_ORIGIN_MAP[tp]
602 keys_schema = self.generate_schema(keys_type)
603 with warnings.catch_warnings():
604 # We kind of abused `Field()` default factories to be able to specify
605 # the `defaultdict`'s `default_factory`. As a consequence, we get warnings
606 # as normally `FieldInfo.default_factory` is unsupported in the context where
607 # `Field()` is used and our only solution is to ignore them (note that this might
608 # wrongfully ignore valid warnings, e.g. if the `value_type` is a PEP 695 type alias
609 # with unsupported metadata).
610 warnings.simplefilter('ignore', category=UnsupportedFieldAttributeWarning)
611 values_schema = self.generate_schema(values_type)
612 dict_schema = core_schema.dict_schema(keys_schema, values_schema, strict=False)
613
614 if mapped_origin is dict:
615 schema = dict_schema
616 else:
617 check_instance = core_schema.json_or_python_schema(
618 json_schema=dict_schema,
619 python_schema=core_schema.is_instance_schema(mapped_origin),
620 )
621
622 if tp is collections.defaultdict:
623 default_default_factory = get_defaultdict_default_default_factory(values_type)
624 coerce_instance_wrap = partial(
625 core_schema.no_info_wrap_validator_function,
626 partial(defaultdict_validator, default_default_factory=default_default_factory),
627 )
628 else:
629 coerce_instance_wrap = partial(core_schema.no_info_after_validator_function, mapped_origin)
630
631 lax_schema = coerce_instance_wrap(dict_schema)
632 strict_schema = core_schema.chain_schema([check_instance, lax_schema])
633
634 schema = core_schema.lax_or_strict_schema(
635 lax_schema=lax_schema,
636 strict_schema=strict_schema,
637 serialization=core_schema.wrap_serializer_function_ser_schema(
638 lambda v, h: h(v), schema=dict_schema, info_arg=False
639 ),
640 )
641
642 return schema
643
644 def _fraction_schema(self) -> CoreSchema:
645 """Support for [`fractions.Fraction`][fractions.Fraction]."""
646 from ._validators import fraction_validator
647
648 # TODO: note, this is a fairly common pattern, re lax / strict for attempted type coercion,
649 # can we use a helper function to reduce boilerplate?
650 return core_schema.lax_or_strict_schema(
651 lax_schema=core_schema.no_info_plain_validator_function(fraction_validator),
652 strict_schema=core_schema.json_or_python_schema(
653 json_schema=core_schema.no_info_plain_validator_function(fraction_validator),
654 python_schema=core_schema.is_instance_schema(Fraction),
655 ),
656 # use str serialization to guarantee round trip behavior
657 serialization=core_schema.to_string_ser_schema(when_used='always'),
658 metadata={'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'fraction'}]},
659 )
660
661 def _arbitrary_type_schema(self, tp: Any) -> CoreSchema:
662 if not isinstance(tp, type):
663 warnings.warn(
664 f'{tp!r} is not a Python type (it may be an instance of an object),'
665 ' Pydantic will allow any object with no validation since we cannot even'
666 ' enforce that the input is an instance of the given type.'
667 ' To get rid of this error wrap the type with `pydantic.SkipValidation`.',
668 ArbitraryTypeWarning,
669 )
670 return core_schema.any_schema()
671 return core_schema.is_instance_schema(tp)
672
673 def _unknown_type_schema(self, obj: Any) -> CoreSchema:
674 raise PydanticSchemaGenerationError(
675 f'Unable to generate pydantic-core schema for {obj!r}. '
676 'Set `arbitrary_types_allowed=True` in the model_config to ignore this error'
677 ' or implement `__get_pydantic_core_schema__` on your type to fully support it.'
678 '\n\nIf you got this error by calling handler(<some type>) within'
679 ' `__get_pydantic_core_schema__` then you likely need to call'
680 ' `handler.generate_schema(<some type>)` since we do not call'
681 ' `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.'
682 )
683
684 def _apply_discriminator_to_union(
685 self, schema: CoreSchema, discriminator: str | Discriminator | None
686 ) -> CoreSchema:
687 if discriminator is None:
688 return schema
689 try:
690 return _discriminated_union.apply_discriminator(
691 schema,
692 discriminator,
693 self.defs._definitions,
694 )
695 except _discriminated_union.MissingDefinitionForUnionRef:
696 # defer until defs are resolved
697 _discriminated_union.set_discriminator_in_metadata(
698 schema,
699 discriminator,
700 )
701 return schema
702
703 def clean_schema(self, schema: CoreSchema) -> CoreSchema:
704 return self.defs.finalize_schema(schema)
705
706 def _add_js_function(self, metadata_schema: CoreSchema, js_function: Callable[..., Any]) -> None:
707 metadata = metadata_schema.get('metadata', {})
708 pydantic_js_functions = metadata.setdefault('pydantic_js_functions', [])
709 # because of how we generate core schemas for nested generic models
710 # we can end up adding `BaseModel.__get_pydantic_json_schema__` multiple times
711 # this check may fail to catch duplicates if the function is a `functools.partial`
712 # or something like that, but if it does it'll fail by inserting the duplicate
713 if js_function not in pydantic_js_functions:
714 pydantic_js_functions.append(js_function)
715 metadata_schema['metadata'] = metadata
716
717 def generate_schema(
718 self,
719 obj: Any,
720 ) -> core_schema.CoreSchema:
721 """Generate core schema.
722
723 Args:
724 obj: The object to generate core schema for.
725
726 Returns:
727 The generated core schema.
728
729 Raises:
730 PydanticUndefinedAnnotation:
731 If it is not possible to evaluate forward reference.
732 PydanticSchemaGenerationError:
733 If it is not possible to generate pydantic-core schema.
734 TypeError:
735 - If `alias_generator` returns a disallowed type (must be str, AliasPath or AliasChoices).
736 - If V1 style validator with `each_item=True` applied on a wrong field.
737 PydanticUserError:
738 - If `typing.TypedDict` is used instead of `typing_extensions.TypedDict` on Python < 3.12.
739 - If `__modify_schema__` method is used instead of `__get_pydantic_json_schema__`.
740 """
741 schema = self._generate_schema_from_get_schema_method(obj, obj)
742
743 if schema is None:
744 schema = self._generate_schema_inner(obj)
745
746 metadata_js_function = _extract_get_pydantic_json_schema(obj)
747 if metadata_js_function is not None:
748 metadata_schema = resolve_original_schema(schema, self.defs)
749 if metadata_schema:
750 self._add_js_function(metadata_schema, metadata_js_function)
751
752 schema = _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, obj, schema)
753
754 return schema
755
756 def _model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema:
757 """Generate schema for a Pydantic model."""
758 BaseModel_ = import_cached_base_model()
759
760 with self.defs.get_schema_or_ref(cls) as (model_ref, maybe_schema):
761 if maybe_schema is not None:
762 return maybe_schema
763
764 schema = cls.__dict__.get('__pydantic_core_schema__')
765 if schema is not None and not isinstance(schema, MockCoreSchema):
766 if schema['type'] == 'definitions':
767 schema = self.defs.unpack_definitions(schema)
768 ref = get_ref(schema)
769 if ref:
770 return self.defs.create_definition_reference_schema(schema)
771 else:
772 return schema
773
774 config_wrapper = ConfigWrapper(cls.model_config, check=False)
775
776 with self._config_wrapper_stack.push(config_wrapper), self._ns_resolver.push(cls):
777 core_config = self._config_wrapper.core_config(title=cls.__name__)
778
779 if cls.__pydantic_fields_complete__ or cls is BaseModel_:
780 fields = getattr(cls, '__pydantic_fields__', {})
781 extra_info = getattr(cls, '__pydantic_extra_info__', None)
782 else:
783 if '__pydantic_fields__' not in cls.__dict__:
784 # This happens when we have a loop in the schema generation:
785 # class Base[T](BaseModel):
786 # t: T
787 #
788 # class Other(BaseModel):
789 # b: 'Base[Other]'
790 # When we build fields for `Other`, we evaluate the forward annotation.
791 # At this point, `Other` doesn't have the model fields set. We create
792 # `Base[Other]`; model fields are successfully built, and we try to generate
793 # a schema for `t: Other`. As `Other.__pydantic_fields__` aren't set, we abort.
794 raise PydanticUndefinedAnnotation(
795 name=cls.__name__,
796 message=f'Class {cls.__name__!r} is not defined',
797 )
798 try:
799 fields, extra_info = rebuild_model_fields(
800 cls,
801 config_wrapper=self._config_wrapper,
802 ns_resolver=self._ns_resolver,
803 typevars_map=self._typevars_map or {},
804 )
805 except NameError as e:
806 raise PydanticUndefinedAnnotation.from_name_error(e) from e
807
808 decorators = cls.__pydantic_decorators__
809 computed_fields = decorators.computed_fields
810 check_decorator_fields_exist(
811 chain(
812 decorators.field_validators.values(),
813 decorators.field_serializers.values(),
814 decorators.validators.values(),
815 ),
816 {*fields.keys(), *computed_fields.keys()},
817 )
818
819 model_validators = decorators.model_validators.values()
820
821 extras_schema = None
822 extras_keys_schema = None
823 if core_config.get('extra_fields_behavior') == 'allow' and extra_info is not None:
824 tp = get_origin(extra_info.annotation)
825 if tp not in DICT_TYPES:
826 raise PydanticSchemaGenerationError(
827 'The type annotation for `__pydantic_extra__` must be `dict[str, ...]`'
828 )
829 # See the comments in `_get_args_resolving_forward_refs()` for why we need
830 # to re-evaluate the annotation:
831 extra_keys_type, extra_items_type = self._get_args_resolving_forward_refs(
832 extra_info.annotation,
833 required=True,
834 )
835 if extra_keys_type is not str:
836 extras_keys_schema = self.generate_schema(extra_keys_type)
837 if not typing_objects.is_any(extra_items_type):
838 extras_schema = self.generate_schema(extra_items_type)
839
840 generic_origin: type[BaseModel] | None = getattr(cls, '__pydantic_generic_metadata__', {}).get('origin')
841
842 if cls.__pydantic_root_model__:
843 # FIXME: should the common field metadata be used here?
844 inner_schema, _ = self._common_field_schema('root', fields['root'], decorators)
845 inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
846 model_schema = core_schema.model_schema(
847 cls,
848 inner_schema,
849 generic_origin=generic_origin,
850 custom_init=getattr(cls, '__pydantic_custom_init__', None),
851 root_model=True,
852 post_init=getattr(cls, '__pydantic_post_init__', None),
853 config=core_config,
854 ref=model_ref,
855 )
856 else:
857 fields_schema: core_schema.CoreSchema = core_schema.model_fields_schema(
858 {k: self._generate_md_field_schema(k, v, decorators) for k, v in fields.items()},
859 computed_fields=[
860 self._computed_field_schema(d, decorators.field_serializers)
861 for d in computed_fields.values()
862 ],
863 extras_schema=extras_schema,
864 extras_keys_schema=extras_keys_schema,
865 model_name=cls.__name__,
866 )
867 inner_schema = apply_validators(fields_schema, decorators.root_validators.values())
868 inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
869
870 model_schema = core_schema.model_schema(
871 cls,
872 inner_schema,
873 generic_origin=generic_origin,
874 custom_init=getattr(cls, '__pydantic_custom_init__', None),
875 root_model=False,
876 post_init=getattr(cls, '__pydantic_post_init__', None),
877 config=core_config,
878 ref=model_ref,
879 )
880
881 schema = self._apply_model_serializers(model_schema, decorators.model_serializers.values())
882 schema = apply_model_validators(schema, model_validators, 'outer')
883 return self.defs.create_definition_reference_schema(schema)
884
885 def _resolve_self_type(self, obj: Any) -> Any:
886 obj = self.model_type_stack.get()
887 if obj is None:
888 raise PydanticUserError('`typing.Self` is invalid in this context', code='invalid-self-type')
889 return obj
890
891 def _generate_schema_from_get_schema_method(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
892 BaseModel_ = import_cached_base_model()
893
894 get_schema = getattr(obj, '__get_pydantic_core_schema__', None)
895 is_base_model_get_schema = (
896 getattr(get_schema, '__func__', None) is BaseModel_.__get_pydantic_core_schema__.__func__ # pyright: ignore[reportFunctionMemberAccess]
897 )
898
899 if (
900 get_schema is not None
901 # BaseModel.__get_pydantic_core_schema__ is defined for backwards compatibility,
902 # to allow existing code to call `super().__get_pydantic_core_schema__` in Pydantic
903 # model that overrides `__get_pydantic_core_schema__`. However, it raises a deprecation
904 # warning stating that the method will be removed, and during the core schema gen we actually
905 # don't call the method:
906 and not is_base_model_get_schema
907 ):
908 # Some referenceable types might have a `__get_pydantic_core_schema__` method
909 # defined on it by users (e.g. on a dataclass). This generally doesn't play well
910 # as these types are already recognized by the `GenerateSchema` class and isn't ideal
911 # as we might end up calling `get_schema_or_ref` (expensive) on types that are actually
912 # not referenceable:
913 with self.defs.get_schema_or_ref(obj) as (_, maybe_schema):
914 if maybe_schema is not None:
915 return maybe_schema
916
917 if obj is source:
918 ref_mode = 'unpack'
919 else:
920 ref_mode = 'to-def'
921 schema = get_schema(
922 source, CallbackGetCoreSchemaHandler(self._generate_schema_inner, self, ref_mode=ref_mode)
923 )
924 if schema['type'] == 'definitions':
925 schema = self.defs.unpack_definitions(schema)
926
927 ref = get_ref(schema)
928 if ref:
929 return self.defs.create_definition_reference_schema(schema)
930
931 # Note: if schema is of type `'definition-ref'`, we might want to copy it as a
932 # safety measure (because these are inlined in place -- i.e. mutated directly)
933 return schema
934
935 if get_schema is None and (validators := getattr(obj, '__get_validators__', None)) is not None:
936 from pydantic.v1 import BaseModel as BaseModelV1
937
938 if issubclass(obj, BaseModelV1):
939 warnings.warn(
940 f'Mixing V1 models and V2 models (or constructs, like `TypeAdapter`) is not supported. Please upgrade `{obj.__name__}` to V2.',
941 UserWarning,
942 )
943 else:
944 warnings.warn(
945 '`__get_validators__` is deprecated and will be removed, use `__get_pydantic_core_schema__` instead.',
946 PydanticDeprecatedSince20,
947 )
948 return core_schema.chain_schema([core_schema.with_info_plain_validator_function(v) for v in validators()])
949
950 def _resolve_forward_ref(self, obj: Any) -> Any:
951 # we assume that types_namespace has the target of forward references in its scope,
952 # but this could fail, for example, if calling Validator on an imported type which contains
953 # forward references to other types only defined in the module from which it was imported
954 # `Validator(SomeImportedTypeAliasWithAForwardReference)`
955 # or the equivalent for BaseModel
956 # class Model(BaseModel):
957 # x: SomeImportedTypeAliasWithAForwardReference
958 try:
959 obj = _typing_extra.eval_type_backport(obj, *self._types_namespace)
960 except NameError as e:
961 raise PydanticUndefinedAnnotation.from_name_error(e) from e
962
963 # if obj is still a ForwardRef, it means we can't evaluate it, raise PydanticUndefinedAnnotation
964 if isinstance(obj, ForwardRef):
965 raise PydanticUndefinedAnnotation(obj.__forward_arg__, f'Unable to evaluate forward reference {obj}')
966
967 if self._typevars_map:
968 obj = replace_types(obj, self._typevars_map)
969
970 return obj
971
972 @overload
973 def _get_args_resolving_forward_refs(self, obj: Any, required: Literal[True]) -> tuple[Any, ...]: ...
974
975 @overload
976 def _get_args_resolving_forward_refs(self, obj: Any) -> tuple[Any, ...] | None: ...
977
978 def _get_args_resolving_forward_refs(self, obj: Any, required: bool = False) -> tuple[Any, ...] | None:
979 args = get_args(obj)
980 if args:
981 if isinstance(obj, GenericAlias):
982 # PEP 585 generic aliases don't convert args to ForwardRefs, unlike `typing.List/Dict` etc.
983 # This was fixed in https://github.com/python/cpython/pull/30900 (Python 3.11).
984 # TODO: this shouldn't be necessary (probably even this `_get_args_resolving_forward_refs()` function)
985 # once we drop support for Python 3.10 *or* if we implement our own `typing._eval_type()` implementation.
986 args = (_typing_extra._make_forward_ref(a) if isinstance(a, str) else a for a in args)
987 args = tuple(self._resolve_forward_ref(a) if isinstance(a, ForwardRef) else a for a in args)
988 elif required: # pragma: no cover
989 raise TypeError(f'Expected {obj} to have generic parameters but it had none')
990 return args
991
992 def _get_first_arg_or_any(self, obj: Any) -> Any:
993 args = self._get_args_resolving_forward_refs(obj)
994 if not args:
995 return Any
996 return args[0]
997
998 def _get_first_two_args_or_any(self, obj: Any) -> tuple[Any, Any]:
999 args = self._get_args_resolving_forward_refs(obj)
1000 if not args:
1001 return (Any, Any)
1002 if len(args) < 2:
1003 origin = get_origin(obj)
1004 raise TypeError(f'Expected two type arguments for {origin}, got 1')
1005 return args[0], args[1]
1006
1007 def _generate_schema_inner(self, obj: Any) -> core_schema.CoreSchema:
1008 if typing_objects.is_self(obj):
1009 obj = self._resolve_self_type(obj)
1010
1011 if typing_objects.is_annotated(get_origin(obj)):
1012 return self._annotated_schema(obj)
1013
1014 if isinstance(obj, dict):
1015 # we assume this is already a valid schema
1016 return obj # type: ignore[return-value]
1017
1018 if isinstance(obj, str):
1019 obj = ForwardRef(obj)
1020
1021 if isinstance(obj, ForwardRef):
1022 return self.generate_schema(self._resolve_forward_ref(obj))
1023
1024 BaseModel = import_cached_base_model()
1025
1026 if lenient_issubclass(obj, BaseModel):
1027 with self.model_type_stack.push(obj):
1028 return self._model_schema(obj)
1029
1030 if isinstance(obj, PydanticRecursiveRef):
1031 return core_schema.definition_reference_schema(schema_ref=obj.type_ref)
1032
1033 return self.match_type(obj)
1034
1035 def match_type(self, obj: Any) -> core_schema.CoreSchema: # noqa: C901
1036 """Main mapping of types to schemas.
1037
1038 The general structure is a series of if statements starting with the simple cases
1039 (non-generic primitive types) and then handling generics and other more complex cases.
1040
1041 Each case either generates a schema directly, calls into a public user-overridable method
1042 (like `GenerateSchema.tuple_variable_schema`) or calls into a private method that handles some
1043 boilerplate before calling into the user-facing method (e.g. `GenerateSchema._tuple_schema`).
1044
1045 The idea is that we'll evolve this into adding more and more user facing methods over time
1046 as they get requested and we figure out what the right API for them is.
1047 """
1048 if obj is str:
1049 return core_schema.str_schema()
1050 elif obj is bytes:
1051 return core_schema.bytes_schema()
1052 elif obj is int:
1053 return core_schema.int_schema()
1054 elif obj is float:
1055 return core_schema.float_schema()
1056 elif obj is bool:
1057 return core_schema.bool_schema()
1058 elif obj is complex:
1059 return core_schema.complex_schema()
1060 elif typing_objects.is_any(obj) or obj is object:
1061 return core_schema.any_schema()
1062 elif obj is datetime.date:
1063 return core_schema.date_schema()
1064 elif obj is datetime.datetime:
1065 return core_schema.datetime_schema()
1066 elif obj is datetime.time:
1067 return core_schema.time_schema()
1068 elif obj is datetime.timedelta:
1069 return core_schema.timedelta_schema()
1070 elif obj is Decimal:
1071 return core_schema.decimal_schema()
1072 elif obj is UUID:
1073 return core_schema.uuid_schema()
1074 elif obj is Url:
1075 return core_schema.url_schema()
1076 elif obj is Fraction:
1077 return self._fraction_schema()
1078 elif obj is MultiHostUrl:
1079 return core_schema.multi_host_url_schema()
1080 elif obj is None or obj is _typing_extra.NoneType:
1081 return core_schema.none_schema()
1082 if obj is MISSING:
1083 return core_schema.missing_sentinel_schema()
1084 elif obj in IP_TYPES:
1085 return self._ip_schema(obj)
1086 elif obj in TUPLE_TYPES:
1087 return self._tuple_schema(obj)
1088 elif obj in LIST_TYPES:
1089 return self._list_schema(Any)
1090 elif obj in SET_TYPES:
1091 return self._set_schema(Any)
1092 elif obj in FROZEN_SET_TYPES:
1093 return self._frozenset_schema(Any)
1094 elif obj in SEQUENCE_TYPES:
1095 return self._sequence_schema(Any)
1096 elif obj in ITERABLE_TYPES:
1097 return self._iterable_schema(obj)
1098 elif obj in DICT_TYPES:
1099 return self._dict_schema(Any, Any)
1100 elif obj in PATH_TYPES:
1101 return self._path_schema(obj, Any)
1102 elif obj in DEQUE_TYPES:
1103 return self._deque_schema(Any)
1104 elif obj in MAPPING_TYPES:
1105 return self._mapping_schema(obj, Any, Any)
1106 elif obj in COUNTER_TYPES:
1107 return self._mapping_schema(obj, Any, int)
1108 elif typing_objects.is_typealiastype(obj):
1109 return self._type_alias_type_schema(obj)
1110 elif obj is type:
1111 return self._type_schema()
1112 elif _typing_extra.is_callable(obj):
1113 return core_schema.callable_schema()
1114 elif typing_objects.is_literal(get_origin(obj)):
1115 return self._literal_schema(obj)
1116 elif is_typeddict(obj):
1117 return self._typed_dict_schema(obj, None)
1118 elif inspect.isclass(obj) and issubclass(obj, Enum):
1119 # NOTE: this must come before the `is_namedtuple()` check as enums values
1120 # can be namedtuples:
1121 return self._enum_schema(obj)
1122 elif _typing_extra.is_namedtuple(obj):
1123 return self._namedtuple_schema(obj, None)
1124 elif typing_objects.is_newtype(obj):
1125 # NewType, can't use isinstance because it fails <3.10
1126 return self.generate_schema(obj.__supertype__)
1127 elif obj in PATTERN_TYPES:
1128 return self._pattern_schema(obj)
1129 elif _typing_extra.is_hashable(obj):
1130 return self._hashable_schema()
1131 elif isinstance(obj, typing.TypeVar):
1132 return self._unsubstituted_typevar_schema(obj)
1133 elif _typing_extra.is_finalvar(obj):
1134 if obj is Final:
1135 return core_schema.any_schema()
1136 return self.generate_schema(
1137 self._get_first_arg_or_any(obj),
1138 )
1139 elif isinstance(obj, VALIDATE_CALL_SUPPORTED_TYPES):
1140 return self._call_schema(obj) # pyright: ignore[reportArgumentType]
1141 elif obj is ZoneInfo:
1142 return self._zoneinfo_schema()
1143
1144 # dataclasses.is_dataclass coerces dc instances to types, but we only handle
1145 # the case of a dc type here
1146 if dataclasses.is_dataclass(obj):
1147 return self._dataclass_schema(obj, None) # pyright: ignore[reportArgumentType]
1148
1149 origin = get_origin(obj)
1150 if origin is not None:
1151 return self._match_generic_type(obj, origin)
1152
1153 if self._arbitrary_types:
1154 return self._arbitrary_type_schema(obj)
1155 return self._unknown_type_schema(obj)
1156
1157 def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema: # noqa: C901
1158 # Need to handle generic dataclasses before looking for the schema properties because attribute accesses
1159 # on _GenericAlias delegate to the origin type, so lose the information about the concrete parametrization
1160 # As a result, currently, there is no way to cache the schema for generic dataclasses. This may be possible
1161 # to resolve by modifying the value returned by `Generic.__class_getitem__`, but that is a dangerous game.
1162 if dataclasses.is_dataclass(origin):
1163 return self._dataclass_schema(obj, origin) # pyright: ignore[reportArgumentType]
1164 if _typing_extra.is_namedtuple(origin):
1165 return self._namedtuple_schema(obj, origin)
1166
1167 schema = self._generate_schema_from_get_schema_method(origin, obj)
1168 if schema is not None:
1169 return schema
1170
1171 if typing_objects.is_typealiastype(origin):
1172 return self._type_alias_type_schema(obj)
1173 elif is_union_origin(origin):
1174 return self._union_schema(obj)
1175 elif origin in TUPLE_TYPES:
1176 return self._tuple_schema(obj)
1177 elif origin in LIST_TYPES:
1178 return self._list_schema(self._get_first_arg_or_any(obj))
1179 elif origin in SET_TYPES:
1180 return self._set_schema(self._get_first_arg_or_any(obj))
1181 elif origin in FROZEN_SET_TYPES:
1182 return self._frozenset_schema(self._get_first_arg_or_any(obj))
1183 elif origin in DICT_TYPES:
1184 return self._dict_schema(*self._get_first_two_args_or_any(obj))
1185 elif origin in PATH_TYPES:
1186 return self._path_schema(origin, self._get_first_arg_or_any(obj))
1187 elif origin in DEQUE_TYPES:
1188 return self._deque_schema(self._get_first_arg_or_any(obj))
1189 elif origin in MAPPING_TYPES:
1190 return self._mapping_schema(origin, *self._get_first_two_args_or_any(obj))
1191 elif origin in COUNTER_TYPES:
1192 return self._mapping_schema(origin, self._get_first_arg_or_any(obj), int)
1193 elif is_typeddict(origin):
1194 return self._typed_dict_schema(obj, origin)
1195 elif origin in TYPE_TYPES:
1196 return self._subclass_schema(obj)
1197 elif origin in SEQUENCE_TYPES:
1198 return self._sequence_schema(self._get_first_arg_or_any(obj))
1199 elif origin in ITERABLE_TYPES:
1200 return self._iterable_schema(obj)
1201 elif origin in PATTERN_TYPES:
1202 return self._pattern_schema(obj)
1203
1204 if self._arbitrary_types:
1205 return self._arbitrary_type_schema(origin)
1206 return self._unknown_type_schema(obj)
1207
1208 def _generate_td_field_schema(
1209 self,
1210 name: str,
1211 field_info: FieldInfo,
1212 decorators: DecoratorInfos,
1213 *,
1214 required: bool = True,
1215 ) -> core_schema.TypedDictField:
1216 """Prepare a TypedDictField to represent a model or typeddict field."""
1217 schema, metadata = self._common_field_schema(name, field_info, decorators)
1218 return core_schema.typed_dict_field(
1219 schema,
1220 required=False if not field_info.is_required() else required,
1221 serialization_exclude=field_info.exclude,
1222 validation_alias=_convert_to_aliases(field_info.validation_alias),
1223 serialization_alias=field_info.serialization_alias,
1224 serialization_exclude_if=field_info.exclude_if,
1225 metadata=metadata,
1226 )
1227
1228 def _generate_md_field_schema(
1229 self,
1230 name: str,
1231 field_info: FieldInfo,
1232 decorators: DecoratorInfos,
1233 ) -> core_schema.ModelField:
1234 """Prepare a ModelField to represent a model field."""
1235 schema, metadata = self._common_field_schema(name, field_info, decorators)
1236 return core_schema.model_field(
1237 schema,
1238 serialization_exclude=field_info.exclude,
1239 validation_alias=_convert_to_aliases(field_info.validation_alias),
1240 serialization_alias=field_info.serialization_alias,
1241 serialization_exclude_if=field_info.exclude_if,
1242 frozen=field_info.frozen,
1243 metadata=metadata,
1244 )
1245
1246 def _generate_dc_field_schema(
1247 self,
1248 name: str,
1249 field_info: FieldInfo,
1250 decorators: DecoratorInfos,
1251 ) -> core_schema.DataclassField:
1252 """Prepare a DataclassField to represent the parameter/field, of a dataclass."""
1253 schema, metadata = self._common_field_schema(name, field_info, decorators)
1254 return core_schema.dataclass_field(
1255 name,
1256 schema,
1257 init=field_info.init,
1258 init_only=field_info.init_var or None,
1259 kw_only=None if field_info.kw_only else False,
1260 serialization_exclude=field_info.exclude,
1261 validation_alias=_convert_to_aliases(field_info.validation_alias),
1262 serialization_alias=field_info.serialization_alias,
1263 serialization_exclude_if=field_info.exclude_if,
1264 frozen=field_info.frozen,
1265 metadata=metadata,
1266 )
1267
1268 def _common_field_schema( # C901
1269 self, name: str, field_info: FieldInfo, decorators: DecoratorInfos
1270 ) -> tuple[CoreSchema, dict[str, Any]]:
1271 source_type, annotations = field_info.annotation, field_info.metadata
1272
1273 def set_discriminator(schema: CoreSchema) -> CoreSchema:
1274 schema = self._apply_discriminator_to_union(schema, field_info.discriminator)
1275 return schema
1276
1277 # Convert `@field_validator` decorators to `Before/After/Plain/WrapValidator` instances:
1278 validators_from_decorators = [
1279 _mode_to_validator[decorator.info.mode]._from_decorator(decorator)
1280 for decorator in filter_field_decorator_info_by_field(decorators.field_validators.values(), name)
1281 ]
1282
1283 with self.field_name_stack.push(name):
1284 if field_info.discriminator is not None:
1285 schema = self._apply_annotations(
1286 source_type, annotations + validators_from_decorators, transform_inner_schema=set_discriminator
1287 )
1288 else:
1289 schema = self._apply_annotations(
1290 source_type,
1291 annotations + validators_from_decorators,
1292 )
1293
1294 # This V1 compatibility shim should eventually be removed
1295 # push down any `each_item=True` validators
1296 # note that this won't work for any Annotated types that get wrapped by a function validator
1297 # but that's okay because that didn't exist in V1
1298 this_field_validators = filter_field_decorator_info_by_field(decorators.validators.values(), name)
1299 if _validators_require_validate_default(this_field_validators):
1300 field_info.validate_default = True
1301 each_item_validators = [v for v in this_field_validators if v.info.each_item is True]
1302 this_field_validators = [v for v in this_field_validators if v not in each_item_validators]
1303 schema = apply_each_item_validators(schema, each_item_validators)
1304
1305 schema = apply_validators(schema, this_field_validators)
1306
1307 # the default validator needs to go outside of any other validators
1308 # so that it is the topmost validator for the field validator
1309 # which uses it to check if the field has a default value or not
1310 if not field_info.is_required():
1311 schema = wrap_default(field_info, schema)
1312
1313 schema = self._apply_field_serializers(
1314 schema, filter_field_decorator_info_by_field(decorators.field_serializers.values(), name)
1315 )
1316
1317 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(field_info)
1318 core_metadata: dict[str, Any] = {}
1319 update_core_metadata(
1320 core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
1321 )
1322
1323 return schema, core_metadata
1324
1325 def _union_schema(self, union_type: Any) -> core_schema.CoreSchema:
1326 """Generate schema for a Union."""
1327 args = self._get_args_resolving_forward_refs(union_type, required=True)
1328 choices: list[CoreSchema] = []
1329 nullable = False
1330 for arg in args:
1331 if arg is None or arg is _typing_extra.NoneType:
1332 nullable = True
1333 else:
1334 choices.append(self.generate_schema(arg))
1335
1336 if len(choices) == 1:
1337 s = choices[0]
1338 else:
1339 choices_with_tags: list[CoreSchema | tuple[CoreSchema, str]] = []
1340 for choice in choices:
1341 tag = cast(CoreMetadata, choice.get('metadata', {})).get('pydantic_internal_union_tag_key')
1342 if tag is not None:
1343 choices_with_tags.append((choice, tag))
1344 else:
1345 choices_with_tags.append(choice)
1346 s = core_schema.union_schema(choices_with_tags)
1347
1348 if nullable:
1349 s = core_schema.nullable_schema(s)
1350 return s
1351
1352 def _type_alias_type_schema(self, obj: TypeAliasType) -> CoreSchema:
1353 with self.defs.get_schema_or_ref(obj) as (ref, maybe_schema):
1354 if maybe_schema is not None:
1355 return maybe_schema
1356
1357 origin: TypeAliasType = get_origin(obj) or obj
1358 typevars_map = get_standard_typevars_map(obj)
1359
1360 with self._ns_resolver.push(origin):
1361 try:
1362 annotation = _typing_extra.eval_type(origin.__value__, *self._types_namespace)
1363 except NameError as e:
1364 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1365 annotation = replace_types(annotation, typevars_map)
1366 schema = self.generate_schema(annotation)
1367 assert schema['type'] != 'definitions'
1368 schema['ref'] = ref # type: ignore
1369 return self.defs.create_definition_reference_schema(schema)
1370
1371 def _literal_schema(self, literal_type: Any) -> CoreSchema:
1372 """Generate schema for a Literal."""
1373 expected = list(get_literal_values(literal_type, type_check=False, unpack_type_aliases='eager'))
1374 assert expected, f'literal "expected" cannot be empty, obj={literal_type}'
1375 schema = core_schema.literal_schema(expected)
1376
1377 if self._config_wrapper.use_enum_values and any(isinstance(v, Enum) for v in expected):
1378 schema = core_schema.no_info_after_validator_function(
1379 lambda v: v.value if isinstance(v, Enum) else v, schema
1380 )
1381
1382 return schema
1383
1384 def _typed_dict_schema(self, typed_dict_cls: Any, origin: Any) -> core_schema.CoreSchema:
1385 """Generate a core schema for a `TypedDict` class.
1386
1387 To be able to build a `DecoratorInfos` instance for the `TypedDict` class (which will include
1388 validators, serializers, etc.), we need to have access to the original bases of the class
1389 (see https://docs.python.org/3/library/types.html#types.get_original_bases).
1390 However, the `__orig_bases__` attribute was only added in 3.12 (https://github.com/python/cpython/pull/103698).
1391
1392 For this reason, we require Python 3.12 (or using the `typing_extensions` backport).
1393 """
1394 FieldInfo = import_cached_field_info()
1395
1396 with (
1397 self.model_type_stack.push(typed_dict_cls),
1398 self.defs.get_schema_or_ref(typed_dict_cls) as (
1399 typed_dict_ref,
1400 maybe_schema,
1401 ),
1402 ):
1403 if maybe_schema is not None:
1404 return maybe_schema
1405
1406 typevars_map = get_standard_typevars_map(typed_dict_cls)
1407 if origin is not None:
1408 typed_dict_cls = origin
1409
1410 if not _SUPPORTS_TYPEDDICT and type(typed_dict_cls).__module__ == 'typing':
1411 raise PydanticUserError(
1412 'Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.12.',
1413 code='typed-dict-version',
1414 )
1415
1416 try:
1417 # if a typed dictionary class doesn't have config, we use the parent's config, hence a default of `None`
1418 # see https://github.com/pydantic/pydantic/issues/10917
1419 config: ConfigDict | None = get_attribute_from_bases(typed_dict_cls, '__pydantic_config__')
1420 except AttributeError:
1421 config = None
1422
1423 with self._config_wrapper_stack.push(config):
1424 core_config = self._config_wrapper.core_config(title=typed_dict_cls.__name__)
1425
1426 required_keys: frozenset[str] = typed_dict_cls.__required_keys__
1427
1428 fields: dict[str, core_schema.TypedDictField] = {}
1429
1430 decorators = DecoratorInfos.build(typed_dict_cls, replace_wrapped_methods=False)
1431 decorators.update_from_config(self._config_wrapper)
1432
1433 if self._config_wrapper.use_attribute_docstrings:
1434 field_docstrings = extract_docstrings_from_cls(typed_dict_cls, use_inspect=True)
1435 else:
1436 field_docstrings = None
1437
1438 try:
1439 annotations = _typing_extra.get_cls_type_hints(typed_dict_cls, ns_resolver=self._ns_resolver)
1440 except NameError as e:
1441 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1442
1443 readonly_fields: list[str] = []
1444
1445 for field_name, annotation in annotations.items():
1446 field_info = FieldInfo.from_annotation(annotation, _source=AnnotationSource.TYPED_DICT)
1447 field_info.annotation = replace_types(field_info.annotation, typevars_map)
1448
1449 required = (
1450 field_name in required_keys or 'required' in field_info._qualifiers
1451 ) and 'not_required' not in field_info._qualifiers
1452 if 'read_only' in field_info._qualifiers:
1453 readonly_fields.append(field_name)
1454
1455 if (
1456 field_docstrings is not None
1457 and field_info.description is None
1458 and field_name in field_docstrings
1459 ):
1460 field_info.description = field_docstrings[field_name]
1461 update_field_from_config(self._config_wrapper, field_name, field_info)
1462
1463 fields[field_name] = self._generate_td_field_schema(
1464 field_name, field_info, decorators, required=required
1465 )
1466
1467 if readonly_fields:
1468 fields_repr = ', '.join(repr(f) for f in readonly_fields)
1469 plural = len(readonly_fields) >= 2
1470 warnings.warn(
1471 f'Item{"s" if plural else ""} {fields_repr} on TypedDict class {typed_dict_cls.__name__!r} '
1472 f'{"are" if plural else "is"} using the `ReadOnly` qualifier. Pydantic will not protect items '
1473 'from any mutation on dictionary instances.',
1474 UserWarning,
1475 )
1476
1477 extra_behavior: core_schema.ExtraBehavior = 'ignore'
1478 extras_schema: CoreSchema | None = None # For 'allow', equivalent to `Any` - no validation performed.
1479
1480 # `__closed__` is `None` when not specified (equivalent to `False`):
1481 is_closed = bool(getattr(typed_dict_cls, '__closed__', False))
1482 extra_items = getattr(typed_dict_cls, '__extra_items__', typing_extensions.NoExtraItems)
1483 if is_closed:
1484 extra_behavior = 'forbid'
1485 extras_schema = None
1486 elif not typing_objects.is_noextraitems(extra_items):
1487 extra_behavior = 'allow'
1488 extras_schema = self.generate_schema(replace_types(extra_items, typevars_map))
1489
1490 if (config_extra := self._config_wrapper.extra) in ('allow', 'forbid'):
1491 if is_closed and config_extra == 'allow':
1492 warnings.warn(
1493 f"TypedDict class {typed_dict_cls.__qualname__!r} is closed, but 'extra' configuration "
1494 "is set to `'allow'`. The 'extra' configuration value will be ignored.",
1495 category=TypedDictExtraConfigWarning,
1496 )
1497 elif not typing_objects.is_noextraitems(extra_items) and config_extra == 'forbid':
1498 warnings.warn(
1499 f"TypedDict class {typed_dict_cls.__qualname__!r} allows extra items, but 'extra' configuration "
1500 "is set to `'forbid'`. The 'extra' configuration value will be ignored.",
1501 category=TypedDictExtraConfigWarning,
1502 )
1503 else:
1504 extra_behavior = config_extra
1505
1506 td_schema = core_schema.typed_dict_schema(
1507 fields,
1508 cls=typed_dict_cls,
1509 computed_fields=[
1510 self._computed_field_schema(d, decorators.field_serializers)
1511 for d in decorators.computed_fields.values()
1512 ],
1513 extra_behavior=extra_behavior,
1514 extras_schema=extras_schema,
1515 ref=typed_dict_ref,
1516 config=core_config,
1517 )
1518
1519 schema = self._apply_model_serializers(td_schema, decorators.model_serializers.values())
1520 schema = apply_model_validators(schema, decorators.model_validators.values(), 'all')
1521 return self.defs.create_definition_reference_schema(schema)
1522
1523 def _namedtuple_schema(self, namedtuple_cls: Any, origin: Any) -> core_schema.CoreSchema:
1524 """Generate schema for a NamedTuple."""
1525 with (
1526 self.model_type_stack.push(namedtuple_cls),
1527 self.defs.get_schema_or_ref(namedtuple_cls) as (
1528 namedtuple_ref,
1529 maybe_schema,
1530 ),
1531 ):
1532 if maybe_schema is not None:
1533 return maybe_schema
1534 typevars_map = get_standard_typevars_map(namedtuple_cls)
1535 if origin is not None:
1536 namedtuple_cls = origin
1537
1538 try:
1539 annotations = _typing_extra.get_cls_type_hints(namedtuple_cls, ns_resolver=self._ns_resolver)
1540 except NameError as e:
1541 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1542
1543 # Filter annotations to only include fields that are actually in the NamedTuple
1544 # (as subclassing an existing NamedTuple is not supported yet - see https://github.com/python/typing/issues/427)
1545 # and use `Any` if no annotation exist (i.e. when using `collections.namedtuple()`).
1546 annotations = {field_name: annotations.get(field_name, Any) for field_name in namedtuple_cls._fields}
1547
1548 if typevars_map:
1549 annotations = {
1550 field_name: replace_types(annotation, typevars_map)
1551 for field_name, annotation in annotations.items()
1552 }
1553
1554 arguments_schema = core_schema.arguments_schema(
1555 [
1556 self._generate_parameter_schema(
1557 field_name,
1558 annotation,
1559 source=AnnotationSource.NAMED_TUPLE,
1560 default=namedtuple_cls._field_defaults.get(field_name, Parameter.empty),
1561 )
1562 for field_name, annotation in annotations.items()
1563 ],
1564 metadata={'pydantic_js_prefer_positional_arguments': True},
1565 )
1566 schema = core_schema.call_schema(arguments_schema, namedtuple_cls, ref=namedtuple_ref)
1567 return self.defs.create_definition_reference_schema(schema)
1568
1569 def _generate_parameter_schema(
1570 self,
1571 name: str,
1572 annotation: type[Any],
1573 source: AnnotationSource,
1574 default: Any = Parameter.empty,
1575 mode: Literal['positional_only', 'positional_or_keyword', 'keyword_only'] | None = None,
1576 ) -> core_schema.ArgumentsParameter:
1577 """Generate the definition of a field in a namedtuple or a parameter in a function signature.
1578
1579 This definition is meant to be used for the `'arguments'` core schema, which will be replaced
1580 in V3 by the `'arguments-v3`'.
1581 """
1582 FieldInfo = import_cached_field_info()
1583
1584 if default is Parameter.empty:
1585 field = FieldInfo.from_annotation(annotation, _source=source)
1586 else:
1587 field = FieldInfo.from_annotated_attribute(annotation, default, _source=source)
1588
1589 assert field.annotation is not None, 'field.annotation should not be None when generating a schema'
1590 update_field_from_config(self._config_wrapper, name, field)
1591
1592 with self.field_name_stack.push(name):
1593 schema = self._apply_annotations(
1594 field.annotation,
1595 [field],
1596 # Because we pass `field` as metadata above (required for attributes relevant for
1597 # JSON Scheme generation), we need to ignore the potential warnings about `FieldInfo`
1598 # attributes that will not be used:
1599 check_unsupported_field_info_attributes=False,
1600 )
1601
1602 if not field.is_required():
1603 schema = wrap_default(field, schema)
1604
1605 parameter_schema = core_schema.arguments_parameter(
1606 name,
1607 schema,
1608 mode=mode,
1609 alias=_convert_to_aliases(field.validation_alias),
1610 )
1611
1612 return parameter_schema
1613
1614 def _generate_parameter_v3_schema(
1615 self,
1616 name: str,
1617 annotation: Any,
1618 source: AnnotationSource,
1619 mode: Literal[
1620 'positional_only',
1621 'positional_or_keyword',
1622 'keyword_only',
1623 'var_args',
1624 'var_kwargs_uniform',
1625 'var_kwargs_unpacked_typed_dict',
1626 ],
1627 default: Any = Parameter.empty,
1628 ) -> core_schema.ArgumentsV3Parameter:
1629 """Generate the definition of a parameter in a function signature.
1630
1631 This definition is meant to be used for the `'arguments-v3'` core schema, which will replace
1632 the `'arguments`' schema in V3.
1633 """
1634 FieldInfo = import_cached_field_info()
1635
1636 if default is Parameter.empty:
1637 field = FieldInfo.from_annotation(annotation, _source=source)
1638 else:
1639 field = FieldInfo.from_annotated_attribute(annotation, default, _source=source)
1640 update_field_from_config(self._config_wrapper, name, field)
1641
1642 with self.field_name_stack.push(name):
1643 schema = self._apply_annotations(
1644 field.annotation,
1645 [field],
1646 # Because we pass `field` as metadata above (required for attributes relevant for
1647 # JSON Scheme generation), we need to ignore the potential warnings about `FieldInfo`
1648 # attributes that will not be used:
1649 check_unsupported_field_info_attributes=False,
1650 )
1651
1652 if not field.is_required():
1653 schema = wrap_default(field, schema)
1654
1655 parameter_schema = core_schema.arguments_v3_parameter(
1656 name=name,
1657 schema=schema,
1658 mode=mode,
1659 alias=_convert_to_aliases(field.validation_alias),
1660 )
1661
1662 return parameter_schema
1663
1664 def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema:
1665 """Generate schema for a Tuple, e.g. `tuple[int, str]` or `tuple[int, ...]`."""
1666 # TODO: do we really need to resolve type vars here?
1667 typevars_map = get_standard_typevars_map(tuple_type)
1668 params = self._get_args_resolving_forward_refs(tuple_type)
1669
1670 if typevars_map and params:
1671 params = tuple(replace_types(param, typevars_map) for param in params)
1672
1673 # NOTE: subtle difference: `tuple[()]` gives `params=()`, whereas `typing.Tuple[()]` gives `params=((),)`
1674 # This is only true for <3.11, on Python 3.11+ `typing.Tuple[()]` gives `params=()`
1675 if not params:
1676 if tuple_type in TUPLE_TYPES:
1677 return core_schema.tuple_schema([core_schema.any_schema()], variadic_item_index=0)
1678 else:
1679 # special case for `tuple[()]` which means `tuple[]` - an empty tuple
1680 return core_schema.tuple_schema([])
1681 elif params[-1] is Ellipsis:
1682 if len(params) == 2:
1683 return core_schema.tuple_schema([self.generate_schema(params[0])], variadic_item_index=0)
1684 else:
1685 # TODO: something like https://github.com/pydantic/pydantic/issues/5952
1686 raise ValueError('Variable tuples can only have one type')
1687 elif len(params) == 1 and params[0] == ():
1688 # special case for `tuple[()]` which means `tuple[]` - an empty tuple
1689 # NOTE: This conditional can be removed when we drop support for Python 3.10.
1690 return core_schema.tuple_schema([])
1691 else:
1692 return core_schema.tuple_schema([self.generate_schema(param) for param in params])
1693
1694 def _type_schema(self) -> core_schema.CoreSchema:
1695 return core_schema.custom_error_schema(
1696 core_schema.is_instance_schema(type),
1697 custom_error_type='is_type',
1698 custom_error_message='Input should be a type',
1699 )
1700
1701 def _zoneinfo_schema(self) -> core_schema.CoreSchema:
1702 """Generate schema for a zone_info.ZoneInfo object"""
1703 from ._validators import validate_str_is_valid_iana_tz
1704
1705 metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'zoneinfo'}]}
1706 return core_schema.no_info_plain_validator_function(
1707 validate_str_is_valid_iana_tz,
1708 serialization=core_schema.to_string_ser_schema(),
1709 metadata=metadata,
1710 )
1711
1712 def _union_is_subclass_schema(self, union_type: Any) -> core_schema.CoreSchema:
1713 """Generate schema for `type[Union[X, ...]]`."""
1714 args = self._get_args_resolving_forward_refs(union_type, required=True)
1715 return core_schema.union_schema([self.generate_schema(type[args]) for args in args])
1716
1717 def _subclass_schema(self, type_: Any) -> core_schema.CoreSchema:
1718 """Generate schema for a type, e.g. `type[int]`."""
1719 type_param = self._get_first_arg_or_any(type_)
1720
1721 # Assume `type[Annotated[<typ>, ...]]` is equivalent to `type[<typ>]`:
1722 type_param = _typing_extra.annotated_type(type_param) or type_param
1723
1724 if typing_objects.is_any(type_param):
1725 return self._type_schema()
1726 elif typing_objects.is_typealiastype(type_param):
1727 return self.generate_schema(type[type_param.__value__])
1728 elif typing_objects.is_typevar(type_param):
1729 if type_param.__bound__:
1730 if is_union_origin(get_origin(type_param.__bound__)):
1731 return self._union_is_subclass_schema(type_param.__bound__)
1732 return core_schema.is_subclass_schema(type_param.__bound__)
1733 elif type_param.__constraints__:
1734 return core_schema.union_schema([self.generate_schema(type[c]) for c in type_param.__constraints__])
1735 else:
1736 return self._type_schema()
1737 elif is_union_origin(get_origin(type_param)):
1738 return self._union_is_subclass_schema(type_param)
1739 else:
1740 if typing_objects.is_self(type_param):
1741 type_param = self._resolve_self_type(type_param)
1742 if _typing_extra.is_generic_alias(type_param):
1743 raise PydanticUserError(
1744 'Subscripting `type[]` with an already parametrized type is not supported. '
1745 f'Instead of using type[{type_param!r}], use type[{_repr.display_as_type(get_origin(type_param))}].',
1746 code=None,
1747 )
1748 if not inspect.isclass(type_param):
1749 # when using type[None], this doesn't type convert to type[NoneType], and None isn't a class
1750 # so we handle it manually here
1751 if type_param is None:
1752 return core_schema.is_subclass_schema(_typing_extra.NoneType)
1753 raise TypeError(f'Expected a class, got {type_param!r}')
1754 return core_schema.is_subclass_schema(type_param)
1755
1756 def _sequence_schema(self, items_type: Any) -> core_schema.CoreSchema:
1757 """Generate schema for a Sequence, e.g. `Sequence[int]`."""
1758 from ._serializers import serialize_sequence_via_list
1759
1760 item_type_schema = self.generate_schema(items_type)
1761 list_schema = core_schema.list_schema(item_type_schema)
1762
1763 json_schema = smart_deepcopy(list_schema)
1764 python_schema = core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence')
1765 if not typing_objects.is_any(items_type):
1766 from ._validators import sequence_validator
1767
1768 python_schema = core_schema.chain_schema(
1769 [python_schema, core_schema.no_info_wrap_validator_function(sequence_validator, list_schema)],
1770 )
1771
1772 serialization = core_schema.wrap_serializer_function_ser_schema(
1773 serialize_sequence_via_list, schema=item_type_schema, info_arg=True
1774 )
1775 return core_schema.json_or_python_schema(
1776 json_schema=json_schema, python_schema=python_schema, serialization=serialization
1777 )
1778
1779 def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema:
1780 """Generate a schema for an `Iterable`."""
1781 item_type = self._get_first_arg_or_any(type_)
1782
1783 return core_schema.generator_schema(self.generate_schema(item_type))
1784
1785 def _pattern_schema(self, pattern_type: Any) -> core_schema.CoreSchema:
1786 from . import _validators
1787
1788 metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'regex'}]}
1789 ser = core_schema.plain_serializer_function_ser_schema(
1790 attrgetter('pattern'), when_used='json', return_schema=core_schema.str_schema()
1791 )
1792 if pattern_type is typing.Pattern or pattern_type is re.Pattern:
1793 # bare type
1794 return core_schema.no_info_plain_validator_function(
1795 _validators.pattern_either_validator, serialization=ser, metadata=metadata
1796 )
1797
1798 param = self._get_args_resolving_forward_refs(
1799 pattern_type,
1800 required=True,
1801 )[0]
1802 if param is str:
1803 return core_schema.no_info_plain_validator_function(
1804 _validators.pattern_str_validator, serialization=ser, metadata=metadata
1805 )
1806 elif param is bytes:
1807 return core_schema.no_info_plain_validator_function(
1808 _validators.pattern_bytes_validator, serialization=ser, metadata=metadata
1809 )
1810 else:
1811 raise PydanticSchemaGenerationError(f'Unable to generate pydantic-core schema for {pattern_type!r}.')
1812
1813 def _hashable_schema(self) -> core_schema.CoreSchema:
1814 return core_schema.custom_error_schema(
1815 schema=core_schema.json_or_python_schema(
1816 json_schema=core_schema.chain_schema(
1817 [core_schema.any_schema(), core_schema.is_instance_schema(collections.abc.Hashable)]
1818 ),
1819 python_schema=core_schema.is_instance_schema(collections.abc.Hashable),
1820 ),
1821 custom_error_type='is_hashable',
1822 custom_error_message='Input should be hashable',
1823 )
1824
1825 def _dataclass_schema(
1826 self, dataclass: type[StandardDataclass], origin: type[StandardDataclass] | None
1827 ) -> core_schema.CoreSchema:
1828 """Generate schema for a dataclass."""
1829 with (
1830 self.model_type_stack.push(dataclass),
1831 self.defs.get_schema_or_ref(dataclass) as (
1832 dataclass_ref,
1833 maybe_schema,
1834 ),
1835 ):
1836 if maybe_schema is not None:
1837 return maybe_schema
1838
1839 schema = dataclass.__dict__.get('__pydantic_core_schema__')
1840 if schema is not None and not isinstance(schema, MockCoreSchema):
1841 if schema['type'] == 'definitions':
1842 schema = self.defs.unpack_definitions(schema)
1843 ref = get_ref(schema)
1844 if ref:
1845 return self.defs.create_definition_reference_schema(schema)
1846 else:
1847 return schema
1848
1849 typevars_map = get_standard_typevars_map(dataclass)
1850 if origin is not None:
1851 dataclass = origin
1852
1853 # if (plain) dataclass doesn't have config, we use the parent's config, hence a default of `None`
1854 # (Pydantic dataclasses have an empty dict config by default).
1855 # see https://github.com/pydantic/pydantic/issues/10917
1856 config = getattr(dataclass, '__pydantic_config__', None)
1857
1858 from ..dataclasses import is_pydantic_dataclass
1859
1860 with self._ns_resolver.push(dataclass), self._config_wrapper_stack.push(config):
1861 if is_pydantic_dataclass(dataclass):
1862 if dataclass.__pydantic_fields_complete__():
1863 # Copy the field info instances to avoid mutating the `FieldInfo` instances
1864 # of the generic dataclass generic origin (e.g. `apply_typevars_map` below).
1865 # Note that we don't apply `deepcopy` on `__pydantic_fields__` because we
1866 # don't want to copy the `FieldInfo` attributes:
1867 fields = {
1868 f_name: copy(field_info) for f_name, field_info in dataclass.__pydantic_fields__.items()
1869 }
1870 if typevars_map:
1871 for field in fields.values():
1872 field.apply_typevars_map(typevars_map, *self._types_namespace)
1873 else:
1874 try:
1875 fields = rebuild_dataclass_fields(
1876 dataclass,
1877 config_wrapper=self._config_wrapper,
1878 ns_resolver=self._ns_resolver,
1879 typevars_map=typevars_map or {},
1880 )
1881 except NameError as e:
1882 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1883 else:
1884 fields = collect_dataclass_fields(
1885 dataclass,
1886 typevars_map=typevars_map,
1887 config_wrapper=self._config_wrapper,
1888 )
1889
1890 if self._config_wrapper.extra == 'allow':
1891 # disallow combination of init=False on a dataclass field and extra='allow' on a dataclass
1892 for field_name, field in fields.items():
1893 if field.init is False:
1894 raise PydanticUserError(
1895 f'Field {field_name} has `init=False` and dataclass has config setting `extra="allow"`. '
1896 f'This combination is not allowed.',
1897 code='dataclass-init-false-extra-allow',
1898 )
1899
1900 decorators = dataclass.__dict__.get('__pydantic_decorators__')
1901 if decorators is None:
1902 decorators = DecoratorInfos.build(dataclass, replace_wrapped_methods=False)
1903 decorators.update_from_config(self._config_wrapper)
1904 # Move kw_only=False args to the start of the list, as this is how vanilla dataclasses work.
1905 # Note that when kw_only is missing or None, it is treated as equivalent to kw_only=True
1906 args = sorted(
1907 (self._generate_dc_field_schema(k, v, decorators) for k, v in fields.items()),
1908 key=lambda a: a.get('kw_only') is not False,
1909 )
1910 has_post_init = hasattr(dataclass, '__post_init__')
1911 has_slots = hasattr(dataclass, '__slots__')
1912
1913 args_schema = core_schema.dataclass_args_schema(
1914 dataclass.__name__,
1915 args,
1916 computed_fields=[
1917 self._computed_field_schema(d, decorators.field_serializers)
1918 for d in decorators.computed_fields.values()
1919 ],
1920 collect_init_only=has_post_init,
1921 )
1922
1923 inner_schema = apply_validators(args_schema, decorators.root_validators.values())
1924
1925 model_validators = decorators.model_validators.values()
1926 inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
1927
1928 core_config = self._config_wrapper.core_config(title=dataclass.__name__)
1929
1930 dc_schema = core_schema.dataclass_schema(
1931 dataclass,
1932 inner_schema,
1933 generic_origin=origin,
1934 post_init=has_post_init,
1935 ref=dataclass_ref,
1936 fields=[field.name for field in dataclasses.fields(dataclass)],
1937 slots=has_slots,
1938 config=core_config,
1939 # we don't use a custom __setattr__ for dataclasses, so we must
1940 # pass along the frozen config setting to the pydantic-core schema
1941 frozen=self._config_wrapper_stack.tail.frozen,
1942 )
1943 schema = self._apply_model_serializers(dc_schema, decorators.model_serializers.values())
1944 schema = apply_model_validators(schema, model_validators, 'outer')
1945 return self.defs.create_definition_reference_schema(schema)
1946
1947 def _call_schema(self, function: ValidateCallSupportedTypes) -> core_schema.CallSchema:
1948 """Generate schema for a Callable.
1949
1950 TODO support functional validators once we support them in Config
1951 """
1952 arguments_schema = self._arguments_schema(function)
1953
1954 return_schema: core_schema.CoreSchema | None = None
1955 config_wrapper = self._config_wrapper
1956 if config_wrapper.validate_return:
1957 sig = _typing_extra.signature_no_eval(function)
1958 return_hint = sig.return_annotation
1959 if return_hint is not sig.empty:
1960 globalns, localns = self._types_namespace
1961 type_hints = _typing_extra.get_function_type_hints(
1962 function, globalns=globalns, localns=localns, include_keys={'return'}
1963 )
1964 return_schema = self.generate_schema(type_hints['return'])
1965
1966 return core_schema.call_schema(
1967 arguments_schema,
1968 function,
1969 return_schema=return_schema,
1970 )
1971
1972 def _arguments_schema(
1973 self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
1974 ) -> core_schema.ArgumentsSchema:
1975 """Generate schema for a Signature."""
1976 mode_lookup: dict[_ParameterKind, Literal['positional_only', 'positional_or_keyword', 'keyword_only']] = {
1977 Parameter.POSITIONAL_ONLY: 'positional_only',
1978 Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
1979 Parameter.KEYWORD_ONLY: 'keyword_only',
1980 }
1981
1982 sig = _typing_extra.signature_no_eval(function)
1983 globalns, localns = self._types_namespace
1984 type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)
1985
1986 arguments_list: list[core_schema.ArgumentsParameter] = []
1987 var_args_schema: core_schema.CoreSchema | None = None
1988 var_kwargs_schema: core_schema.CoreSchema | None = None
1989 var_kwargs_mode: core_schema.VarKwargsMode | None = None
1990
1991 for i, (name, p) in enumerate(sig.parameters.items()):
1992 if p.annotation is sig.empty:
1993 annotation = typing.cast(Any, Any)
1994 else:
1995 annotation = type_hints[name]
1996
1997 if parameters_callback is not None:
1998 result = parameters_callback(i, name, annotation)
1999 if result == 'skip':
2000 continue
2001
2002 parameter_mode = mode_lookup.get(p.kind)
2003 if parameter_mode is not None:
2004 arg_schema = self._generate_parameter_schema(
2005 name, annotation, AnnotationSource.FUNCTION, p.default, parameter_mode
2006 )
2007 arguments_list.append(arg_schema)
2008 elif p.kind == Parameter.VAR_POSITIONAL:
2009 var_args_schema = self.generate_schema(annotation)
2010 else:
2011 assert p.kind == Parameter.VAR_KEYWORD, p.kind
2012
2013 unpack_type = _typing_extra.unpack_type(annotation)
2014 if unpack_type is not None:
2015 origin = get_origin(unpack_type) or unpack_type
2016 if not is_typeddict(origin):
2017 raise PydanticUserError(
2018 f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
2019 code='unpack-typed-dict',
2020 )
2021 non_pos_only_param_names = {
2022 name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
2023 }
2024 overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
2025 if overlapping_params:
2026 raise PydanticUserError(
2027 f'Typed dictionary {origin.__name__!r} overlaps with parameter'
2028 f'{"s" if len(overlapping_params) >= 2 else ""} '
2029 f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
2030 code='overlapping-unpack-typed-dict',
2031 )
2032
2033 var_kwargs_mode = 'unpacked-typed-dict'
2034 var_kwargs_schema = self._typed_dict_schema(unpack_type, get_origin(unpack_type))
2035 else:
2036 var_kwargs_mode = 'uniform'
2037 var_kwargs_schema = self.generate_schema(annotation)
2038
2039 return core_schema.arguments_schema(
2040 arguments_list,
2041 var_args_schema=var_args_schema,
2042 var_kwargs_mode=var_kwargs_mode,
2043 var_kwargs_schema=var_kwargs_schema,
2044 validate_by_name=self._config_wrapper.validate_by_name,
2045 )
2046
2047 def _arguments_v3_schema(
2048 self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
2049 ) -> core_schema.ArgumentsV3Schema:
2050 mode_lookup: dict[
2051 _ParameterKind, Literal['positional_only', 'positional_or_keyword', 'var_args', 'keyword_only']
2052 ] = {
2053 Parameter.POSITIONAL_ONLY: 'positional_only',
2054 Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
2055 Parameter.VAR_POSITIONAL: 'var_args',
2056 Parameter.KEYWORD_ONLY: 'keyword_only',
2057 }
2058
2059 sig = _typing_extra.signature_no_eval(function)
2060 globalns, localns = self._types_namespace
2061 type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)
2062
2063 parameters_list: list[core_schema.ArgumentsV3Parameter] = []
2064
2065 for i, (name, p) in enumerate(sig.parameters.items()):
2066 if parameters_callback is not None:
2067 result = parameters_callback(i, name, p.annotation)
2068 if result == 'skip':
2069 continue
2070
2071 if p.annotation is Parameter.empty:
2072 annotation = typing.cast(Any, Any)
2073 else:
2074 annotation = type_hints[name]
2075
2076 parameter_mode = mode_lookup.get(p.kind)
2077 if parameter_mode is None:
2078 assert p.kind == Parameter.VAR_KEYWORD, p.kind
2079
2080 unpack_type = _typing_extra.unpack_type(annotation)
2081 if unpack_type is not None:
2082 origin = get_origin(unpack_type) or unpack_type
2083 if not is_typeddict(origin):
2084 raise PydanticUserError(
2085 f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
2086 code='unpack-typed-dict',
2087 )
2088 non_pos_only_param_names = {
2089 name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
2090 }
2091 overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
2092 if overlapping_params:
2093 raise PydanticUserError(
2094 f'Typed dictionary {origin.__name__!r} overlaps with parameter'
2095 f'{"s" if len(overlapping_params) >= 2 else ""} '
2096 f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
2097 code='overlapping-unpack-typed-dict',
2098 )
2099 parameter_mode = 'var_kwargs_unpacked_typed_dict'
2100 annotation = unpack_type
2101 else:
2102 parameter_mode = 'var_kwargs_uniform'
2103
2104 parameters_list.append(
2105 self._generate_parameter_v3_schema(
2106 name, annotation, AnnotationSource.FUNCTION, parameter_mode, default=p.default
2107 )
2108 )
2109
2110 return core_schema.arguments_v3_schema(
2111 parameters_list,
2112 validate_by_name=self._config_wrapper.validate_by_name,
2113 )
2114
2115 def _unsubstituted_typevar_schema(self, typevar: typing.TypeVar) -> core_schema.CoreSchema:
2116 try:
2117 has_default = typevar.has_default() # pyright: ignore[reportAttributeAccessIssue]
2118 except AttributeError:
2119 # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13
2120 pass
2121 else:
2122 if has_default:
2123 return self.generate_schema(typevar.__default__) # pyright: ignore[reportAttributeAccessIssue]
2124
2125 if constraints := typevar.__constraints__:
2126 return self._union_schema(typing.Union[constraints])
2127
2128 if bound := typevar.__bound__:
2129 schema = self.generate_schema(bound)
2130 schema['serialization'] = core_schema.simple_ser_schema('any')
2131 return schema
2132
2133 return core_schema.any_schema()
2134
2135 def _computed_field_schema(
2136 self,
2137 d: Decorator[ComputedFieldInfo],
2138 field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]],
2139 ) -> core_schema.ComputedField:
2140 if d.info.return_type is not PydanticUndefined:
2141 return_type = d.info.return_type
2142 else:
2143 try:
2144 # Do not pass in globals as the function could be defined in a different module.
2145 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2146 # in locals that may contain a parent/rebuild namespace:
2147 return_type = _decorators.get_callable_return_type(d.func, localns=self._types_namespace.locals)
2148 except NameError as e:
2149 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2150 if return_type is PydanticUndefined:
2151 raise PydanticUserError(
2152 'Computed field is missing return type annotation or specifying `return_type`'
2153 ' to the `@computed_field` decorator (e.g. `@computed_field(return_type=int | str)`)',
2154 code='model-field-missing-annotation',
2155 )
2156
2157 return_type = replace_types(return_type, self._typevars_map)
2158 # Create a new ComputedFieldInfo so that different type parametrizations of the same
2159 # generic model's computed field can have different return types.
2160 d.info = dataclasses.replace(d.info, return_type=return_type)
2161 return_type_schema = self.generate_schema(return_type)
2162 # Apply serializers to computed field if there exist
2163 return_type_schema = self._apply_field_serializers(
2164 return_type_schema,
2165 filter_field_decorator_info_by_field(field_serializers.values(), d.cls_var_name),
2166 )
2167
2168 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(d.info)
2169 core_metadata: dict[str, Any] = {}
2170 update_core_metadata(
2171 core_metadata,
2172 pydantic_js_updates={'readOnly': True, **(pydantic_js_updates if pydantic_js_updates else {})},
2173 pydantic_js_extra=pydantic_js_extra,
2174 )
2175 exclude_if = d.info.exclude_if
2176 # TODO: Should we support exclude_if from annotations?
2177 return core_schema.computed_field(
2178 d.cls_var_name,
2179 return_schema=return_type_schema,
2180 alias=d.info.alias,
2181 serialization_exclude_if=exclude_if,
2182 metadata=core_metadata,
2183 )
2184
2185 def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema:
2186 """Generate schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`."""
2187 FieldInfo = import_cached_field_info()
2188 source_type, *annotations = self._get_args_resolving_forward_refs(
2189 annotated_type,
2190 required=True,
2191 )
2192 schema = self._apply_annotations(source_type, annotations)
2193 # put the default validator last so that TypeAdapter.get_default_value() works
2194 # even if there are function validators involved
2195 for annotation in annotations:
2196 if isinstance(annotation, FieldInfo):
2197 schema = wrap_default(annotation, schema)
2198 return schema
2199
2200 def _apply_annotations(
2201 self,
2202 source_type: Any,
2203 annotations: list[Any],
2204 transform_inner_schema: Callable[[CoreSchema], CoreSchema] = lambda x: x,
2205 check_unsupported_field_info_attributes: bool = True,
2206 ) -> CoreSchema:
2207 """Apply arguments from `Annotated` or from `FieldInfo` to a schema.
2208
2209 This gets called by `GenerateSchema._annotated_schema` but differs from it in that it does
2210 not expect `source_type` to be an `Annotated` object, it expects it to be the first argument of that
2211 (in other words, `GenerateSchema._annotated_schema` just unpacks `Annotated`, this process it).
2212 """
2213 annotations = list(_known_annotated_metadata.expand_grouped_metadata(annotations))
2214
2215 pydantic_js_annotation_functions: list[GetJsonSchemaFunction] = []
2216
2217 def inner_handler(obj: Any) -> CoreSchema:
2218 schema = self._generate_schema_from_get_schema_method(obj, source_type)
2219
2220 if schema is None:
2221 schema = self._generate_schema_inner(obj)
2222
2223 metadata_js_function = _extract_get_pydantic_json_schema(obj)
2224 if metadata_js_function is not None:
2225 metadata_schema = resolve_original_schema(schema, self.defs)
2226 if metadata_schema is not None:
2227 self._add_js_function(metadata_schema, metadata_js_function)
2228 return transform_inner_schema(schema)
2229
2230 get_inner_schema = CallbackGetCoreSchemaHandler(inner_handler, self)
2231
2232 for annotation in annotations:
2233 if annotation is None:
2234 continue
2235 get_inner_schema = self._get_wrapped_inner_schema(
2236 get_inner_schema,
2237 annotation,
2238 pydantic_js_annotation_functions,
2239 check_unsupported_field_info_attributes=check_unsupported_field_info_attributes,
2240 )
2241
2242 schema = get_inner_schema(source_type)
2243 if pydantic_js_annotation_functions:
2244 core_metadata = schema.setdefault('metadata', {})
2245 update_core_metadata(core_metadata, pydantic_js_annotation_functions=pydantic_js_annotation_functions)
2246 return _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, source_type, schema)
2247
2248 def _apply_single_annotation(
2249 self,
2250 schema: core_schema.CoreSchema,
2251 metadata: Any,
2252 check_unsupported_field_info_attributes: bool = True,
2253 ) -> core_schema.CoreSchema:
2254 FieldInfo = import_cached_field_info()
2255
2256 if isinstance(metadata, FieldInfo):
2257 if (
2258 check_unsupported_field_info_attributes
2259 # HACK: we don't want to emit the warning for `FieldInfo` subclasses, because FastAPI does weird manipulations
2260 # with its subclasses and their annotations:
2261 and type(metadata) is FieldInfo
2262 ):
2263 for attr, value in (unsupported_attributes := self._get_unsupported_field_info_attributes(metadata)):
2264 warnings.warn(
2265 f'The {attr!r} attribute with value {value!r} was provided to the `Field()` function, '
2266 f'which has no effect in the context it was used. {attr!r} is field-specific metadata, '
2267 'and can only be attached to a model field using `Annotated` metadata or by assignment. '
2268 'This may have happened because an `Annotated` type alias using the `type` statement was '
2269 'used, or if the `Field()` function was attached to a single member of a union type.',
2270 category=UnsupportedFieldAttributeWarning,
2271 )
2272
2273 if (
2274 metadata.default_factory_takes_validated_data
2275 and self.model_type_stack.get() is None
2276 and 'defaut_factory' not in unsupported_attributes
2277 ):
2278 warnings.warn(
2279 "A 'default_factory' taking validated data as an argument was provided to the `Field()` function, "
2280 'but no validated data is available in the context it was used.',
2281 category=UnsupportedFieldAttributeWarning,
2282 )
2283
2284 for field_metadata in metadata.metadata:
2285 schema = self._apply_single_annotation(schema, field_metadata)
2286
2287 if metadata.discriminator is not None:
2288 schema = self._apply_discriminator_to_union(schema, metadata.discriminator)
2289 return schema
2290
2291 if schema['type'] == 'nullable':
2292 # for nullable schemas, metadata is automatically applied to the inner schema
2293 inner = schema.get('schema', core_schema.any_schema())
2294 inner = self._apply_single_annotation(inner, metadata)
2295 if inner:
2296 schema['schema'] = inner
2297 return schema
2298
2299 if schema['type'] == 'union' and any(
2300 choice['type'] == 'missing-sentinel' for choice in core_schema.iter_union_choices(schema)
2301 ):
2302 # Same behavior as for nullable schemas. This is a bit gross, but we have to support the same pattern
2303 filtered_choices = [
2304 choice
2305 for choice in schema['choices']
2306 if (choice[0] if isinstance(choice, tuple) else choice)['type'] != 'missing-sentinel'
2307 ]
2308 if len(filtered_choices) >= 2:
2309 # e.g. `Annotated[int | str | MISSING, Constraint(...)]`. We apply `Constraint(...)` to `int | str`,
2310 # and create a new union semantically equivalent to `Annotated[int | str, Constraint(...)] | MISSING`:
2311 filtered_union = core_schema.union_schema(filtered_choices)
2312 filtered_union = self._apply_single_annotation(filtered_union, metadata)
2313 new_union = schema.copy()
2314 new_union['choices'] = [
2315 filtered_union,
2316 next(
2317 choice
2318 for choice in schema['choices']
2319 if (choice[0] if isinstance(choice, tuple) else choice)['type'] == 'missing-sentinel'
2320 ),
2321 ]
2322 return new_union
2323 elif len(filtered_choices) == 1:
2324 # e.g. `Annotated[int | MISSING, Constraint(...)]`. We apply `Constraint(...)` to `int`, and reconstruct
2325 # a new union preserving the order.
2326 inner = filtered_choices[0][0] if isinstance(filtered_choices[0], tuple) else filtered_choices[0]
2327 inner = self._apply_single_annotation(inner, metadata)
2328
2329 # Create a new union schema, preserving the order of the union:
2330 new_union = schema.copy()
2331 new_union['choices'] = [
2332 (inner, choice[1])
2333 if isinstance(choice, tuple) and choice[0]['type'] != 'missing-sentinel'
2334 else inner
2335 if not isinstance(choice, tuple) and choice['type'] != 'missing-sentinel'
2336 else choice
2337 for choice in schema['choices']
2338 ]
2339 return new_union
2340
2341 original_schema = schema
2342 ref = schema.get('ref')
2343 if ref is not None:
2344 schema = schema.copy()
2345 new_ref = ref + f'_{repr(metadata)}'
2346 if (existing := self.defs.get_schema_from_ref(new_ref)) is not None:
2347 return existing
2348 schema['ref'] = new_ref # pyright: ignore[reportGeneralTypeIssues]
2349 elif schema['type'] == 'definition-ref':
2350 ref = schema['schema_ref']
2351 if (referenced_schema := self.defs.get_schema_from_ref(ref)) is not None:
2352 schema = referenced_schema.copy()
2353 new_ref = ref + f'_{repr(metadata)}'
2354 if (existing := self.defs.get_schema_from_ref(new_ref)) is not None:
2355 return existing
2356 schema['ref'] = new_ref # pyright: ignore[reportGeneralTypeIssues]
2357
2358 maybe_updated_schema = _known_annotated_metadata.apply_known_metadata(metadata, schema)
2359
2360 if maybe_updated_schema is not None:
2361 return maybe_updated_schema
2362 return original_schema
2363
2364 def _apply_single_annotation_json_schema(
2365 self, schema: core_schema.CoreSchema, metadata: Any
2366 ) -> core_schema.CoreSchema:
2367 FieldInfo = import_cached_field_info()
2368
2369 if isinstance(metadata, FieldInfo):
2370 for field_metadata in metadata.metadata:
2371 schema = self._apply_single_annotation_json_schema(schema, field_metadata)
2372
2373 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(metadata)
2374 core_metadata = schema.setdefault('metadata', {})
2375 update_core_metadata(
2376 core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
2377 )
2378 return schema
2379
2380 def _get_unsupported_field_info_attributes(self, field_info: FieldInfo) -> list[tuple[str, Any]]:
2381 """Get the list of unsupported `FieldInfo` attributes when not directly used in `Annotated` for field annotations."""
2382 unused_metadata: list[tuple[str, Any]] = []
2383 for unused_metadata_name, unset_value in UNSUPPORTED_STANDALONE_FIELDINFO_ATTRIBUTES:
2384 if (
2385 (unused_metadata_value := getattr(field_info, unused_metadata_name)) is not unset_value
2386 # `default` and `default_factory` can still be used with a type adapter, so only include them
2387 # if used with a model-like class:
2388 and (
2389 unused_metadata_name not in ('default', 'default_factory')
2390 or self.model_type_stack.get() is not None
2391 )
2392 # Setting `alias` will set `validation/serialization_alias` as well, so we want to avoid duplicate warnings:
2393 and (
2394 unused_metadata_name not in ('validation_alias', 'serialization_alias')
2395 or 'alias' not in field_info._attributes_set
2396 )
2397 ):
2398 unused_metadata.append((unused_metadata_name, unused_metadata_value))
2399
2400 return unused_metadata
2401
2402 def _get_wrapped_inner_schema(
2403 self,
2404 get_inner_schema: GetCoreSchemaHandler,
2405 annotation: Any,
2406 pydantic_js_annotation_functions: list[GetJsonSchemaFunction],
2407 check_unsupported_field_info_attributes: bool = False,
2408 ) -> CallbackGetCoreSchemaHandler:
2409 annotation_get_schema: GetCoreSchemaFunction | None = getattr(annotation, '__get_pydantic_core_schema__', None)
2410
2411 def new_handler(source: Any) -> core_schema.CoreSchema:
2412 if annotation_get_schema is not None:
2413 schema = annotation_get_schema(source, get_inner_schema)
2414 else:
2415 schema = get_inner_schema(source)
2416 schema = self._apply_single_annotation(
2417 schema,
2418 annotation,
2419 check_unsupported_field_info_attributes=check_unsupported_field_info_attributes,
2420 )
2421 schema = self._apply_single_annotation_json_schema(schema, annotation)
2422
2423 metadata_js_function = _extract_get_pydantic_json_schema(annotation)
2424 if metadata_js_function is not None:
2425 pydantic_js_annotation_functions.append(metadata_js_function)
2426 return schema
2427
2428 return CallbackGetCoreSchemaHandler(new_handler, self)
2429
2430 def _apply_field_serializers(
2431 self,
2432 schema: core_schema.CoreSchema,
2433 serializers: list[Decorator[FieldSerializerDecoratorInfo]],
2434 ) -> core_schema.CoreSchema:
2435 """Apply field serializers to a schema."""
2436 if serializers:
2437 schema = copy(schema)
2438 if schema['type'] == 'definitions':
2439 inner_schema = schema['schema']
2440 schema['schema'] = self._apply_field_serializers(inner_schema, serializers)
2441 return schema
2442 elif 'ref' in schema:
2443 schema = self.defs.create_definition_reference_schema(schema)
2444
2445 # use the last serializer to make it easy to override a serializer set on a parent model
2446 serializer = serializers[-1]
2447 is_field_serializer, info_arg = inspect_field_serializer(serializer.func, serializer.info.mode)
2448
2449 if serializer.info.return_type is not PydanticUndefined:
2450 return_type = serializer.info.return_type
2451 else:
2452 try:
2453 # Do not pass in globals as the function could be defined in a different module.
2454 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2455 # in locals that may contain a parent/rebuild namespace:
2456 return_type = _decorators.get_callable_return_type(
2457 serializer.func, localns=self._types_namespace.locals
2458 )
2459 except NameError as e:
2460 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2461
2462 if return_type is PydanticUndefined:
2463 return_schema = None
2464 else:
2465 return_schema = self.generate_schema(return_type)
2466
2467 if serializer.info.mode == 'wrap':
2468 schema['serialization'] = core_schema.wrap_serializer_function_ser_schema(
2469 serializer.func,
2470 is_field_serializer=is_field_serializer,
2471 info_arg=info_arg,
2472 return_schema=return_schema,
2473 when_used=serializer.info.when_used,
2474 )
2475 else:
2476 assert serializer.info.mode == 'plain'
2477 schema['serialization'] = core_schema.plain_serializer_function_ser_schema(
2478 serializer.func,
2479 is_field_serializer=is_field_serializer,
2480 info_arg=info_arg,
2481 return_schema=return_schema,
2482 when_used=serializer.info.when_used,
2483 )
2484 return schema
2485
2486 def _apply_model_serializers(
2487 self, schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]]
2488 ) -> core_schema.CoreSchema:
2489 """Apply model serializers to a schema."""
2490 ref: str | None = schema.pop('ref', None) # type: ignore
2491 if serializers:
2492 serializer = list(serializers)[-1]
2493 info_arg = inspect_model_serializer(serializer.func, serializer.info.mode)
2494
2495 if serializer.info.return_type is not PydanticUndefined:
2496 return_type = serializer.info.return_type
2497 else:
2498 try:
2499 # Do not pass in globals as the function could be defined in a different module.
2500 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2501 # in locals that may contain a parent/rebuild namespace:
2502 return_type = _decorators.get_callable_return_type(
2503 serializer.func, localns=self._types_namespace.locals
2504 )
2505 except NameError as e:
2506 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2507
2508 if return_type is PydanticUndefined:
2509 return_schema = None
2510 else:
2511 return_schema = self.generate_schema(return_type)
2512
2513 if serializer.info.mode == 'wrap':
2514 ser_schema: core_schema.SerSchema = core_schema.wrap_serializer_function_ser_schema(
2515 serializer.func,
2516 info_arg=info_arg,
2517 return_schema=return_schema,
2518 when_used=serializer.info.when_used,
2519 )
2520 else:
2521 # plain
2522 ser_schema = core_schema.plain_serializer_function_ser_schema(
2523 serializer.func,
2524 info_arg=info_arg,
2525 return_schema=return_schema,
2526 when_used=serializer.info.when_used,
2527 )
2528 schema['serialization'] = ser_schema
2529 if ref:
2530 schema['ref'] = ref # type: ignore
2531 return schema
2532
2533
2534_VALIDATOR_F_MATCH: Mapping[
2535 tuple[FieldValidatorModes, Literal['no-info', 'with-info']],
2536 Callable[[Callable[..., Any], core_schema.CoreSchema], core_schema.CoreSchema],
2537] = {
2538 ('before', 'no-info'): lambda f, schema: core_schema.no_info_before_validator_function(f, schema),
2539 ('after', 'no-info'): lambda f, schema: core_schema.no_info_after_validator_function(f, schema),
2540 ('plain', 'no-info'): lambda f, _: core_schema.no_info_plain_validator_function(f),
2541 ('wrap', 'no-info'): lambda f, schema: core_schema.no_info_wrap_validator_function(f, schema),
2542 ('before', 'with-info'): lambda f, schema: core_schema.with_info_before_validator_function(f, schema),
2543 ('after', 'with-info'): lambda f, schema: core_schema.with_info_after_validator_function(f, schema),
2544 ('plain', 'with-info'): lambda f, _: core_schema.with_info_plain_validator_function(f),
2545 ('wrap', 'with-info'): lambda f, schema: core_schema.with_info_wrap_validator_function(f, schema),
2546}
2547
2548
2549# TODO V3: this function is only used for deprecated decorators. It should
2550# be removed once we drop support for those.
2551def apply_validators(
2552 schema: core_schema.CoreSchema,
2553 validators: Iterable[Decorator[RootValidatorDecoratorInfo]]
2554 | Iterable[Decorator[ValidatorDecoratorInfo]]
2555 | Iterable[Decorator[FieldValidatorDecoratorInfo]],
2556) -> core_schema.CoreSchema:
2557 """Apply validators to a schema.
2558
2559 Args:
2560 schema: The schema to apply validators on.
2561 validators: An iterable of validators.
2562 field_name: The name of the field if validators are being applied to a model field.
2563
2564 Returns:
2565 The updated schema.
2566 """
2567 for validator in validators:
2568 # Actually, type could be 'field' or 'model', but this is only used for deprecated
2569 # decorators, so let's not worry about it.
2570 info_arg = inspect_validator(validator.func, mode=validator.info.mode, type='field')
2571 val_type = 'with-info' if info_arg else 'no-info'
2572
2573 schema = _VALIDATOR_F_MATCH[(validator.info.mode, val_type)](validator.func, schema)
2574 return schema
2575
2576
2577def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool:
2578 """In v1, if any of the validators for a field had `always=True`, the default value would be validated.
2579
2580 This serves as an auxiliary function for re-implementing that logic, by looping over a provided
2581 collection of (v1-style) ValidatorDecoratorInfo's and checking if any of them have `always=True`.
2582
2583 We should be able to drop this function and the associated logic calling it once we drop support
2584 for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent
2585 to the v1-validator `always` kwarg to `field_validator`.)
2586 """
2587 for validator in validators:
2588 if validator.info.always:
2589 return True
2590 return False
2591
2592
2593def _convert_to_aliases(
2594 alias: str | AliasChoices | AliasPath | None,
2595) -> str | list[str | int] | list[list[str | int]] | None:
2596 if isinstance(alias, (AliasChoices, AliasPath)):
2597 return alias.convert_to_aliases()
2598 else:
2599 return alias
2600
2601
2602def apply_model_validators(
2603 schema: core_schema.CoreSchema,
2604 validators: Iterable[Decorator[ModelValidatorDecoratorInfo]],
2605 mode: Literal['inner', 'outer', 'all'],
2606) -> core_schema.CoreSchema:
2607 """Apply model validators to a schema.
2608
2609 If mode == 'inner', only "before" validators are applied
2610 If mode == 'outer', validators other than "before" are applied
2611 If mode == 'all', all validators are applied
2612
2613 Args:
2614 schema: The schema to apply validators on.
2615 validators: An iterable of validators.
2616 mode: The validator mode.
2617
2618 Returns:
2619 The updated schema.
2620 """
2621 ref: str | None = schema.pop('ref', None) # type: ignore
2622 for validator in validators:
2623 if mode == 'inner' and validator.info.mode != 'before':
2624 continue
2625 if mode == 'outer' and validator.info.mode == 'before':
2626 continue
2627 info_arg = inspect_validator(validator.func, mode=validator.info.mode, type='model')
2628 if validator.info.mode == 'wrap':
2629 if info_arg:
2630 schema = core_schema.with_info_wrap_validator_function(function=validator.func, schema=schema)
2631 else:
2632 schema = core_schema.no_info_wrap_validator_function(function=validator.func, schema=schema)
2633 elif validator.info.mode == 'before':
2634 if info_arg:
2635 schema = core_schema.with_info_before_validator_function(function=validator.func, schema=schema)
2636 else:
2637 schema = core_schema.no_info_before_validator_function(function=validator.func, schema=schema)
2638 else:
2639 assert validator.info.mode == 'after'
2640 if info_arg:
2641 schema = core_schema.with_info_after_validator_function(function=validator.func, schema=schema)
2642 else:
2643 schema = core_schema.no_info_after_validator_function(function=validator.func, schema=schema)
2644 if ref:
2645 schema['ref'] = ref # type: ignore
2646 return schema
2647
2648
2649def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema:
2650 """Wrap schema with default schema if default value or `default_factory` are available.
2651
2652 Args:
2653 field_info: The field info object.
2654 schema: The schema to apply default on.
2655
2656 Returns:
2657 Updated schema by default value or `default_factory`.
2658 """
2659 if field_info.default_factory:
2660 return core_schema.with_default_schema(
2661 schema,
2662 default_factory=field_info.default_factory,
2663 default_factory_takes_data=takes_validated_data_argument(field_info.default_factory),
2664 validate_default=field_info.validate_default,
2665 )
2666 elif field_info.default is not PydanticUndefined:
2667 return core_schema.with_default_schema(
2668 schema, default=field_info.default, validate_default=field_info.validate_default
2669 )
2670 else:
2671 return schema
2672
2673
2674def _extract_get_pydantic_json_schema(tp: Any) -> GetJsonSchemaFunction | None:
2675 """Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`."""
2676 js_modify_function = getattr(tp, '__get_pydantic_json_schema__', None)
2677
2678 if hasattr(tp, '__modify_schema__'):
2679 BaseModel = import_cached_base_model()
2680
2681 has_custom_v2_modify_js_func = (
2682 js_modify_function is not None
2683 and BaseModel.__get_pydantic_json_schema__.__func__ # type: ignore
2684 not in (js_modify_function, getattr(js_modify_function, '__func__', None))
2685 )
2686
2687 if not has_custom_v2_modify_js_func:
2688 cls_name = getattr(tp, '__name__', None)
2689 raise PydanticUserError(
2690 f'The `__modify_schema__` method is not supported in Pydantic v2. '
2691 f'Use `__get_pydantic_json_schema__` instead{f" in class `{cls_name}`" if cls_name else ""}.',
2692 code='custom-json-schema',
2693 )
2694
2695 if (origin := get_origin(tp)) is not None:
2696 # Generic aliases proxy attribute access to the origin, *except* dunder attributes,
2697 # such as `__get_pydantic_json_schema__`, hence the explicit check.
2698 return _extract_get_pydantic_json_schema(origin)
2699
2700 if js_modify_function is None:
2701 return None
2702
2703 return js_modify_function
2704
2705
2706def resolve_original_schema(schema: CoreSchema, definitions: _Definitions) -> CoreSchema | None:
2707 if schema['type'] == 'definition-ref':
2708 return definitions.get_schema_from_ref(schema['schema_ref'])
2709 elif schema['type'] == 'definitions':
2710 return schema['schema']
2711 else:
2712 return schema
2713
2714
2715def _inlining_behavior(
2716 def_ref: core_schema.DefinitionReferenceSchema,
2717) -> Literal['inline', 'keep', 'preserve_metadata']:
2718 """Determine the inlining behavior of the `'definition-ref'` schema.
2719
2720 - If no `'serialization'` schema and no metadata is attached, the schema can safely be inlined.
2721 - If it has metadata but only related to the deferred discriminator application, it can be inlined
2722 provided that such metadata is kept.
2723 - Otherwise, the schema should not be inlined. Doing so would remove the `'serialization'` schema or metadata.
2724 """
2725 if 'serialization' in def_ref:
2726 return 'keep'
2727 metadata = def_ref.get('metadata')
2728 if not metadata:
2729 return 'inline'
2730 if len(metadata) == 1 and 'pydantic_internal_union_discriminator' in metadata:
2731 return 'preserve_metadata'
2732 return 'keep'
2733
2734
2735class _Definitions:
2736 """Keeps track of references and definitions."""
2737
2738 _recursively_seen: set[str]
2739 """A set of recursively seen references.
2740
2741 When a referenceable type is encountered, the `get_schema_or_ref` context manager is
2742 entered to compute the reference. If the type references itself by some way (e.g. for
2743 a dataclass a Pydantic model, the class can be referenced as a field annotation),
2744 entering the context manager again will yield a `'definition-ref'` schema that should
2745 short-circuit the normal generation process, as the reference was already in this set.
2746 """
2747
2748 _definitions: dict[str, core_schema.CoreSchema]
2749 """A mapping of references to their corresponding schema.
2750
2751 When a schema for a referenceable type is generated, it is stored in this mapping. If the
2752 same type is encountered again, the reference is yielded by the `get_schema_or_ref` context
2753 manager.
2754 """
2755
2756 def __init__(self) -> None:
2757 self._recursively_seen = set()
2758 self._definitions = {}
2759
2760 @contextmanager
2761 def get_schema_or_ref(self, tp: Any, /) -> Generator[tuple[str, core_schema.DefinitionReferenceSchema | None]]:
2762 """Get a definition for `tp` if one exists.
2763
2764 If a definition exists, a tuple of `(ref_string, CoreSchema)` is returned.
2765 If no definition exists yet, a tuple of `(ref_string, None)` is returned.
2766
2767 Note that the returned `CoreSchema` will always be a `DefinitionReferenceSchema`,
2768 not the actual definition itself.
2769
2770 This should be called for any type that can be identified by reference.
2771 This includes any recursive types.
2772
2773 At present the following types can be named/recursive:
2774
2775 - Pydantic model
2776 - Pydantic and stdlib dataclasses
2777 - Typed dictionaries
2778 - Named tuples
2779 - `TypeAliasType` instances
2780 - Enums
2781 """
2782 ref = get_type_ref(tp)
2783 # return the reference if we're either (1) in a cycle or (2) it the reference was already encountered:
2784 if ref in self._recursively_seen or ref in self._definitions:
2785 yield (ref, core_schema.definition_reference_schema(ref))
2786 else:
2787 self._recursively_seen.add(ref)
2788 try:
2789 yield (ref, None)
2790 finally:
2791 self._recursively_seen.discard(ref)
2792
2793 def get_schema_from_ref(self, ref: str) -> CoreSchema | None:
2794 """Resolve the schema from the given reference."""
2795 return self._definitions.get(ref)
2796
2797 def create_definition_reference_schema(self, schema: CoreSchema) -> core_schema.DefinitionReferenceSchema:
2798 """Store the schema as a definition and return a `'definition-reference'` schema pointing to it.
2799
2800 The schema must have a reference attached to it.
2801 """
2802 ref = schema['ref'] # pyright: ignore
2803 self._definitions[ref] = schema
2804 return core_schema.definition_reference_schema(ref)
2805
2806 def unpack_definitions(self, schema: core_schema.DefinitionsSchema) -> CoreSchema:
2807 """Store the definitions of the `'definitions'` core schema and return the inner core schema."""
2808 for def_schema in schema['definitions']:
2809 self._definitions[def_schema['ref']] = def_schema # pyright: ignore
2810 return schema['schema']
2811
2812 def finalize_schema(self, schema: CoreSchema) -> CoreSchema:
2813 """Finalize the core schema.
2814
2815 This traverses the core schema and referenced definitions, replaces `'definition-ref'` schemas
2816 by the referenced definition if possible, and applies deferred discriminators.
2817 """
2818 definitions = self._definitions
2819 try:
2820 gather_result = gather_schemas_for_cleaning(
2821 schema,
2822 definitions=definitions,
2823 )
2824 except MissingDefinitionError as e:
2825 raise InvalidSchemaError from e
2826
2827 remaining_defs: dict[str, CoreSchema] = {}
2828
2829 # Note: this logic doesn't play well when core schemas with deferred discriminator metadata
2830 # and references are encountered. See the `test_deferred_discriminated_union_and_references()` test.
2831 for ref, inlinable_def_ref in gather_result['collected_references'].items():
2832 if inlinable_def_ref is not None and (inlining_behavior := _inlining_behavior(inlinable_def_ref)) != 'keep':
2833 if inlining_behavior == 'inline':
2834 # `ref` was encountered, and only once:
2835 # - `inlinable_def_ref` is a `'definition-ref'` schema and is guaranteed to be
2836 # the only one. Transform it into the definition it points to.
2837 # - Do not store the definition in the `remaining_defs`.
2838 inlinable_def_ref.clear() # pyright: ignore[reportAttributeAccessIssue]
2839 inlinable_def_ref.update(self._resolve_definition(ref, definitions)) # pyright: ignore
2840 elif inlining_behavior == 'preserve_metadata':
2841 # `ref` was encountered, and only once, but contains discriminator metadata.
2842 # We will do the same thing as if `inlining_behavior` was `'inline'`, but make
2843 # sure to keep the metadata for the deferred discriminator application logic below.
2844 meta = inlinable_def_ref.pop('metadata')
2845 inlinable_def_ref.clear() # pyright: ignore[reportAttributeAccessIssue]
2846 inlinable_def_ref.update(self._resolve_definition(ref, definitions)) # pyright: ignore
2847 inlinable_def_ref['metadata'] = meta
2848 else:
2849 # `ref` was encountered, at least two times (or only once, but with metadata or a serialization schema):
2850 # - Do not inline the `'definition-ref'` schemas (they are not provided in the gather result anyway).
2851 # - Store the definition in the `remaining_defs`
2852 remaining_defs[ref] = self._resolve_definition(ref, definitions)
2853
2854 for cs in gather_result['deferred_discriminator_schemas']:
2855 discriminator: str | None = cs['metadata'].pop('pydantic_internal_union_discriminator', None) # pyright: ignore[reportTypedDictNotRequiredAccess]
2856 if discriminator is None:
2857 # This can happen in rare scenarios, when a deferred schema is present multiple times in the
2858 # gather result (e.g. when using the `Sequence` type -- see `test_sequence_discriminated_union()`).
2859 # In this case, a previous loop iteration applied the discriminator and so we can just skip it here.
2860 continue
2861 applied = _discriminated_union.apply_discriminator(cs.copy(), discriminator, remaining_defs)
2862 # Mutate the schema directly to have the discriminator applied
2863 cs.clear() # pyright: ignore[reportAttributeAccessIssue]
2864 cs.update(applied) # pyright: ignore
2865
2866 if remaining_defs:
2867 schema = core_schema.definitions_schema(schema=schema, definitions=[*remaining_defs.values()])
2868 return schema
2869
2870 def _resolve_definition(self, ref: str, definitions: dict[str, CoreSchema]) -> CoreSchema:
2871 definition = definitions[ref]
2872 if definition['type'] != 'definition-ref':
2873 return definition
2874
2875 # Some `'definition-ref'` schemas might act as "intermediate" references (e.g. when using
2876 # a PEP 695 type alias (which is referenceable) that references another PEP 695 type alias):
2877 visited: set[str] = set()
2878 while definition['type'] == 'definition-ref' and _inlining_behavior(definition) == 'inline':
2879 schema_ref = definition['schema_ref']
2880 if schema_ref in visited:
2881 raise PydanticUserError(
2882 f'{ref} contains a circular reference to itself.', code='circular-reference-schema'
2883 )
2884 visited.add(schema_ref)
2885 definition = definitions[schema_ref]
2886 return {**definition, 'ref': ref} # pyright: ignore[reportReturnType]
2887
2888
2889class _FieldNameStack:
2890 __slots__ = ('_stack',)
2891
2892 def __init__(self) -> None:
2893 self._stack: list[str] = []
2894
2895 @contextmanager
2896 def push(self, field_name: str) -> Iterator[None]:
2897 self._stack.append(field_name)
2898 yield
2899 self._stack.pop()
2900
2901 def get(self) -> str | None:
2902 if self._stack:
2903 return self._stack[-1]
2904 else:
2905 return None
2906
2907
2908class _ModelTypeStack:
2909 __slots__ = ('_stack',)
2910
2911 def __init__(self) -> None:
2912 self._stack: list[type] = []
2913
2914 @contextmanager
2915 def push(self, type_obj: type) -> Iterator[None]:
2916 self._stack.append(type_obj)
2917 yield
2918 self._stack.pop()
2919
2920 def get(self) -> type | None:
2921 if self._stack:
2922 return self._stack[-1]
2923 else:
2924 return None