1"""Convert python types to pydantic-core schema."""
2
3from __future__ import annotations as _annotations
4
5import collections.abc
6import dataclasses
7import datetime
8import inspect
9import os
10import pathlib
11import re
12import sys
13import typing
14import warnings
15from collections.abc import Generator, Iterable, Iterator, Mapping
16from contextlib import contextmanager
17from copy import copy
18from decimal import Decimal
19from enum import Enum
20from fractions import Fraction
21from functools import partial
22from inspect import Parameter, _ParameterKind, signature
23from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
24from itertools import chain
25from operator import attrgetter
26from types import FunctionType, GenericAlias, LambdaType, MethodType
27from typing import (
28 TYPE_CHECKING,
29 Any,
30 Callable,
31 Final,
32 ForwardRef,
33 Literal,
34 TypeVar,
35 Union,
36 cast,
37 overload,
38)
39from uuid import UUID
40from warnings import warn
41from zoneinfo import ZoneInfo
42
43import typing_extensions
44from pydantic_core import (
45 CoreSchema,
46 MultiHostUrl,
47 PydanticCustomError,
48 PydanticSerializationUnexpectedValue,
49 PydanticUndefined,
50 Url,
51 core_schema,
52 to_jsonable_python,
53)
54from typing_extensions import TypeAlias, TypeAliasType, TypedDict, get_args, get_origin, is_typeddict
55from typing_inspection import typing_objects
56from typing_inspection.introspection import AnnotationSource, get_literal_values, is_union_origin
57
58from ..aliases import AliasChoices, AliasGenerator, AliasPath
59from ..annotated_handlers import GetCoreSchemaHandler, GetJsonSchemaHandler
60from ..config import ConfigDict, JsonDict, JsonEncoder, JsonSchemaExtraCallable
61from ..errors import PydanticSchemaGenerationError, PydanticUndefinedAnnotation, PydanticUserError
62from ..functional_validators import AfterValidator, BeforeValidator, FieldValidatorModes, PlainValidator, WrapValidator
63from ..json_schema import JsonSchemaValue
64from ..version import version_short
65from ..warnings import PydanticDeprecatedSince20
66from . import _decorators, _discriminated_union, _known_annotated_metadata, _repr, _typing_extra
67from ._config import ConfigWrapper, ConfigWrapperStack
68from ._core_metadata import CoreMetadata, update_core_metadata
69from ._core_utils import (
70 get_ref,
71 get_type_ref,
72 is_list_like_schema_with_items_schema,
73 validate_core_schema,
74)
75from ._decorators import (
76 Decorator,
77 DecoratorInfos,
78 FieldSerializerDecoratorInfo,
79 FieldValidatorDecoratorInfo,
80 ModelSerializerDecoratorInfo,
81 ModelValidatorDecoratorInfo,
82 RootValidatorDecoratorInfo,
83 ValidatorDecoratorInfo,
84 get_attribute_from_bases,
85 inspect_field_serializer,
86 inspect_model_serializer,
87 inspect_validator,
88)
89from ._docs_extraction import extract_docstrings_from_cls
90from ._fields import (
91 collect_dataclass_fields,
92 rebuild_dataclass_fields,
93 rebuild_model_fields,
94 takes_validated_data_argument,
95)
96from ._forward_ref import PydanticRecursiveRef
97from ._generics import get_standard_typevars_map, replace_types
98from ._import_utils import import_cached_base_model, import_cached_field_info
99from ._mock_val_ser import MockCoreSchema
100from ._namespace_utils import NamespacesTuple, NsResolver
101from ._schema_gather import MissingDefinitionError, gather_schemas_for_cleaning
102from ._schema_generation_shared import CallbackGetCoreSchemaHandler
103from ._utils import lenient_issubclass, smart_deepcopy
104
105if TYPE_CHECKING:
106 from ..fields import ComputedFieldInfo, FieldInfo
107 from ..main import BaseModel
108 from ..types import Discriminator
109 from ._dataclasses import StandardDataclass
110 from ._schema_generation_shared import GetJsonSchemaFunction
111
112_SUPPORTS_TYPEDDICT = sys.version_info >= (3, 12)
113
114FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo]
115FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo)
116AnyFieldDecorator = Union[
117 Decorator[ValidatorDecoratorInfo],
118 Decorator[FieldValidatorDecoratorInfo],
119 Decorator[FieldSerializerDecoratorInfo],
120]
121
122ModifyCoreSchemaWrapHandler: TypeAlias = GetCoreSchemaHandler
123GetCoreSchemaFunction: TypeAlias = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema]
124ParametersCallback: TypeAlias = "Callable[[int, str, Any], Literal['skip'] | None]"
125
126TUPLE_TYPES: list[type] = [typing.Tuple, tuple] # noqa: UP006
127LIST_TYPES: list[type] = [typing.List, list, collections.abc.MutableSequence] # noqa: UP006
128SET_TYPES: list[type] = [typing.Set, set, collections.abc.MutableSet] # noqa: UP006
129FROZEN_SET_TYPES: list[type] = [typing.FrozenSet, frozenset, collections.abc.Set] # noqa: UP006
130DICT_TYPES: list[type] = [typing.Dict, dict] # noqa: UP006
131IP_TYPES: list[type] = [IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network]
132SEQUENCE_TYPES: list[type] = [typing.Sequence, collections.abc.Sequence]
133ITERABLE_TYPES: list[type] = [typing.Iterable, collections.abc.Iterable, typing.Generator, collections.abc.Generator]
134TYPE_TYPES: list[type] = [typing.Type, type] # noqa: UP006
135PATTERN_TYPES: list[type] = [typing.Pattern, re.Pattern]
136PATH_TYPES: list[type] = [
137 os.PathLike,
138 pathlib.Path,
139 pathlib.PurePath,
140 pathlib.PosixPath,
141 pathlib.PurePosixPath,
142 pathlib.PureWindowsPath,
143]
144MAPPING_TYPES = [
145 typing.Mapping,
146 typing.MutableMapping,
147 collections.abc.Mapping,
148 collections.abc.MutableMapping,
149 collections.OrderedDict,
150 typing_extensions.OrderedDict,
151 typing.DefaultDict, # noqa: UP006
152 collections.defaultdict,
153]
154COUNTER_TYPES = [collections.Counter, typing.Counter]
155DEQUE_TYPES: list[type] = [collections.deque, typing.Deque] # noqa: UP006
156
157# Note: This does not play very well with type checkers. For example,
158# `a: LambdaType = lambda x: x` will raise a type error by Pyright.
159ValidateCallSupportedTypes = Union[
160 LambdaType,
161 FunctionType,
162 MethodType,
163 partial,
164]
165
166VALIDATE_CALL_SUPPORTED_TYPES = get_args(ValidateCallSupportedTypes)
167
168_mode_to_validator: dict[
169 FieldValidatorModes, type[BeforeValidator | AfterValidator | PlainValidator | WrapValidator]
170] = {'before': BeforeValidator, 'after': AfterValidator, 'plain': PlainValidator, 'wrap': WrapValidator}
171
172
173def check_validator_fields_against_field_name(
174 info: FieldDecoratorInfo,
175 field: str,
176) -> bool:
177 """Check if field name is in validator fields.
178
179 Args:
180 info: The field info.
181 field: The field name to check.
182
183 Returns:
184 `True` if field name is in validator fields, `False` otherwise.
185 """
186 fields = info.fields
187 return '*' in fields or field in fields
188
189
190def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None:
191 """Check if the defined fields in decorators exist in `fields` param.
192
193 It ignores the check for a decorator if the decorator has `*` as field or `check_fields=False`.
194
195 Args:
196 decorators: An iterable of decorators.
197 fields: An iterable of fields name.
198
199 Raises:
200 PydanticUserError: If one of the field names does not exist in `fields` param.
201 """
202 fields = set(fields)
203 for dec in decorators:
204 if '*' in dec.info.fields:
205 continue
206 if dec.info.check_fields is False:
207 continue
208 for field in dec.info.fields:
209 if field not in fields:
210 raise PydanticUserError(
211 f'Decorators defined with incorrect fields: {dec.cls_ref}.{dec.cls_var_name}'
212 " (use check_fields=False if you're inheriting from the model and intended this)",
213 code='decorator-missing-field',
214 )
215
216
217def filter_field_decorator_info_by_field(
218 validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str
219) -> list[Decorator[FieldDecoratorInfoType]]:
220 return [dec for dec in validator_functions if check_validator_fields_against_field_name(dec.info, field)]
221
222
223def apply_each_item_validators(
224 schema: core_schema.CoreSchema,
225 each_item_validators: list[Decorator[ValidatorDecoratorInfo]],
226 field_name: str | None,
227) -> core_schema.CoreSchema:
228 # This V1 compatibility shim should eventually be removed
229
230 # fail early if each_item_validators is empty
231 if not each_item_validators:
232 return schema
233
234 # push down any `each_item=True` validators
235 # note that this won't work for any Annotated types that get wrapped by a function validator
236 # but that's okay because that didn't exist in V1
237 if schema['type'] == 'nullable':
238 schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators, field_name)
239 return schema
240 elif schema['type'] == 'tuple':
241 if (variadic_item_index := schema.get('variadic_item_index')) is not None:
242 schema['items_schema'][variadic_item_index] = apply_validators(
243 schema['items_schema'][variadic_item_index],
244 each_item_validators,
245 field_name,
246 )
247 elif is_list_like_schema_with_items_schema(schema):
248 inner_schema = schema.get('items_schema', core_schema.any_schema())
249 schema['items_schema'] = apply_validators(inner_schema, each_item_validators, field_name)
250 elif schema['type'] == 'dict':
251 inner_schema = schema.get('values_schema', core_schema.any_schema())
252 schema['values_schema'] = apply_validators(inner_schema, each_item_validators, field_name)
253 else:
254 raise TypeError(
255 f'`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema["type"]}'
256 )
257 return schema
258
259
260def _extract_json_schema_info_from_field_info(
261 info: FieldInfo | ComputedFieldInfo,
262) -> tuple[JsonDict | None, JsonDict | JsonSchemaExtraCallable | None]:
263 json_schema_updates = {
264 'title': info.title,
265 'description': info.description,
266 'deprecated': bool(info.deprecated) or info.deprecated == '' or None,
267 'examples': to_jsonable_python(info.examples),
268 }
269 json_schema_updates = {k: v for k, v in json_schema_updates.items() if v is not None}
270 return (json_schema_updates or None, info.json_schema_extra)
271
272
273JsonEncoders = dict[type[Any], JsonEncoder]
274
275
276def _add_custom_serialization_from_json_encoders(
277 json_encoders: JsonEncoders | None, tp: Any, schema: CoreSchema
278) -> CoreSchema:
279 """Iterate over the json_encoders and add the first matching encoder to the schema.
280
281 Args:
282 json_encoders: A dictionary of types and their encoder functions.
283 tp: The type to check for a matching encoder.
284 schema: The schema to add the encoder to.
285 """
286 if not json_encoders:
287 return schema
288 if 'serialization' in schema:
289 return schema
290 # Check the class type and its superclasses for a matching encoder
291 # Decimal.__class__.__mro__ (and probably other cases) doesn't include Decimal itself
292 # if the type is a GenericAlias (e.g. from list[int]) we need to use __class__ instead of .__mro__
293 for base in (tp, *getattr(tp, '__mro__', tp.__class__.__mro__)[:-1]):
294 encoder = json_encoders.get(base)
295 if encoder is None:
296 continue
297
298 warnings.warn(
299 f'`json_encoders` is deprecated. See https://docs.pydantic.dev/{version_short()}/concepts/serialization/#custom-serializers for alternatives',
300 PydanticDeprecatedSince20,
301 )
302
303 # TODO: in theory we should check that the schema accepts a serialization key
304 schema['serialization'] = core_schema.plain_serializer_function_ser_schema(encoder, when_used='json')
305 return schema
306
307 return schema
308
309
310def _get_first_non_null(a: Any, b: Any) -> Any:
311 """Return the first argument if it is not None, otherwise return the second argument.
312
313 Use case: serialization_alias (argument a) and alias (argument b) are both defined, and serialization_alias is ''.
314 This function will return serialization_alias, which is the first argument, even though it is an empty string.
315 """
316 return a if a is not None else b
317
318
319class InvalidSchemaError(Exception):
320 """The core schema is invalid."""
321
322
323class GenerateSchema:
324 """Generate core schema for a Pydantic model, dataclass and types like `str`, `datetime`, ... ."""
325
326 __slots__ = (
327 '_config_wrapper_stack',
328 '_ns_resolver',
329 '_typevars_map',
330 'field_name_stack',
331 'model_type_stack',
332 'defs',
333 )
334
335 def __init__(
336 self,
337 config_wrapper: ConfigWrapper,
338 ns_resolver: NsResolver | None = None,
339 typevars_map: Mapping[TypeVar, Any] | None = None,
340 ) -> None:
341 # we need a stack for recursing into nested models
342 self._config_wrapper_stack = ConfigWrapperStack(config_wrapper)
343 self._ns_resolver = ns_resolver or NsResolver()
344 self._typevars_map = typevars_map
345 self.field_name_stack = _FieldNameStack()
346 self.model_type_stack = _ModelTypeStack()
347 self.defs = _Definitions()
348
349 def __init_subclass__(cls) -> None:
350 super().__init_subclass__()
351 warnings.warn(
352 'Subclassing `GenerateSchema` is not supported. The API is highly subject to change in minor versions.',
353 UserWarning,
354 stacklevel=2,
355 )
356
357 @property
358 def _config_wrapper(self) -> ConfigWrapper:
359 return self._config_wrapper_stack.tail
360
361 @property
362 def _types_namespace(self) -> NamespacesTuple:
363 return self._ns_resolver.types_namespace
364
365 @property
366 def _arbitrary_types(self) -> bool:
367 return self._config_wrapper.arbitrary_types_allowed
368
369 # the following methods can be overridden but should be considered
370 # unstable / private APIs
371 def _list_schema(self, items_type: Any) -> CoreSchema:
372 return core_schema.list_schema(self.generate_schema(items_type))
373
374 def _dict_schema(self, keys_type: Any, values_type: Any) -> CoreSchema:
375 return core_schema.dict_schema(self.generate_schema(keys_type), self.generate_schema(values_type))
376
377 def _set_schema(self, items_type: Any) -> CoreSchema:
378 return core_schema.set_schema(self.generate_schema(items_type))
379
380 def _frozenset_schema(self, items_type: Any) -> CoreSchema:
381 return core_schema.frozenset_schema(self.generate_schema(items_type))
382
383 def _enum_schema(self, enum_type: type[Enum]) -> CoreSchema:
384 cases: list[Any] = list(enum_type.__members__.values())
385
386 enum_ref = get_type_ref(enum_type)
387 description = None if not enum_type.__doc__ else inspect.cleandoc(enum_type.__doc__)
388 if (
389 description == 'An enumeration.'
390 ): # This is the default value provided by enum.EnumMeta.__new__; don't use it
391 description = None
392 js_updates = {'title': enum_type.__name__, 'description': description}
393 js_updates = {k: v for k, v in js_updates.items() if v is not None}
394
395 sub_type: Literal['str', 'int', 'float'] | None = None
396 if issubclass(enum_type, int):
397 sub_type = 'int'
398 value_ser_type: core_schema.SerSchema = core_schema.simple_ser_schema('int')
399 elif issubclass(enum_type, str):
400 # this handles `StrEnum` (3.11 only), and also `Foobar(str, Enum)`
401 sub_type = 'str'
402 value_ser_type = core_schema.simple_ser_schema('str')
403 elif issubclass(enum_type, float):
404 sub_type = 'float'
405 value_ser_type = core_schema.simple_ser_schema('float')
406 else:
407 # TODO this is an ugly hack, how do we trigger an Any schema for serialization?
408 value_ser_type = core_schema.plain_serializer_function_ser_schema(lambda x: x)
409
410 if cases:
411
412 def get_json_schema(schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
413 json_schema = handler(schema)
414 original_schema = handler.resolve_ref_schema(json_schema)
415 original_schema.update(js_updates)
416 return json_schema
417
418 # we don't want to add the missing to the schema if it's the default one
419 default_missing = getattr(enum_type._missing_, '__func__', None) is Enum._missing_.__func__ # pyright: ignore[reportFunctionMemberAccess]
420 enum_schema = core_schema.enum_schema(
421 enum_type,
422 cases,
423 sub_type=sub_type,
424 missing=None if default_missing else enum_type._missing_,
425 ref=enum_ref,
426 metadata={'pydantic_js_functions': [get_json_schema]},
427 )
428
429 if self._config_wrapper.use_enum_values:
430 enum_schema = core_schema.no_info_after_validator_function(
431 attrgetter('value'), enum_schema, serialization=value_ser_type
432 )
433
434 return enum_schema
435
436 else:
437
438 def get_json_schema_no_cases(_, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
439 json_schema = handler(core_schema.enum_schema(enum_type, cases, sub_type=sub_type, ref=enum_ref))
440 original_schema = handler.resolve_ref_schema(json_schema)
441 original_schema.update(js_updates)
442 return json_schema
443
444 # Use an isinstance check for enums with no cases.
445 # The most important use case for this is creating TypeVar bounds for generics that should
446 # be restricted to enums. This is more consistent than it might seem at first, since you can only
447 # subclass enum.Enum (or subclasses of enum.Enum) if all parent classes have no cases.
448 # We use the get_json_schema function when an Enum subclass has been declared with no cases
449 # so that we can still generate a valid json schema.
450 return core_schema.is_instance_schema(
451 enum_type,
452 metadata={'pydantic_js_functions': [get_json_schema_no_cases]},
453 )
454
455 def _ip_schema(self, tp: Any) -> CoreSchema:
456 from ._validators import IP_VALIDATOR_LOOKUP, IpType
457
458 ip_type_json_schema_format: dict[type[IpType], str] = {
459 IPv4Address: 'ipv4',
460 IPv4Network: 'ipv4network',
461 IPv4Interface: 'ipv4interface',
462 IPv6Address: 'ipv6',
463 IPv6Network: 'ipv6network',
464 IPv6Interface: 'ipv6interface',
465 }
466
467 def ser_ip(ip: Any, info: core_schema.SerializationInfo) -> str | IpType:
468 if not isinstance(ip, (tp, str)):
469 raise PydanticSerializationUnexpectedValue(
470 f"Expected `{tp}` but got `{type(ip)}` with value `'{ip}'` - serialized value may not be as expected."
471 )
472 if info.mode == 'python':
473 return ip
474 return str(ip)
475
476 return core_schema.lax_or_strict_schema(
477 lax_schema=core_schema.no_info_plain_validator_function(IP_VALIDATOR_LOOKUP[tp]),
478 strict_schema=core_schema.json_or_python_schema(
479 json_schema=core_schema.no_info_after_validator_function(tp, core_schema.str_schema()),
480 python_schema=core_schema.is_instance_schema(tp),
481 ),
482 serialization=core_schema.plain_serializer_function_ser_schema(ser_ip, info_arg=True, when_used='always'),
483 metadata={
484 'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': ip_type_json_schema_format[tp]}]
485 },
486 )
487
488 def _path_schema(self, tp: Any, path_type: Any) -> CoreSchema:
489 if tp is os.PathLike and (path_type not in {str, bytes} and not typing_objects.is_any(path_type)):
490 raise PydanticUserError(
491 '`os.PathLike` can only be used with `str`, `bytes` or `Any`', code='schema-for-unknown-type'
492 )
493
494 path_constructor = pathlib.PurePath if tp is os.PathLike else tp
495 strict_inner_schema = (
496 core_schema.bytes_schema(strict=True) if (path_type is bytes) else core_schema.str_schema(strict=True)
497 )
498 lax_inner_schema = core_schema.bytes_schema() if (path_type is bytes) else core_schema.str_schema()
499
500 def path_validator(input_value: str | bytes) -> os.PathLike[Any]: # type: ignore
501 try:
502 if path_type is bytes:
503 if isinstance(input_value, bytes):
504 try:
505 input_value = input_value.decode()
506 except UnicodeDecodeError as e:
507 raise PydanticCustomError('bytes_type', 'Input must be valid bytes') from e
508 else:
509 raise PydanticCustomError('bytes_type', 'Input must be bytes')
510 elif not isinstance(input_value, str):
511 raise PydanticCustomError('path_type', 'Input is not a valid path')
512
513 return path_constructor(input_value) # type: ignore
514 except TypeError as e:
515 raise PydanticCustomError('path_type', 'Input is not a valid path') from e
516
517 def ser_path(path: Any, info: core_schema.SerializationInfo) -> str | os.PathLike[Any]:
518 if not isinstance(path, (tp, str)):
519 raise PydanticSerializationUnexpectedValue(
520 f"Expected `{tp}` but got `{type(path)}` with value `'{path}'` - serialized value may not be as expected."
521 )
522 if info.mode == 'python':
523 return path
524 return str(path)
525
526 instance_schema = core_schema.json_or_python_schema(
527 json_schema=core_schema.no_info_after_validator_function(path_validator, lax_inner_schema),
528 python_schema=core_schema.is_instance_schema(tp),
529 )
530
531 schema = core_schema.lax_or_strict_schema(
532 lax_schema=core_schema.union_schema(
533 [
534 instance_schema,
535 core_schema.no_info_after_validator_function(path_validator, strict_inner_schema),
536 ],
537 custom_error_type='path_type',
538 custom_error_message=f'Input is not a valid path for {tp}',
539 ),
540 strict_schema=instance_schema,
541 serialization=core_schema.plain_serializer_function_ser_schema(ser_path, info_arg=True, when_used='always'),
542 metadata={'pydantic_js_functions': [lambda source, handler: {**handler(source), 'format': 'path'}]},
543 )
544 return schema
545
546 def _deque_schema(self, items_type: Any) -> CoreSchema:
547 from ._serializers import serialize_sequence_via_list
548 from ._validators import deque_validator
549
550 item_type_schema = self.generate_schema(items_type)
551
552 # we have to use a lax list schema here, because we need to validate the deque's
553 # items via a list schema, but it's ok if the deque itself is not a list
554 list_schema = core_schema.list_schema(item_type_schema, strict=False)
555
556 check_instance = core_schema.json_or_python_schema(
557 json_schema=list_schema,
558 python_schema=core_schema.is_instance_schema(collections.deque, cls_repr='Deque'),
559 )
560
561 lax_schema = core_schema.no_info_wrap_validator_function(deque_validator, list_schema)
562
563 return core_schema.lax_or_strict_schema(
564 lax_schema=lax_schema,
565 strict_schema=core_schema.chain_schema([check_instance, lax_schema]),
566 serialization=core_schema.wrap_serializer_function_ser_schema(
567 serialize_sequence_via_list, schema=item_type_schema, info_arg=True
568 ),
569 )
570
571 def _mapping_schema(self, tp: Any, keys_type: Any, values_type: Any) -> CoreSchema:
572 from ._validators import MAPPING_ORIGIN_MAP, defaultdict_validator, get_defaultdict_default_default_factory
573
574 mapped_origin = MAPPING_ORIGIN_MAP[tp]
575 keys_schema = self.generate_schema(keys_type)
576 values_schema = self.generate_schema(values_type)
577 dict_schema = core_schema.dict_schema(keys_schema, values_schema, strict=False)
578
579 if mapped_origin is dict:
580 schema = dict_schema
581 else:
582 check_instance = core_schema.json_or_python_schema(
583 json_schema=dict_schema,
584 python_schema=core_schema.is_instance_schema(mapped_origin),
585 )
586
587 if tp is collections.defaultdict:
588 default_default_factory = get_defaultdict_default_default_factory(values_type)
589 coerce_instance_wrap = partial(
590 core_schema.no_info_wrap_validator_function,
591 partial(defaultdict_validator, default_default_factory=default_default_factory),
592 )
593 else:
594 coerce_instance_wrap = partial(core_schema.no_info_after_validator_function, mapped_origin)
595
596 lax_schema = coerce_instance_wrap(dict_schema)
597 strict_schema = core_schema.chain_schema([check_instance, lax_schema])
598
599 schema = core_schema.lax_or_strict_schema(
600 lax_schema=lax_schema,
601 strict_schema=strict_schema,
602 serialization=core_schema.wrap_serializer_function_ser_schema(
603 lambda v, h: h(v), schema=dict_schema, info_arg=False
604 ),
605 )
606
607 return schema
608
609 def _fraction_schema(self) -> CoreSchema:
610 """Support for [`fractions.Fraction`][fractions.Fraction]."""
611 from ._validators import fraction_validator
612
613 # TODO: note, this is a fairly common pattern, re lax / strict for attempted type coercion,
614 # can we use a helper function to reduce boilerplate?
615 return core_schema.lax_or_strict_schema(
616 lax_schema=core_schema.no_info_plain_validator_function(fraction_validator),
617 strict_schema=core_schema.json_or_python_schema(
618 json_schema=core_schema.no_info_plain_validator_function(fraction_validator),
619 python_schema=core_schema.is_instance_schema(Fraction),
620 ),
621 # use str serialization to guarantee round trip behavior
622 serialization=core_schema.to_string_ser_schema(when_used='always'),
623 metadata={'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'fraction'}]},
624 )
625
626 def _arbitrary_type_schema(self, tp: Any) -> CoreSchema:
627 if not isinstance(tp, type):
628 warn(
629 f'{tp!r} is not a Python type (it may be an instance of an object),'
630 ' Pydantic will allow any object with no validation since we cannot even'
631 ' enforce that the input is an instance of the given type.'
632 ' To get rid of this error wrap the type with `pydantic.SkipValidation`.',
633 UserWarning,
634 )
635 return core_schema.any_schema()
636 return core_schema.is_instance_schema(tp)
637
638 def _unknown_type_schema(self, obj: Any) -> CoreSchema:
639 raise PydanticSchemaGenerationError(
640 f'Unable to generate pydantic-core schema for {obj!r}. '
641 'Set `arbitrary_types_allowed=True` in the model_config to ignore this error'
642 ' or implement `__get_pydantic_core_schema__` on your type to fully support it.'
643 '\n\nIf you got this error by calling handler(<some type>) within'
644 ' `__get_pydantic_core_schema__` then you likely need to call'
645 ' `handler.generate_schema(<some type>)` since we do not call'
646 ' `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.'
647 )
648
649 def _apply_discriminator_to_union(
650 self, schema: CoreSchema, discriminator: str | Discriminator | None
651 ) -> CoreSchema:
652 if discriminator is None:
653 return schema
654 try:
655 return _discriminated_union.apply_discriminator(
656 schema,
657 discriminator,
658 self.defs._definitions,
659 )
660 except _discriminated_union.MissingDefinitionForUnionRef:
661 # defer until defs are resolved
662 _discriminated_union.set_discriminator_in_metadata(
663 schema,
664 discriminator,
665 )
666 return schema
667
668 def clean_schema(self, schema: CoreSchema) -> CoreSchema:
669 schema = self.defs.finalize_schema(schema)
670 schema = validate_core_schema(schema)
671 return schema
672
673 def _add_js_function(self, metadata_schema: CoreSchema, js_function: Callable[..., Any]) -> None:
674 metadata = metadata_schema.get('metadata', {})
675 pydantic_js_functions = metadata.setdefault('pydantic_js_functions', [])
676 # because of how we generate core schemas for nested generic models
677 # we can end up adding `BaseModel.__get_pydantic_json_schema__` multiple times
678 # this check may fail to catch duplicates if the function is a `functools.partial`
679 # or something like that, but if it does it'll fail by inserting the duplicate
680 if js_function not in pydantic_js_functions:
681 pydantic_js_functions.append(js_function)
682 metadata_schema['metadata'] = metadata
683
684 def generate_schema(
685 self,
686 obj: Any,
687 ) -> core_schema.CoreSchema:
688 """Generate core schema.
689
690 Args:
691 obj: The object to generate core schema for.
692
693 Returns:
694 The generated core schema.
695
696 Raises:
697 PydanticUndefinedAnnotation:
698 If it is not possible to evaluate forward reference.
699 PydanticSchemaGenerationError:
700 If it is not possible to generate pydantic-core schema.
701 TypeError:
702 - If `alias_generator` returns a disallowed type (must be str, AliasPath or AliasChoices).
703 - If V1 style validator with `each_item=True` applied on a wrong field.
704 PydanticUserError:
705 - If `typing.TypedDict` is used instead of `typing_extensions.TypedDict` on Python < 3.12.
706 - If `__modify_schema__` method is used instead of `__get_pydantic_json_schema__`.
707 """
708 schema = self._generate_schema_from_get_schema_method(obj, obj)
709
710 if schema is None:
711 schema = self._generate_schema_inner(obj)
712
713 metadata_js_function = _extract_get_pydantic_json_schema(obj)
714 if metadata_js_function is not None:
715 metadata_schema = resolve_original_schema(schema, self.defs)
716 if metadata_schema:
717 self._add_js_function(metadata_schema, metadata_js_function)
718
719 schema = _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, obj, schema)
720
721 return schema
722
723 def _model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema:
724 """Generate schema for a Pydantic model."""
725 BaseModel_ = import_cached_base_model()
726
727 with self.defs.get_schema_or_ref(cls) as (model_ref, maybe_schema):
728 if maybe_schema is not None:
729 return maybe_schema
730
731 schema = cls.__dict__.get('__pydantic_core_schema__')
732 if schema is not None and not isinstance(schema, MockCoreSchema):
733 if schema['type'] == 'definitions':
734 schema = self.defs.unpack_definitions(schema)
735 ref = get_ref(schema)
736 if ref:
737 return self.defs.create_definition_reference_schema(schema)
738 else:
739 return schema
740
741 config_wrapper = ConfigWrapper(cls.model_config, check=False)
742
743 with self._config_wrapper_stack.push(config_wrapper), self._ns_resolver.push(cls):
744 core_config = self._config_wrapper.core_config(title=cls.__name__)
745
746 if cls.__pydantic_fields_complete__ or cls is BaseModel_:
747 fields = getattr(cls, '__pydantic_fields__', {})
748 else:
749 if not hasattr(cls, '__pydantic_fields__'):
750 # This happens when we have a loop in the schema generation:
751 # class Base[T](BaseModel):
752 # t: T
753 #
754 # class Other(BaseModel):
755 # b: 'Base[Other]'
756 # When we build fields for `Other`, we evaluate the forward annotation.
757 # At this point, `Other` doesn't have the model fields set. We create
758 # `Base[Other]`; model fields are successfully built, and we try to generate
759 # a schema for `t: Other`. As `Other.__pydantic_fields__` aren't set, we abort.
760 raise PydanticUndefinedAnnotation(
761 name=cls.__name__,
762 message=f'Class {cls.__name__!r} is not defined',
763 )
764 try:
765 fields = rebuild_model_fields(
766 cls,
767 ns_resolver=self._ns_resolver,
768 typevars_map=self._typevars_map or {},
769 )
770 except NameError as e:
771 raise PydanticUndefinedAnnotation.from_name_error(e) from e
772
773 decorators = cls.__pydantic_decorators__
774 computed_fields = decorators.computed_fields
775 check_decorator_fields_exist(
776 chain(
777 decorators.field_validators.values(),
778 decorators.field_serializers.values(),
779 decorators.validators.values(),
780 ),
781 {*fields.keys(), *computed_fields.keys()},
782 )
783
784 model_validators = decorators.model_validators.values()
785
786 extras_schema = None
787 extras_keys_schema = None
788 if core_config.get('extra_fields_behavior') == 'allow':
789 assert cls.__mro__[0] is cls
790 assert cls.__mro__[-1] is object
791 for candidate_cls in cls.__mro__[:-1]:
792 extras_annotation = getattr(candidate_cls, '__annotations__', {}).get(
793 '__pydantic_extra__', None
794 )
795 if extras_annotation is not None:
796 if isinstance(extras_annotation, str):
797 extras_annotation = _typing_extra.eval_type_backport(
798 _typing_extra._make_forward_ref(
799 extras_annotation, is_argument=False, is_class=True
800 ),
801 *self._types_namespace,
802 )
803 tp = get_origin(extras_annotation)
804 if tp not in DICT_TYPES:
805 raise PydanticSchemaGenerationError(
806 'The type annotation for `__pydantic_extra__` must be `dict[str, ...]`'
807 )
808 extra_keys_type, extra_items_type = self._get_args_resolving_forward_refs(
809 extras_annotation,
810 required=True,
811 )
812 if extra_keys_type is not str:
813 extras_keys_schema = self.generate_schema(extra_keys_type)
814 if not typing_objects.is_any(extra_items_type):
815 extras_schema = self.generate_schema(extra_items_type)
816 if extras_keys_schema is not None or extras_schema is not None:
817 break
818
819 generic_origin: type[BaseModel] | None = getattr(cls, '__pydantic_generic_metadata__', {}).get('origin')
820
821 if cls.__pydantic_root_model__:
822 root_field = self._common_field_schema('root', fields['root'], decorators)
823 inner_schema = root_field['schema']
824 inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
825 model_schema = core_schema.model_schema(
826 cls,
827 inner_schema,
828 generic_origin=generic_origin,
829 custom_init=getattr(cls, '__pydantic_custom_init__', None),
830 root_model=True,
831 post_init=getattr(cls, '__pydantic_post_init__', None),
832 config=core_config,
833 ref=model_ref,
834 )
835 else:
836 fields_schema: core_schema.CoreSchema = core_schema.model_fields_schema(
837 {k: self._generate_md_field_schema(k, v, decorators) for k, v in fields.items()},
838 computed_fields=[
839 self._computed_field_schema(d, decorators.field_serializers)
840 for d in computed_fields.values()
841 ],
842 extras_schema=extras_schema,
843 extras_keys_schema=extras_keys_schema,
844 model_name=cls.__name__,
845 )
846 inner_schema = apply_validators(fields_schema, decorators.root_validators.values(), None)
847 inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
848
849 model_schema = core_schema.model_schema(
850 cls,
851 inner_schema,
852 generic_origin=generic_origin,
853 custom_init=getattr(cls, '__pydantic_custom_init__', None),
854 root_model=False,
855 post_init=getattr(cls, '__pydantic_post_init__', None),
856 config=core_config,
857 ref=model_ref,
858 )
859
860 schema = self._apply_model_serializers(model_schema, decorators.model_serializers.values())
861 schema = apply_model_validators(schema, model_validators, 'outer')
862 return self.defs.create_definition_reference_schema(schema)
863
864 def _resolve_self_type(self, obj: Any) -> Any:
865 obj = self.model_type_stack.get()
866 if obj is None:
867 raise PydanticUserError('`typing.Self` is invalid in this context', code='invalid-self-type')
868 return obj
869
870 def _generate_schema_from_get_schema_method(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
871 BaseModel_ = import_cached_base_model()
872
873 get_schema = getattr(obj, '__get_pydantic_core_schema__', None)
874 is_base_model_get_schema = (
875 getattr(get_schema, '__func__', None) is BaseModel_.__get_pydantic_core_schema__.__func__ # pyright: ignore[reportFunctionMemberAccess]
876 )
877
878 if (
879 get_schema is not None
880 # BaseModel.__get_pydantic_core_schema__ is defined for backwards compatibility,
881 # to allow existing code to call `super().__get_pydantic_core_schema__` in Pydantic
882 # model that overrides `__get_pydantic_core_schema__`. However, it raises a deprecation
883 # warning stating that the method will be removed, and during the core schema gen we actually
884 # don't call the method:
885 and not is_base_model_get_schema
886 ):
887 # Some referenceable types might have a `__get_pydantic_core_schema__` method
888 # defined on it by users (e.g. on a dataclass). This generally doesn't play well
889 # as these types are already recognized by the `GenerateSchema` class and isn't ideal
890 # as we might end up calling `get_schema_or_ref` (expensive) on types that are actually
891 # not referenceable:
892 with self.defs.get_schema_or_ref(obj) as (_, maybe_schema):
893 if maybe_schema is not None:
894 return maybe_schema
895
896 if obj is source:
897 ref_mode = 'unpack'
898 else:
899 ref_mode = 'to-def'
900 schema = get_schema(
901 source, CallbackGetCoreSchemaHandler(self._generate_schema_inner, self, ref_mode=ref_mode)
902 )
903 if schema['type'] == 'definitions':
904 schema = self.defs.unpack_definitions(schema)
905
906 ref = get_ref(schema)
907 if ref:
908 return self.defs.create_definition_reference_schema(schema)
909
910 # Note: if schema is of type `'definition-ref'`, we might want to copy it as a
911 # safety measure (because these are inlined in place -- i.e. mutated directly)
912 return schema
913
914 if get_schema is None and (validators := getattr(obj, '__get_validators__', None)) is not None:
915 from pydantic.v1 import BaseModel as BaseModelV1
916
917 if issubclass(obj, BaseModelV1):
918 warn(
919 f'Mixing V1 models and V2 models (or constructs, like `TypeAdapter`) is not supported. Please upgrade `{obj.__name__}` to V2.',
920 UserWarning,
921 )
922 else:
923 warn(
924 '`__get_validators__` is deprecated and will be removed, use `__get_pydantic_core_schema__` instead.',
925 PydanticDeprecatedSince20,
926 )
927 return core_schema.chain_schema([core_schema.with_info_plain_validator_function(v) for v in validators()])
928
929 def _resolve_forward_ref(self, obj: Any) -> Any:
930 # we assume that types_namespace has the target of forward references in its scope,
931 # but this could fail, for example, if calling Validator on an imported type which contains
932 # forward references to other types only defined in the module from which it was imported
933 # `Validator(SomeImportedTypeAliasWithAForwardReference)`
934 # or the equivalent for BaseModel
935 # class Model(BaseModel):
936 # x: SomeImportedTypeAliasWithAForwardReference
937 try:
938 obj = _typing_extra.eval_type_backport(obj, *self._types_namespace)
939 except NameError as e:
940 raise PydanticUndefinedAnnotation.from_name_error(e) from e
941
942 # if obj is still a ForwardRef, it means we can't evaluate it, raise PydanticUndefinedAnnotation
943 if isinstance(obj, ForwardRef):
944 raise PydanticUndefinedAnnotation(obj.__forward_arg__, f'Unable to evaluate forward reference {obj}')
945
946 if self._typevars_map:
947 obj = replace_types(obj, self._typevars_map)
948
949 return obj
950
951 @overload
952 def _get_args_resolving_forward_refs(self, obj: Any, required: Literal[True]) -> tuple[Any, ...]: ...
953
954 @overload
955 def _get_args_resolving_forward_refs(self, obj: Any) -> tuple[Any, ...] | None: ...
956
957 def _get_args_resolving_forward_refs(self, obj: Any, required: bool = False) -> tuple[Any, ...] | None:
958 args = get_args(obj)
959 if args:
960 if isinstance(obj, GenericAlias):
961 # PEP 585 generic aliases don't convert args to ForwardRefs, unlike `typing.List/Dict` etc.
962 args = (_typing_extra._make_forward_ref(a) if isinstance(a, str) else a for a in args)
963 args = tuple(self._resolve_forward_ref(a) if isinstance(a, ForwardRef) else a for a in args)
964 elif required: # pragma: no cover
965 raise TypeError(f'Expected {obj} to have generic parameters but it had none')
966 return args
967
968 def _get_first_arg_or_any(self, obj: Any) -> Any:
969 args = self._get_args_resolving_forward_refs(obj)
970 if not args:
971 return Any
972 return args[0]
973
974 def _get_first_two_args_or_any(self, obj: Any) -> tuple[Any, Any]:
975 args = self._get_args_resolving_forward_refs(obj)
976 if not args:
977 return (Any, Any)
978 if len(args) < 2:
979 origin = get_origin(obj)
980 raise TypeError(f'Expected two type arguments for {origin}, got 1')
981 return args[0], args[1]
982
983 def _generate_schema_inner(self, obj: Any) -> core_schema.CoreSchema:
984 if typing_objects.is_self(obj):
985 obj = self._resolve_self_type(obj)
986
987 if typing_objects.is_annotated(get_origin(obj)):
988 return self._annotated_schema(obj)
989
990 if isinstance(obj, dict):
991 # we assume this is already a valid schema
992 return obj # type: ignore[return-value]
993
994 if isinstance(obj, str):
995 obj = ForwardRef(obj)
996
997 if isinstance(obj, ForwardRef):
998 return self.generate_schema(self._resolve_forward_ref(obj))
999
1000 BaseModel = import_cached_base_model()
1001
1002 if lenient_issubclass(obj, BaseModel):
1003 with self.model_type_stack.push(obj):
1004 return self._model_schema(obj)
1005
1006 if isinstance(obj, PydanticRecursiveRef):
1007 return core_schema.definition_reference_schema(schema_ref=obj.type_ref)
1008
1009 return self.match_type(obj)
1010
1011 def match_type(self, obj: Any) -> core_schema.CoreSchema: # noqa: C901
1012 """Main mapping of types to schemas.
1013
1014 The general structure is a series of if statements starting with the simple cases
1015 (non-generic primitive types) and then handling generics and other more complex cases.
1016
1017 Each case either generates a schema directly, calls into a public user-overridable method
1018 (like `GenerateSchema.tuple_variable_schema`) or calls into a private method that handles some
1019 boilerplate before calling into the user-facing method (e.g. `GenerateSchema._tuple_schema`).
1020
1021 The idea is that we'll evolve this into adding more and more user facing methods over time
1022 as they get requested and we figure out what the right API for them is.
1023 """
1024 if obj is str:
1025 return core_schema.str_schema()
1026 elif obj is bytes:
1027 return core_schema.bytes_schema()
1028 elif obj is int:
1029 return core_schema.int_schema()
1030 elif obj is float:
1031 return core_schema.float_schema()
1032 elif obj is bool:
1033 return core_schema.bool_schema()
1034 elif obj is complex:
1035 return core_schema.complex_schema()
1036 elif typing_objects.is_any(obj) or obj is object:
1037 return core_schema.any_schema()
1038 elif obj is datetime.date:
1039 return core_schema.date_schema()
1040 elif obj is datetime.datetime:
1041 return core_schema.datetime_schema()
1042 elif obj is datetime.time:
1043 return core_schema.time_schema()
1044 elif obj is datetime.timedelta:
1045 return core_schema.timedelta_schema()
1046 elif obj is Decimal:
1047 return core_schema.decimal_schema()
1048 elif obj is UUID:
1049 return core_schema.uuid_schema()
1050 elif obj is Url:
1051 return core_schema.url_schema()
1052 elif obj is Fraction:
1053 return self._fraction_schema()
1054 elif obj is MultiHostUrl:
1055 return core_schema.multi_host_url_schema()
1056 elif obj is None or obj is _typing_extra.NoneType:
1057 return core_schema.none_schema()
1058 elif obj in IP_TYPES:
1059 return self._ip_schema(obj)
1060 elif obj in TUPLE_TYPES:
1061 return self._tuple_schema(obj)
1062 elif obj in LIST_TYPES:
1063 return self._list_schema(Any)
1064 elif obj in SET_TYPES:
1065 return self._set_schema(Any)
1066 elif obj in FROZEN_SET_TYPES:
1067 return self._frozenset_schema(Any)
1068 elif obj in SEQUENCE_TYPES:
1069 return self._sequence_schema(Any)
1070 elif obj in ITERABLE_TYPES:
1071 return self._iterable_schema(obj)
1072 elif obj in DICT_TYPES:
1073 return self._dict_schema(Any, Any)
1074 elif obj in PATH_TYPES:
1075 return self._path_schema(obj, Any)
1076 elif obj in DEQUE_TYPES:
1077 return self._deque_schema(Any)
1078 elif obj in MAPPING_TYPES:
1079 return self._mapping_schema(obj, Any, Any)
1080 elif obj in COUNTER_TYPES:
1081 return self._mapping_schema(obj, Any, int)
1082 elif typing_objects.is_typealiastype(obj):
1083 return self._type_alias_type_schema(obj)
1084 elif obj is type:
1085 return self._type_schema()
1086 elif _typing_extra.is_callable(obj):
1087 return core_schema.callable_schema()
1088 elif typing_objects.is_literal(get_origin(obj)):
1089 return self._literal_schema(obj)
1090 elif is_typeddict(obj):
1091 return self._typed_dict_schema(obj, None)
1092 elif _typing_extra.is_namedtuple(obj):
1093 return self._namedtuple_schema(obj, None)
1094 elif typing_objects.is_newtype(obj):
1095 # NewType, can't use isinstance because it fails <3.10
1096 return self.generate_schema(obj.__supertype__)
1097 elif obj in PATTERN_TYPES:
1098 return self._pattern_schema(obj)
1099 elif _typing_extra.is_hashable(obj):
1100 return self._hashable_schema()
1101 elif isinstance(obj, typing.TypeVar):
1102 return self._unsubstituted_typevar_schema(obj)
1103 elif _typing_extra.is_finalvar(obj):
1104 if obj is Final:
1105 return core_schema.any_schema()
1106 return self.generate_schema(
1107 self._get_first_arg_or_any(obj),
1108 )
1109 elif isinstance(obj, VALIDATE_CALL_SUPPORTED_TYPES):
1110 return self._call_schema(obj)
1111 elif inspect.isclass(obj) and issubclass(obj, Enum):
1112 return self._enum_schema(obj)
1113 elif obj is ZoneInfo:
1114 return self._zoneinfo_schema()
1115
1116 # dataclasses.is_dataclass coerces dc instances to types, but we only handle
1117 # the case of a dc type here
1118 if dataclasses.is_dataclass(obj):
1119 return self._dataclass_schema(obj, None) # pyright: ignore[reportArgumentType]
1120
1121 origin = get_origin(obj)
1122 if origin is not None:
1123 return self._match_generic_type(obj, origin)
1124
1125 if self._arbitrary_types:
1126 return self._arbitrary_type_schema(obj)
1127 return self._unknown_type_schema(obj)
1128
1129 def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema: # noqa: C901
1130 # Need to handle generic dataclasses before looking for the schema properties because attribute accesses
1131 # on _GenericAlias delegate to the origin type, so lose the information about the concrete parametrization
1132 # As a result, currently, there is no way to cache the schema for generic dataclasses. This may be possible
1133 # to resolve by modifying the value returned by `Generic.__class_getitem__`, but that is a dangerous game.
1134 if dataclasses.is_dataclass(origin):
1135 return self._dataclass_schema(obj, origin) # pyright: ignore[reportArgumentType]
1136 if _typing_extra.is_namedtuple(origin):
1137 return self._namedtuple_schema(obj, origin)
1138
1139 schema = self._generate_schema_from_get_schema_method(origin, obj)
1140 if schema is not None:
1141 return schema
1142
1143 if typing_objects.is_typealiastype(origin):
1144 return self._type_alias_type_schema(obj)
1145 elif is_union_origin(origin):
1146 return self._union_schema(obj)
1147 elif origin in TUPLE_TYPES:
1148 return self._tuple_schema(obj)
1149 elif origin in LIST_TYPES:
1150 return self._list_schema(self._get_first_arg_or_any(obj))
1151 elif origin in SET_TYPES:
1152 return self._set_schema(self._get_first_arg_or_any(obj))
1153 elif origin in FROZEN_SET_TYPES:
1154 return self._frozenset_schema(self._get_first_arg_or_any(obj))
1155 elif origin in DICT_TYPES:
1156 return self._dict_schema(*self._get_first_two_args_or_any(obj))
1157 elif origin in PATH_TYPES:
1158 return self._path_schema(origin, self._get_first_arg_or_any(obj))
1159 elif origin in DEQUE_TYPES:
1160 return self._deque_schema(self._get_first_arg_or_any(obj))
1161 elif origin in MAPPING_TYPES:
1162 return self._mapping_schema(origin, *self._get_first_two_args_or_any(obj))
1163 elif origin in COUNTER_TYPES:
1164 return self._mapping_schema(origin, self._get_first_arg_or_any(obj), int)
1165 elif is_typeddict(origin):
1166 return self._typed_dict_schema(obj, origin)
1167 elif origin in TYPE_TYPES:
1168 return self._subclass_schema(obj)
1169 elif origin in SEQUENCE_TYPES:
1170 return self._sequence_schema(self._get_first_arg_or_any(obj))
1171 elif origin in ITERABLE_TYPES:
1172 return self._iterable_schema(obj)
1173 elif origin in PATTERN_TYPES:
1174 return self._pattern_schema(obj)
1175
1176 if self._arbitrary_types:
1177 return self._arbitrary_type_schema(origin)
1178 return self._unknown_type_schema(obj)
1179
1180 def _generate_td_field_schema(
1181 self,
1182 name: str,
1183 field_info: FieldInfo,
1184 decorators: DecoratorInfos,
1185 *,
1186 required: bool = True,
1187 ) -> core_schema.TypedDictField:
1188 """Prepare a TypedDictField to represent a model or typeddict field."""
1189 common_field = self._common_field_schema(name, field_info, decorators)
1190 return core_schema.typed_dict_field(
1191 common_field['schema'],
1192 required=False if not field_info.is_required() else required,
1193 serialization_exclude=common_field['serialization_exclude'],
1194 validation_alias=common_field['validation_alias'],
1195 serialization_alias=common_field['serialization_alias'],
1196 metadata=common_field['metadata'],
1197 )
1198
1199 def _generate_md_field_schema(
1200 self,
1201 name: str,
1202 field_info: FieldInfo,
1203 decorators: DecoratorInfos,
1204 ) -> core_schema.ModelField:
1205 """Prepare a ModelField to represent a model field."""
1206 common_field = self._common_field_schema(name, field_info, decorators)
1207 return core_schema.model_field(
1208 common_field['schema'],
1209 serialization_exclude=common_field['serialization_exclude'],
1210 validation_alias=common_field['validation_alias'],
1211 serialization_alias=common_field['serialization_alias'],
1212 frozen=common_field['frozen'],
1213 metadata=common_field['metadata'],
1214 )
1215
1216 def _generate_dc_field_schema(
1217 self,
1218 name: str,
1219 field_info: FieldInfo,
1220 decorators: DecoratorInfos,
1221 ) -> core_schema.DataclassField:
1222 """Prepare a DataclassField to represent the parameter/field, of a dataclass."""
1223 common_field = self._common_field_schema(name, field_info, decorators)
1224 return core_schema.dataclass_field(
1225 name,
1226 common_field['schema'],
1227 init=field_info.init,
1228 init_only=field_info.init_var or None,
1229 kw_only=None if field_info.kw_only else False,
1230 serialization_exclude=common_field['serialization_exclude'],
1231 validation_alias=common_field['validation_alias'],
1232 serialization_alias=common_field['serialization_alias'],
1233 frozen=common_field['frozen'],
1234 metadata=common_field['metadata'],
1235 )
1236
1237 @staticmethod
1238 def _apply_alias_generator_to_field_info(
1239 alias_generator: Callable[[str], str] | AliasGenerator, field_info: FieldInfo, field_name: str
1240 ) -> None:
1241 """Apply an alias_generator to aliases on a FieldInfo instance if appropriate.
1242
1243 Args:
1244 alias_generator: A callable that takes a string and returns a string, or an AliasGenerator instance.
1245 field_info: The FieldInfo instance to which the alias_generator is (maybe) applied.
1246 field_name: The name of the field from which to generate the alias.
1247 """
1248 # Apply an alias_generator if
1249 # 1. An alias is not specified
1250 # 2. An alias is specified, but the priority is <= 1
1251 if (
1252 field_info.alias_priority is None
1253 or field_info.alias_priority <= 1
1254 or field_info.alias is None
1255 or field_info.validation_alias is None
1256 or field_info.serialization_alias is None
1257 ):
1258 alias, validation_alias, serialization_alias = None, None, None
1259
1260 if isinstance(alias_generator, AliasGenerator):
1261 alias, validation_alias, serialization_alias = alias_generator.generate_aliases(field_name)
1262 elif isinstance(alias_generator, Callable):
1263 alias = alias_generator(field_name)
1264 if not isinstance(alias, str):
1265 raise TypeError(f'alias_generator {alias_generator} must return str, not {alias.__class__}')
1266
1267 # if priority is not set, we set to 1
1268 # which supports the case where the alias_generator from a child class is used
1269 # to generate an alias for a field in a parent class
1270 if field_info.alias_priority is None or field_info.alias_priority <= 1:
1271 field_info.alias_priority = 1
1272
1273 # if the priority is 1, then we set the aliases to the generated alias
1274 if field_info.alias_priority == 1:
1275 field_info.serialization_alias = _get_first_non_null(serialization_alias, alias)
1276 field_info.validation_alias = _get_first_non_null(validation_alias, alias)
1277 field_info.alias = alias
1278
1279 # if any of the aliases are not set, then we set them to the corresponding generated alias
1280 if field_info.alias is None:
1281 field_info.alias = alias
1282 if field_info.serialization_alias is None:
1283 field_info.serialization_alias = _get_first_non_null(serialization_alias, alias)
1284 if field_info.validation_alias is None:
1285 field_info.validation_alias = _get_first_non_null(validation_alias, alias)
1286
1287 @staticmethod
1288 def _apply_alias_generator_to_computed_field_info(
1289 alias_generator: Callable[[str], str] | AliasGenerator,
1290 computed_field_info: ComputedFieldInfo,
1291 computed_field_name: str,
1292 ):
1293 """Apply an alias_generator to alias on a ComputedFieldInfo instance if appropriate.
1294
1295 Args:
1296 alias_generator: A callable that takes a string and returns a string, or an AliasGenerator instance.
1297 computed_field_info: The ComputedFieldInfo instance to which the alias_generator is (maybe) applied.
1298 computed_field_name: The name of the computed field from which to generate the alias.
1299 """
1300 # Apply an alias_generator if
1301 # 1. An alias is not specified
1302 # 2. An alias is specified, but the priority is <= 1
1303
1304 if (
1305 computed_field_info.alias_priority is None
1306 or computed_field_info.alias_priority <= 1
1307 or computed_field_info.alias is None
1308 ):
1309 alias, validation_alias, serialization_alias = None, None, None
1310
1311 if isinstance(alias_generator, AliasGenerator):
1312 alias, validation_alias, serialization_alias = alias_generator.generate_aliases(computed_field_name)
1313 elif isinstance(alias_generator, Callable):
1314 alias = alias_generator(computed_field_name)
1315 if not isinstance(alias, str):
1316 raise TypeError(f'alias_generator {alias_generator} must return str, not {alias.__class__}')
1317
1318 # if priority is not set, we set to 1
1319 # which supports the case where the alias_generator from a child class is used
1320 # to generate an alias for a field in a parent class
1321 if computed_field_info.alias_priority is None or computed_field_info.alias_priority <= 1:
1322 computed_field_info.alias_priority = 1
1323
1324 # if the priority is 1, then we set the aliases to the generated alias
1325 # note that we use the serialization_alias with priority over alias, as computed_field
1326 # aliases are used for serialization only (not validation)
1327 if computed_field_info.alias_priority == 1:
1328 computed_field_info.alias = _get_first_non_null(serialization_alias, alias)
1329
1330 @staticmethod
1331 def _apply_field_title_generator_to_field_info(
1332 config_wrapper: ConfigWrapper, field_info: FieldInfo | ComputedFieldInfo, field_name: str
1333 ) -> None:
1334 """Apply a field_title_generator on a FieldInfo or ComputedFieldInfo instance if appropriate
1335 Args:
1336 config_wrapper: The config of the model
1337 field_info: The FieldInfo or ComputedField instance to which the title_generator is (maybe) applied.
1338 field_name: The name of the field from which to generate the title.
1339 """
1340 field_title_generator = field_info.field_title_generator or config_wrapper.field_title_generator
1341
1342 if field_title_generator is None:
1343 return
1344
1345 if field_info.title is None:
1346 title = field_title_generator(field_name, field_info) # type: ignore
1347 if not isinstance(title, str):
1348 raise TypeError(f'field_title_generator {field_title_generator} must return str, not {title.__class__}')
1349
1350 field_info.title = title
1351
1352 def _common_field_schema( # C901
1353 self, name: str, field_info: FieldInfo, decorators: DecoratorInfos
1354 ) -> _CommonField:
1355 source_type, annotations = field_info.annotation, field_info.metadata
1356
1357 def set_discriminator(schema: CoreSchema) -> CoreSchema:
1358 schema = self._apply_discriminator_to_union(schema, field_info.discriminator)
1359 return schema
1360
1361 # Convert `@field_validator` decorators to `Before/After/Plain/WrapValidator` instances:
1362 validators_from_decorators = []
1363 for decorator in filter_field_decorator_info_by_field(decorators.field_validators.values(), name):
1364 validators_from_decorators.append(_mode_to_validator[decorator.info.mode]._from_decorator(decorator))
1365
1366 with self.field_name_stack.push(name):
1367 if field_info.discriminator is not None:
1368 schema = self._apply_annotations(
1369 source_type, annotations + validators_from_decorators, transform_inner_schema=set_discriminator
1370 )
1371 else:
1372 schema = self._apply_annotations(
1373 source_type,
1374 annotations + validators_from_decorators,
1375 )
1376
1377 # This V1 compatibility shim should eventually be removed
1378 # push down any `each_item=True` validators
1379 # note that this won't work for any Annotated types that get wrapped by a function validator
1380 # but that's okay because that didn't exist in V1
1381 this_field_validators = filter_field_decorator_info_by_field(decorators.validators.values(), name)
1382 if _validators_require_validate_default(this_field_validators):
1383 field_info.validate_default = True
1384 each_item_validators = [v for v in this_field_validators if v.info.each_item is True]
1385 this_field_validators = [v for v in this_field_validators if v not in each_item_validators]
1386 schema = apply_each_item_validators(schema, each_item_validators, name)
1387
1388 schema = apply_validators(schema, this_field_validators, name)
1389
1390 # the default validator needs to go outside of any other validators
1391 # so that it is the topmost validator for the field validator
1392 # which uses it to check if the field has a default value or not
1393 if not field_info.is_required():
1394 schema = wrap_default(field_info, schema)
1395
1396 schema = self._apply_field_serializers(
1397 schema, filter_field_decorator_info_by_field(decorators.field_serializers.values(), name)
1398 )
1399 self._apply_field_title_generator_to_field_info(self._config_wrapper, field_info, name)
1400
1401 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(field_info)
1402 core_metadata: dict[str, Any] = {}
1403 update_core_metadata(
1404 core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
1405 )
1406
1407 alias_generator = self._config_wrapper.alias_generator
1408 if alias_generator is not None:
1409 self._apply_alias_generator_to_field_info(alias_generator, field_info, name)
1410
1411 if isinstance(field_info.validation_alias, (AliasChoices, AliasPath)):
1412 validation_alias = field_info.validation_alias.convert_to_aliases()
1413 else:
1414 validation_alias = field_info.validation_alias
1415
1416 return _common_field(
1417 schema,
1418 serialization_exclude=True if field_info.exclude else None,
1419 validation_alias=validation_alias,
1420 serialization_alias=field_info.serialization_alias,
1421 frozen=field_info.frozen,
1422 metadata=core_metadata,
1423 )
1424
1425 def _union_schema(self, union_type: Any) -> core_schema.CoreSchema:
1426 """Generate schema for a Union."""
1427 args = self._get_args_resolving_forward_refs(union_type, required=True)
1428 choices: list[CoreSchema] = []
1429 nullable = False
1430 for arg in args:
1431 if arg is None or arg is _typing_extra.NoneType:
1432 nullable = True
1433 else:
1434 choices.append(self.generate_schema(arg))
1435
1436 if len(choices) == 1:
1437 s = choices[0]
1438 else:
1439 choices_with_tags: list[CoreSchema | tuple[CoreSchema, str]] = []
1440 for choice in choices:
1441 tag = cast(CoreMetadata, choice.get('metadata', {})).get('pydantic_internal_union_tag_key')
1442 if tag is not None:
1443 choices_with_tags.append((choice, tag))
1444 else:
1445 choices_with_tags.append(choice)
1446 s = core_schema.union_schema(choices_with_tags)
1447
1448 if nullable:
1449 s = core_schema.nullable_schema(s)
1450 return s
1451
1452 def _type_alias_type_schema(self, obj: TypeAliasType) -> CoreSchema:
1453 with self.defs.get_schema_or_ref(obj) as (ref, maybe_schema):
1454 if maybe_schema is not None:
1455 return maybe_schema
1456
1457 origin: TypeAliasType = get_origin(obj) or obj
1458 typevars_map = get_standard_typevars_map(obj)
1459
1460 with self._ns_resolver.push(origin):
1461 try:
1462 annotation = _typing_extra.eval_type(origin.__value__, *self._types_namespace)
1463 except NameError as e:
1464 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1465 annotation = replace_types(annotation, typevars_map)
1466 schema = self.generate_schema(annotation)
1467 assert schema['type'] != 'definitions'
1468 schema['ref'] = ref # type: ignore
1469 return self.defs.create_definition_reference_schema(schema)
1470
1471 def _literal_schema(self, literal_type: Any) -> CoreSchema:
1472 """Generate schema for a Literal."""
1473 expected = list(get_literal_values(literal_type, type_check=False, unpack_type_aliases='eager'))
1474 assert expected, f'literal "expected" cannot be empty, obj={literal_type}'
1475 schema = core_schema.literal_schema(expected)
1476
1477 if self._config_wrapper.use_enum_values and any(isinstance(v, Enum) for v in expected):
1478 schema = core_schema.no_info_after_validator_function(
1479 lambda v: v.value if isinstance(v, Enum) else v, schema
1480 )
1481
1482 return schema
1483
1484 def _typed_dict_schema(self, typed_dict_cls: Any, origin: Any) -> core_schema.CoreSchema:
1485 """Generate a core schema for a `TypedDict` class.
1486
1487 To be able to build a `DecoratorInfos` instance for the `TypedDict` class (which will include
1488 validators, serializers, etc.), we need to have access to the original bases of the class
1489 (see https://docs.python.org/3/library/types.html#types.get_original_bases).
1490 However, the `__orig_bases__` attribute was only added in 3.12 (https://github.com/python/cpython/pull/103698).
1491
1492 For this reason, we require Python 3.12 (or using the `typing_extensions` backport).
1493 """
1494 FieldInfo = import_cached_field_info()
1495
1496 with (
1497 self.model_type_stack.push(typed_dict_cls),
1498 self.defs.get_schema_or_ref(typed_dict_cls) as (
1499 typed_dict_ref,
1500 maybe_schema,
1501 ),
1502 ):
1503 if maybe_schema is not None:
1504 return maybe_schema
1505
1506 typevars_map = get_standard_typevars_map(typed_dict_cls)
1507 if origin is not None:
1508 typed_dict_cls = origin
1509
1510 if not _SUPPORTS_TYPEDDICT and type(typed_dict_cls).__module__ == 'typing':
1511 raise PydanticUserError(
1512 'Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.12.',
1513 code='typed-dict-version',
1514 )
1515
1516 try:
1517 # if a typed dictionary class doesn't have config, we use the parent's config, hence a default of `None`
1518 # see https://github.com/pydantic/pydantic/issues/10917
1519 config: ConfigDict | None = get_attribute_from_bases(typed_dict_cls, '__pydantic_config__')
1520 except AttributeError:
1521 config = None
1522
1523 with self._config_wrapper_stack.push(config):
1524 core_config = self._config_wrapper.core_config(title=typed_dict_cls.__name__)
1525
1526 required_keys: frozenset[str] = typed_dict_cls.__required_keys__
1527
1528 fields: dict[str, core_schema.TypedDictField] = {}
1529
1530 decorators = DecoratorInfos.build(typed_dict_cls)
1531
1532 if self._config_wrapper.use_attribute_docstrings:
1533 field_docstrings = extract_docstrings_from_cls(typed_dict_cls, use_inspect=True)
1534 else:
1535 field_docstrings = None
1536
1537 try:
1538 annotations = _typing_extra.get_cls_type_hints(typed_dict_cls, ns_resolver=self._ns_resolver)
1539 except NameError as e:
1540 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1541
1542 readonly_fields: list[str] = []
1543
1544 for field_name, annotation in annotations.items():
1545 field_info = FieldInfo.from_annotation(annotation, _source=AnnotationSource.TYPED_DICT)
1546 field_info.annotation = replace_types(field_info.annotation, typevars_map)
1547
1548 required = (
1549 field_name in required_keys or 'required' in field_info._qualifiers
1550 ) and 'not_required' not in field_info._qualifiers
1551 if 'read_only' in field_info._qualifiers:
1552 readonly_fields.append(field_name)
1553
1554 if (
1555 field_docstrings is not None
1556 and field_info.description is None
1557 and field_name in field_docstrings
1558 ):
1559 field_info.description = field_docstrings[field_name]
1560 self._apply_field_title_generator_to_field_info(self._config_wrapper, field_info, field_name)
1561 fields[field_name] = self._generate_td_field_schema(
1562 field_name, field_info, decorators, required=required
1563 )
1564
1565 if readonly_fields:
1566 fields_repr = ', '.join(repr(f) for f in readonly_fields)
1567 plural = len(readonly_fields) >= 2
1568 warnings.warn(
1569 f'Item{"s" if plural else ""} {fields_repr} on TypedDict class {typed_dict_cls.__name__!r} '
1570 f'{"are" if plural else "is"} using the `ReadOnly` qualifier. Pydantic will not protect items '
1571 'from any mutation on dictionary instances.',
1572 UserWarning,
1573 )
1574
1575 td_schema = core_schema.typed_dict_schema(
1576 fields,
1577 cls=typed_dict_cls,
1578 computed_fields=[
1579 self._computed_field_schema(d, decorators.field_serializers)
1580 for d in decorators.computed_fields.values()
1581 ],
1582 ref=typed_dict_ref,
1583 config=core_config,
1584 )
1585
1586 schema = self._apply_model_serializers(td_schema, decorators.model_serializers.values())
1587 schema = apply_model_validators(schema, decorators.model_validators.values(), 'all')
1588 return self.defs.create_definition_reference_schema(schema)
1589
1590 def _namedtuple_schema(self, namedtuple_cls: Any, origin: Any) -> core_schema.CoreSchema:
1591 """Generate schema for a NamedTuple."""
1592 with (
1593 self.model_type_stack.push(namedtuple_cls),
1594 self.defs.get_schema_or_ref(namedtuple_cls) as (
1595 namedtuple_ref,
1596 maybe_schema,
1597 ),
1598 ):
1599 if maybe_schema is not None:
1600 return maybe_schema
1601 typevars_map = get_standard_typevars_map(namedtuple_cls)
1602 if origin is not None:
1603 namedtuple_cls = origin
1604
1605 try:
1606 annotations = _typing_extra.get_cls_type_hints(namedtuple_cls, ns_resolver=self._ns_resolver)
1607 except NameError as e:
1608 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1609 if not annotations:
1610 # annotations is empty, happens if namedtuple_cls defined via collections.namedtuple(...)
1611 annotations: dict[str, Any] = {k: Any for k in namedtuple_cls._fields}
1612
1613 if typevars_map:
1614 annotations = {
1615 field_name: replace_types(annotation, typevars_map)
1616 for field_name, annotation in annotations.items()
1617 }
1618
1619 arguments_schema = core_schema.arguments_schema(
1620 [
1621 self._generate_parameter_schema(
1622 field_name,
1623 annotation,
1624 source=AnnotationSource.NAMED_TUPLE,
1625 default=namedtuple_cls._field_defaults.get(field_name, Parameter.empty),
1626 )
1627 for field_name, annotation in annotations.items()
1628 ],
1629 metadata={'pydantic_js_prefer_positional_arguments': True},
1630 )
1631 schema = core_schema.call_schema(arguments_schema, namedtuple_cls, ref=namedtuple_ref)
1632 return self.defs.create_definition_reference_schema(schema)
1633
1634 def _generate_parameter_schema(
1635 self,
1636 name: str,
1637 annotation: type[Any],
1638 source: AnnotationSource,
1639 default: Any = Parameter.empty,
1640 mode: Literal['positional_only', 'positional_or_keyword', 'keyword_only'] | None = None,
1641 ) -> core_schema.ArgumentsParameter:
1642 """Generate the definition of a field in a namedtuple or a parameter in a function signature.
1643
1644 This definition is meant to be used for the `'arguments'` core schema, which will be replaced
1645 in V3 by the `'arguments-v3`'.
1646 """
1647 FieldInfo = import_cached_field_info()
1648
1649 if default is Parameter.empty:
1650 field = FieldInfo.from_annotation(annotation, _source=source)
1651 else:
1652 field = FieldInfo.from_annotated_attribute(annotation, default, _source=source)
1653 assert field.annotation is not None, 'field.annotation should not be None when generating a schema'
1654 with self.field_name_stack.push(name):
1655 schema = self._apply_annotations(field.annotation, [field])
1656
1657 if not field.is_required():
1658 schema = wrap_default(field, schema)
1659
1660 parameter_schema = core_schema.arguments_parameter(name, schema)
1661 if mode is not None:
1662 parameter_schema['mode'] = mode
1663 if field.alias is not None:
1664 parameter_schema['alias'] = field.alias
1665 else:
1666 alias_generator = self._config_wrapper.alias_generator
1667 if isinstance(alias_generator, AliasGenerator) and alias_generator.alias is not None:
1668 parameter_schema['alias'] = alias_generator.alias(name)
1669 elif callable(alias_generator):
1670 parameter_schema['alias'] = alias_generator(name)
1671 return parameter_schema
1672
1673 def _generate_parameter_v3_schema(
1674 self,
1675 name: str,
1676 annotation: Any,
1677 source: AnnotationSource,
1678 mode: Literal[
1679 'positional_only',
1680 'positional_or_keyword',
1681 'keyword_only',
1682 'var_args',
1683 'var_kwargs_uniform',
1684 'var_kwargs_unpacked_typed_dict',
1685 ],
1686 default: Any = Parameter.empty,
1687 ) -> core_schema.ArgumentsV3Parameter:
1688 """Generate the definition of a parameter in a function signature.
1689
1690 This definition is meant to be used for the `'arguments-v3'` core schema, which will replace
1691 the `'arguments`' schema in V3.
1692 """
1693 FieldInfo = import_cached_field_info()
1694
1695 if default is Parameter.empty:
1696 field = FieldInfo.from_annotation(annotation, _source=source)
1697 else:
1698 field = FieldInfo.from_annotated_attribute(annotation, default, _source=source)
1699
1700 with self.field_name_stack.push(name):
1701 schema = self._apply_annotations(field.annotation, [field])
1702
1703 if not field.is_required():
1704 schema = wrap_default(field, schema)
1705
1706 parameter_schema = core_schema.arguments_v3_parameter(
1707 name=name,
1708 schema=schema,
1709 mode=mode,
1710 )
1711 if field.alias is not None:
1712 parameter_schema['alias'] = field.alias
1713 else:
1714 alias_generator = self._config_wrapper.alias_generator
1715 if isinstance(alias_generator, AliasGenerator) and alias_generator.alias is not None:
1716 parameter_schema['alias'] = alias_generator.alias(name)
1717 elif callable(alias_generator):
1718 parameter_schema['alias'] = alias_generator(name)
1719
1720 return parameter_schema
1721
1722 def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema:
1723 """Generate schema for a Tuple, e.g. `tuple[int, str]` or `tuple[int, ...]`."""
1724 # TODO: do we really need to resolve type vars here?
1725 typevars_map = get_standard_typevars_map(tuple_type)
1726 params = self._get_args_resolving_forward_refs(tuple_type)
1727
1728 if typevars_map and params:
1729 params = tuple(replace_types(param, typevars_map) for param in params)
1730
1731 # NOTE: subtle difference: `tuple[()]` gives `params=()`, whereas `typing.Tuple[()]` gives `params=((),)`
1732 # This is only true for <3.11, on Python 3.11+ `typing.Tuple[()]` gives `params=()`
1733 if not params:
1734 if tuple_type in TUPLE_TYPES:
1735 return core_schema.tuple_schema([core_schema.any_schema()], variadic_item_index=0)
1736 else:
1737 # special case for `tuple[()]` which means `tuple[]` - an empty tuple
1738 return core_schema.tuple_schema([])
1739 elif params[-1] is Ellipsis:
1740 if len(params) == 2:
1741 return core_schema.tuple_schema([self.generate_schema(params[0])], variadic_item_index=0)
1742 else:
1743 # TODO: something like https://github.com/pydantic/pydantic/issues/5952
1744 raise ValueError('Variable tuples can only have one type')
1745 elif len(params) == 1 and params[0] == ():
1746 # special case for `tuple[()]` which means `tuple[]` - an empty tuple
1747 # NOTE: This conditional can be removed when we drop support for Python 3.10.
1748 return core_schema.tuple_schema([])
1749 else:
1750 return core_schema.tuple_schema([self.generate_schema(param) for param in params])
1751
1752 def _type_schema(self) -> core_schema.CoreSchema:
1753 return core_schema.custom_error_schema(
1754 core_schema.is_instance_schema(type),
1755 custom_error_type='is_type',
1756 custom_error_message='Input should be a type',
1757 )
1758
1759 def _zoneinfo_schema(self) -> core_schema.CoreSchema:
1760 """Generate schema for a zone_info.ZoneInfo object"""
1761 from ._validators import validate_str_is_valid_iana_tz
1762
1763 metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'zoneinfo'}]}
1764 return core_schema.no_info_plain_validator_function(
1765 validate_str_is_valid_iana_tz,
1766 serialization=core_schema.to_string_ser_schema(),
1767 metadata=metadata,
1768 )
1769
1770 def _union_is_subclass_schema(self, union_type: Any) -> core_schema.CoreSchema:
1771 """Generate schema for `type[Union[X, ...]]`."""
1772 args = self._get_args_resolving_forward_refs(union_type, required=True)
1773 return core_schema.union_schema([self.generate_schema(type[args]) for args in args])
1774
1775 def _subclass_schema(self, type_: Any) -> core_schema.CoreSchema:
1776 """Generate schema for a type, e.g. `type[int]`."""
1777 type_param = self._get_first_arg_or_any(type_)
1778
1779 # Assume `type[Annotated[<typ>, ...]]` is equivalent to `type[<typ>]`:
1780 type_param = _typing_extra.annotated_type(type_param) or type_param
1781
1782 if typing_objects.is_any(type_param):
1783 return self._type_schema()
1784 elif typing_objects.is_typealiastype(type_param):
1785 return self.generate_schema(type[type_param.__value__])
1786 elif typing_objects.is_typevar(type_param):
1787 if type_param.__bound__:
1788 if is_union_origin(get_origin(type_param.__bound__)):
1789 return self._union_is_subclass_schema(type_param.__bound__)
1790 return core_schema.is_subclass_schema(type_param.__bound__)
1791 elif type_param.__constraints__:
1792 return core_schema.union_schema([self.generate_schema(type[c]) for c in type_param.__constraints__])
1793 else:
1794 return self._type_schema()
1795 elif is_union_origin(get_origin(type_param)):
1796 return self._union_is_subclass_schema(type_param)
1797 else:
1798 if typing_objects.is_self(type_param):
1799 type_param = self._resolve_self_type(type_param)
1800 if _typing_extra.is_generic_alias(type_param):
1801 raise PydanticUserError(
1802 'Subscripting `type[]` with an already parametrized type is not supported. '
1803 f'Instead of using type[{type_param!r}], use type[{_repr.display_as_type(get_origin(type_param))}].',
1804 code=None,
1805 )
1806 if not inspect.isclass(type_param):
1807 # when using type[None], this doesn't type convert to type[NoneType], and None isn't a class
1808 # so we handle it manually here
1809 if type_param is None:
1810 return core_schema.is_subclass_schema(_typing_extra.NoneType)
1811 raise TypeError(f'Expected a class, got {type_param!r}')
1812 return core_schema.is_subclass_schema(type_param)
1813
1814 def _sequence_schema(self, items_type: Any) -> core_schema.CoreSchema:
1815 """Generate schema for a Sequence, e.g. `Sequence[int]`."""
1816 from ._serializers import serialize_sequence_via_list
1817
1818 item_type_schema = self.generate_schema(items_type)
1819 list_schema = core_schema.list_schema(item_type_schema)
1820
1821 json_schema = smart_deepcopy(list_schema)
1822 python_schema = core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence')
1823 if not typing_objects.is_any(items_type):
1824 from ._validators import sequence_validator
1825
1826 python_schema = core_schema.chain_schema(
1827 [python_schema, core_schema.no_info_wrap_validator_function(sequence_validator, list_schema)],
1828 )
1829
1830 serialization = core_schema.wrap_serializer_function_ser_schema(
1831 serialize_sequence_via_list, schema=item_type_schema, info_arg=True
1832 )
1833 return core_schema.json_or_python_schema(
1834 json_schema=json_schema, python_schema=python_schema, serialization=serialization
1835 )
1836
1837 def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema:
1838 """Generate a schema for an `Iterable`."""
1839 item_type = self._get_first_arg_or_any(type_)
1840
1841 return core_schema.generator_schema(self.generate_schema(item_type))
1842
1843 def _pattern_schema(self, pattern_type: Any) -> core_schema.CoreSchema:
1844 from . import _validators
1845
1846 metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'regex'}]}
1847 ser = core_schema.plain_serializer_function_ser_schema(
1848 attrgetter('pattern'), when_used='json', return_schema=core_schema.str_schema()
1849 )
1850 if pattern_type is typing.Pattern or pattern_type is re.Pattern:
1851 # bare type
1852 return core_schema.no_info_plain_validator_function(
1853 _validators.pattern_either_validator, serialization=ser, metadata=metadata
1854 )
1855
1856 param = self._get_args_resolving_forward_refs(
1857 pattern_type,
1858 required=True,
1859 )[0]
1860 if param is str:
1861 return core_schema.no_info_plain_validator_function(
1862 _validators.pattern_str_validator, serialization=ser, metadata=metadata
1863 )
1864 elif param is bytes:
1865 return core_schema.no_info_plain_validator_function(
1866 _validators.pattern_bytes_validator, serialization=ser, metadata=metadata
1867 )
1868 else:
1869 raise PydanticSchemaGenerationError(f'Unable to generate pydantic-core schema for {pattern_type!r}.')
1870
1871 def _hashable_schema(self) -> core_schema.CoreSchema:
1872 return core_schema.custom_error_schema(
1873 schema=core_schema.json_or_python_schema(
1874 json_schema=core_schema.chain_schema(
1875 [core_schema.any_schema(), core_schema.is_instance_schema(collections.abc.Hashable)]
1876 ),
1877 python_schema=core_schema.is_instance_schema(collections.abc.Hashable),
1878 ),
1879 custom_error_type='is_hashable',
1880 custom_error_message='Input should be hashable',
1881 )
1882
1883 def _dataclass_schema(
1884 self, dataclass: type[StandardDataclass], origin: type[StandardDataclass] | None
1885 ) -> core_schema.CoreSchema:
1886 """Generate schema for a dataclass."""
1887 with (
1888 self.model_type_stack.push(dataclass),
1889 self.defs.get_schema_or_ref(dataclass) as (
1890 dataclass_ref,
1891 maybe_schema,
1892 ),
1893 ):
1894 if maybe_schema is not None:
1895 return maybe_schema
1896
1897 schema = dataclass.__dict__.get('__pydantic_core_schema__')
1898 if schema is not None and not isinstance(schema, MockCoreSchema):
1899 if schema['type'] == 'definitions':
1900 schema = self.defs.unpack_definitions(schema)
1901 ref = get_ref(schema)
1902 if ref:
1903 return self.defs.create_definition_reference_schema(schema)
1904 else:
1905 return schema
1906
1907 typevars_map = get_standard_typevars_map(dataclass)
1908 if origin is not None:
1909 dataclass = origin
1910
1911 # if (plain) dataclass doesn't have config, we use the parent's config, hence a default of `None`
1912 # (Pydantic dataclasses have an empty dict config by default).
1913 # see https://github.com/pydantic/pydantic/issues/10917
1914 config = getattr(dataclass, '__pydantic_config__', None)
1915
1916 from ..dataclasses import is_pydantic_dataclass
1917
1918 with self._ns_resolver.push(dataclass), self._config_wrapper_stack.push(config):
1919 if is_pydantic_dataclass(dataclass):
1920 if dataclass.__pydantic_fields_complete__():
1921 # Copy the field info instances to avoid mutating the `FieldInfo` instances
1922 # of the generic dataclass generic origin (e.g. `apply_typevars_map` below).
1923 # Note that we don't apply `deepcopy` on `__pydantic_fields__` because we
1924 # don't want to copy the `FieldInfo` attributes:
1925 fields = {
1926 f_name: copy(field_info) for f_name, field_info in dataclass.__pydantic_fields__.items()
1927 }
1928 if typevars_map:
1929 for field in fields.values():
1930 field.apply_typevars_map(typevars_map, *self._types_namespace)
1931 else:
1932 try:
1933 fields = rebuild_dataclass_fields(
1934 dataclass,
1935 config_wrapper=self._config_wrapper,
1936 ns_resolver=self._ns_resolver,
1937 typevars_map=typevars_map or {},
1938 )
1939 except NameError as e:
1940 raise PydanticUndefinedAnnotation.from_name_error(e) from e
1941 else:
1942 fields = collect_dataclass_fields(
1943 dataclass,
1944 typevars_map=typevars_map,
1945 config_wrapper=self._config_wrapper,
1946 )
1947
1948 if self._config_wrapper.extra == 'allow':
1949 # disallow combination of init=False on a dataclass field and extra='allow' on a dataclass
1950 for field_name, field in fields.items():
1951 if field.init is False:
1952 raise PydanticUserError(
1953 f'Field {field_name} has `init=False` and dataclass has config setting `extra="allow"`. '
1954 f'This combination is not allowed.',
1955 code='dataclass-init-false-extra-allow',
1956 )
1957
1958 decorators = dataclass.__dict__.get('__pydantic_decorators__') or DecoratorInfos.build(dataclass)
1959 # Move kw_only=False args to the start of the list, as this is how vanilla dataclasses work.
1960 # Note that when kw_only is missing or None, it is treated as equivalent to kw_only=True
1961 args = sorted(
1962 (self._generate_dc_field_schema(k, v, decorators) for k, v in fields.items()),
1963 key=lambda a: a.get('kw_only') is not False,
1964 )
1965 has_post_init = hasattr(dataclass, '__post_init__')
1966 has_slots = hasattr(dataclass, '__slots__')
1967
1968 args_schema = core_schema.dataclass_args_schema(
1969 dataclass.__name__,
1970 args,
1971 computed_fields=[
1972 self._computed_field_schema(d, decorators.field_serializers)
1973 for d in decorators.computed_fields.values()
1974 ],
1975 collect_init_only=has_post_init,
1976 )
1977
1978 inner_schema = apply_validators(args_schema, decorators.root_validators.values(), None)
1979
1980 model_validators = decorators.model_validators.values()
1981 inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
1982
1983 core_config = self._config_wrapper.core_config(title=dataclass.__name__)
1984
1985 dc_schema = core_schema.dataclass_schema(
1986 dataclass,
1987 inner_schema,
1988 generic_origin=origin,
1989 post_init=has_post_init,
1990 ref=dataclass_ref,
1991 fields=[field.name for field in dataclasses.fields(dataclass)],
1992 slots=has_slots,
1993 config=core_config,
1994 # we don't use a custom __setattr__ for dataclasses, so we must
1995 # pass along the frozen config setting to the pydantic-core schema
1996 frozen=self._config_wrapper_stack.tail.frozen,
1997 )
1998 schema = self._apply_model_serializers(dc_schema, decorators.model_serializers.values())
1999 schema = apply_model_validators(schema, model_validators, 'outer')
2000 return self.defs.create_definition_reference_schema(schema)
2001
2002 def _call_schema(self, function: ValidateCallSupportedTypes) -> core_schema.CallSchema:
2003 """Generate schema for a Callable.
2004
2005 TODO support functional validators once we support them in Config
2006 """
2007 arguments_schema = self._arguments_schema(function)
2008
2009 return_schema: core_schema.CoreSchema | None = None
2010 config_wrapper = self._config_wrapper
2011 if config_wrapper.validate_return:
2012 sig = signature(function)
2013 return_hint = sig.return_annotation
2014 if return_hint is not sig.empty:
2015 globalns, localns = self._types_namespace
2016 type_hints = _typing_extra.get_function_type_hints(
2017 function, globalns=globalns, localns=localns, include_keys={'return'}
2018 )
2019 return_schema = self.generate_schema(type_hints['return'])
2020
2021 return core_schema.call_schema(
2022 arguments_schema,
2023 function,
2024 return_schema=return_schema,
2025 )
2026
2027 def _arguments_schema(
2028 self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
2029 ) -> core_schema.ArgumentsSchema:
2030 """Generate schema for a Signature."""
2031 mode_lookup: dict[_ParameterKind, Literal['positional_only', 'positional_or_keyword', 'keyword_only']] = {
2032 Parameter.POSITIONAL_ONLY: 'positional_only',
2033 Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
2034 Parameter.KEYWORD_ONLY: 'keyword_only',
2035 }
2036
2037 sig = signature(function)
2038 globalns, localns = self._types_namespace
2039 type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)
2040
2041 arguments_list: list[core_schema.ArgumentsParameter] = []
2042 var_args_schema: core_schema.CoreSchema | None = None
2043 var_kwargs_schema: core_schema.CoreSchema | None = None
2044 var_kwargs_mode: core_schema.VarKwargsMode | None = None
2045
2046 for i, (name, p) in enumerate(sig.parameters.items()):
2047 if p.annotation is sig.empty:
2048 annotation = typing.cast(Any, Any)
2049 else:
2050 annotation = type_hints[name]
2051
2052 if parameters_callback is not None:
2053 result = parameters_callback(i, name, annotation)
2054 if result == 'skip':
2055 continue
2056
2057 parameter_mode = mode_lookup.get(p.kind)
2058 if parameter_mode is not None:
2059 arg_schema = self._generate_parameter_schema(
2060 name, annotation, AnnotationSource.FUNCTION, p.default, parameter_mode
2061 )
2062 arguments_list.append(arg_schema)
2063 elif p.kind == Parameter.VAR_POSITIONAL:
2064 var_args_schema = self.generate_schema(annotation)
2065 else:
2066 assert p.kind == Parameter.VAR_KEYWORD, p.kind
2067
2068 unpack_type = _typing_extra.unpack_type(annotation)
2069 if unpack_type is not None:
2070 origin = get_origin(unpack_type) or unpack_type
2071 if not is_typeddict(origin):
2072 raise PydanticUserError(
2073 f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
2074 code='unpack-typed-dict',
2075 )
2076 non_pos_only_param_names = {
2077 name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
2078 }
2079 overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
2080 if overlapping_params:
2081 raise PydanticUserError(
2082 f'Typed dictionary {origin.__name__!r} overlaps with parameter'
2083 f'{"s" if len(overlapping_params) >= 2 else ""} '
2084 f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
2085 code='overlapping-unpack-typed-dict',
2086 )
2087
2088 var_kwargs_mode = 'unpacked-typed-dict'
2089 var_kwargs_schema = self._typed_dict_schema(unpack_type, get_origin(unpack_type))
2090 else:
2091 var_kwargs_mode = 'uniform'
2092 var_kwargs_schema = self.generate_schema(annotation)
2093
2094 return core_schema.arguments_schema(
2095 arguments_list,
2096 var_args_schema=var_args_schema,
2097 var_kwargs_mode=var_kwargs_mode,
2098 var_kwargs_schema=var_kwargs_schema,
2099 validate_by_name=self._config_wrapper.validate_by_name,
2100 )
2101
2102 def _arguments_v3_schema(
2103 self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
2104 ) -> core_schema.ArgumentsV3Schema:
2105 mode_lookup: dict[
2106 _ParameterKind, Literal['positional_only', 'positional_or_keyword', 'var_args', 'keyword_only']
2107 ] = {
2108 Parameter.POSITIONAL_ONLY: 'positional_only',
2109 Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
2110 Parameter.VAR_POSITIONAL: 'var_args',
2111 Parameter.KEYWORD_ONLY: 'keyword_only',
2112 }
2113
2114 sig = signature(function)
2115 globalns, localns = self._types_namespace
2116 type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)
2117
2118 parameters_list: list[core_schema.ArgumentsV3Parameter] = []
2119
2120 for i, (name, p) in enumerate(sig.parameters.items()):
2121 if parameters_callback is not None:
2122 result = parameters_callback(i, name, p.annotation)
2123 if result == 'skip':
2124 continue
2125
2126 if p.annotation is Parameter.empty:
2127 annotation = typing.cast(Any, Any)
2128 else:
2129 annotation = type_hints[name]
2130
2131 parameter_mode = mode_lookup.get(p.kind)
2132 if parameter_mode is None:
2133 assert p.kind == Parameter.VAR_KEYWORD, p.kind
2134
2135 unpack_type = _typing_extra.unpack_type(annotation)
2136 if unpack_type is not None:
2137 origin = get_origin(unpack_type) or unpack_type
2138 if not is_typeddict(origin):
2139 raise PydanticUserError(
2140 f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
2141 code='unpack-typed-dict',
2142 )
2143 non_pos_only_param_names = {
2144 name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
2145 }
2146 overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
2147 if overlapping_params:
2148 raise PydanticUserError(
2149 f'Typed dictionary {origin.__name__!r} overlaps with parameter'
2150 f'{"s" if len(overlapping_params) >= 2 else ""} '
2151 f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
2152 code='overlapping-unpack-typed-dict',
2153 )
2154 parameter_mode = 'var_kwargs_unpacked_typed_dict'
2155 annotation = unpack_type
2156 else:
2157 parameter_mode = 'var_kwargs_uniform'
2158
2159 parameters_list.append(
2160 self._generate_parameter_v3_schema(
2161 name, annotation, AnnotationSource.FUNCTION, parameter_mode, default=p.default
2162 )
2163 )
2164
2165 return core_schema.arguments_v3_schema(
2166 parameters_list,
2167 validate_by_name=self._config_wrapper.validate_by_name,
2168 )
2169
2170 def _unsubstituted_typevar_schema(self, typevar: typing.TypeVar) -> core_schema.CoreSchema:
2171 try:
2172 has_default = typevar.has_default()
2173 except AttributeError:
2174 # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13
2175 pass
2176 else:
2177 if has_default:
2178 return self.generate_schema(typevar.__default__)
2179
2180 if constraints := typevar.__constraints__:
2181 return self._union_schema(typing.Union[constraints])
2182
2183 if bound := typevar.__bound__:
2184 schema = self.generate_schema(bound)
2185 schema['serialization'] = core_schema.wrap_serializer_function_ser_schema(
2186 lambda x, h: h(x),
2187 schema=core_schema.any_schema(),
2188 )
2189 return schema
2190
2191 return core_schema.any_schema()
2192
2193 def _computed_field_schema(
2194 self,
2195 d: Decorator[ComputedFieldInfo],
2196 field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]],
2197 ) -> core_schema.ComputedField:
2198 if d.info.return_type is not PydanticUndefined:
2199 return_type = d.info.return_type
2200 else:
2201 try:
2202 # Do not pass in globals as the function could be defined in a different module.
2203 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2204 # in locals that may contain a parent/rebuild namespace:
2205 return_type = _decorators.get_callable_return_type(d.func, localns=self._types_namespace.locals)
2206 except NameError as e:
2207 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2208 if return_type is PydanticUndefined:
2209 raise PydanticUserError(
2210 'Computed field is missing return type annotation or specifying `return_type`'
2211 ' to the `@computed_field` decorator (e.g. `@computed_field(return_type=int | str)`)',
2212 code='model-field-missing-annotation',
2213 )
2214
2215 return_type = replace_types(return_type, self._typevars_map)
2216 # Create a new ComputedFieldInfo so that different type parametrizations of the same
2217 # generic model's computed field can have different return types.
2218 d.info = dataclasses.replace(d.info, return_type=return_type)
2219 return_type_schema = self.generate_schema(return_type)
2220 # Apply serializers to computed field if there exist
2221 return_type_schema = self._apply_field_serializers(
2222 return_type_schema,
2223 filter_field_decorator_info_by_field(field_serializers.values(), d.cls_var_name),
2224 )
2225
2226 alias_generator = self._config_wrapper.alias_generator
2227 if alias_generator is not None:
2228 self._apply_alias_generator_to_computed_field_info(
2229 alias_generator=alias_generator, computed_field_info=d.info, computed_field_name=d.cls_var_name
2230 )
2231 self._apply_field_title_generator_to_field_info(self._config_wrapper, d.info, d.cls_var_name)
2232
2233 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(d.info)
2234 core_metadata: dict[str, Any] = {}
2235 update_core_metadata(
2236 core_metadata,
2237 pydantic_js_updates={'readOnly': True, **(pydantic_js_updates if pydantic_js_updates else {})},
2238 pydantic_js_extra=pydantic_js_extra,
2239 )
2240 return core_schema.computed_field(
2241 d.cls_var_name, return_schema=return_type_schema, alias=d.info.alias, metadata=core_metadata
2242 )
2243
2244 def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema:
2245 """Generate schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`."""
2246 FieldInfo = import_cached_field_info()
2247 source_type, *annotations = self._get_args_resolving_forward_refs(
2248 annotated_type,
2249 required=True,
2250 )
2251 schema = self._apply_annotations(source_type, annotations)
2252 # put the default validator last so that TypeAdapter.get_default_value() works
2253 # even if there are function validators involved
2254 for annotation in annotations:
2255 if isinstance(annotation, FieldInfo):
2256 schema = wrap_default(annotation, schema)
2257 return schema
2258
2259 def _apply_annotations(
2260 self,
2261 source_type: Any,
2262 annotations: list[Any],
2263 transform_inner_schema: Callable[[CoreSchema], CoreSchema] = lambda x: x,
2264 ) -> CoreSchema:
2265 """Apply arguments from `Annotated` or from `FieldInfo` to a schema.
2266
2267 This gets called by `GenerateSchema._annotated_schema` but differs from it in that it does
2268 not expect `source_type` to be an `Annotated` object, it expects it to be the first argument of that
2269 (in other words, `GenerateSchema._annotated_schema` just unpacks `Annotated`, this process it).
2270 """
2271 annotations = list(_known_annotated_metadata.expand_grouped_metadata(annotations))
2272
2273 pydantic_js_annotation_functions: list[GetJsonSchemaFunction] = []
2274
2275 def inner_handler(obj: Any) -> CoreSchema:
2276 schema = self._generate_schema_from_get_schema_method(obj, source_type)
2277
2278 if schema is None:
2279 schema = self._generate_schema_inner(obj)
2280
2281 metadata_js_function = _extract_get_pydantic_json_schema(obj)
2282 if metadata_js_function is not None:
2283 metadata_schema = resolve_original_schema(schema, self.defs)
2284 if metadata_schema is not None:
2285 self._add_js_function(metadata_schema, metadata_js_function)
2286 return transform_inner_schema(schema)
2287
2288 get_inner_schema = CallbackGetCoreSchemaHandler(inner_handler, self)
2289
2290 for annotation in annotations:
2291 if annotation is None:
2292 continue
2293 get_inner_schema = self._get_wrapped_inner_schema(
2294 get_inner_schema, annotation, pydantic_js_annotation_functions
2295 )
2296
2297 schema = get_inner_schema(source_type)
2298 if pydantic_js_annotation_functions:
2299 core_metadata = schema.setdefault('metadata', {})
2300 update_core_metadata(core_metadata, pydantic_js_annotation_functions=pydantic_js_annotation_functions)
2301 return _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, source_type, schema)
2302
2303 def _apply_single_annotation(self, schema: core_schema.CoreSchema, metadata: Any) -> core_schema.CoreSchema:
2304 FieldInfo = import_cached_field_info()
2305
2306 if isinstance(metadata, FieldInfo):
2307 for field_metadata in metadata.metadata:
2308 schema = self._apply_single_annotation(schema, field_metadata)
2309
2310 if metadata.discriminator is not None:
2311 schema = self._apply_discriminator_to_union(schema, metadata.discriminator)
2312 return schema
2313
2314 if schema['type'] == 'nullable':
2315 # for nullable schemas, metadata is automatically applied to the inner schema
2316 inner = schema.get('schema', core_schema.any_schema())
2317 inner = self._apply_single_annotation(inner, metadata)
2318 if inner:
2319 schema['schema'] = inner
2320 return schema
2321
2322 original_schema = schema
2323 ref = schema.get('ref')
2324 if ref is not None:
2325 schema = schema.copy()
2326 new_ref = ref + f'_{repr(metadata)}'
2327 if (existing := self.defs.get_schema_from_ref(new_ref)) is not None:
2328 return existing
2329 schema['ref'] = new_ref # pyright: ignore[reportGeneralTypeIssues]
2330 elif schema['type'] == 'definition-ref':
2331 ref = schema['schema_ref']
2332 if (referenced_schema := self.defs.get_schema_from_ref(ref)) is not None:
2333 schema = referenced_schema.copy()
2334 new_ref = ref + f'_{repr(metadata)}'
2335 if (existing := self.defs.get_schema_from_ref(new_ref)) is not None:
2336 return existing
2337 schema['ref'] = new_ref # pyright: ignore[reportGeneralTypeIssues]
2338
2339 maybe_updated_schema = _known_annotated_metadata.apply_known_metadata(metadata, schema)
2340
2341 if maybe_updated_schema is not None:
2342 return maybe_updated_schema
2343 return original_schema
2344
2345 def _apply_single_annotation_json_schema(
2346 self, schema: core_schema.CoreSchema, metadata: Any
2347 ) -> core_schema.CoreSchema:
2348 FieldInfo = import_cached_field_info()
2349
2350 if isinstance(metadata, FieldInfo):
2351 for field_metadata in metadata.metadata:
2352 schema = self._apply_single_annotation_json_schema(schema, field_metadata)
2353
2354 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(metadata)
2355 core_metadata = schema.setdefault('metadata', {})
2356 update_core_metadata(
2357 core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
2358 )
2359 return schema
2360
2361 def _get_wrapped_inner_schema(
2362 self,
2363 get_inner_schema: GetCoreSchemaHandler,
2364 annotation: Any,
2365 pydantic_js_annotation_functions: list[GetJsonSchemaFunction],
2366 ) -> CallbackGetCoreSchemaHandler:
2367 annotation_get_schema: GetCoreSchemaFunction | None = getattr(annotation, '__get_pydantic_core_schema__', None)
2368
2369 def new_handler(source: Any) -> core_schema.CoreSchema:
2370 if annotation_get_schema is not None:
2371 schema = annotation_get_schema(source, get_inner_schema)
2372 else:
2373 schema = get_inner_schema(source)
2374 schema = self._apply_single_annotation(schema, annotation)
2375 schema = self._apply_single_annotation_json_schema(schema, annotation)
2376
2377 metadata_js_function = _extract_get_pydantic_json_schema(annotation)
2378 if metadata_js_function is not None:
2379 pydantic_js_annotation_functions.append(metadata_js_function)
2380 return schema
2381
2382 return CallbackGetCoreSchemaHandler(new_handler, self)
2383
2384 def _apply_field_serializers(
2385 self,
2386 schema: core_schema.CoreSchema,
2387 serializers: list[Decorator[FieldSerializerDecoratorInfo]],
2388 ) -> core_schema.CoreSchema:
2389 """Apply field serializers to a schema."""
2390 if serializers:
2391 schema = copy(schema)
2392 if schema['type'] == 'definitions':
2393 inner_schema = schema['schema']
2394 schema['schema'] = self._apply_field_serializers(inner_schema, serializers)
2395 return schema
2396 elif 'ref' in schema:
2397 schema = self.defs.create_definition_reference_schema(schema)
2398
2399 # use the last serializer to make it easy to override a serializer set on a parent model
2400 serializer = serializers[-1]
2401 is_field_serializer, info_arg = inspect_field_serializer(serializer.func, serializer.info.mode)
2402
2403 if serializer.info.return_type is not PydanticUndefined:
2404 return_type = serializer.info.return_type
2405 else:
2406 try:
2407 # Do not pass in globals as the function could be defined in a different module.
2408 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2409 # in locals that may contain a parent/rebuild namespace:
2410 return_type = _decorators.get_callable_return_type(
2411 serializer.func, localns=self._types_namespace.locals
2412 )
2413 except NameError as e:
2414 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2415
2416 if return_type is PydanticUndefined:
2417 return_schema = None
2418 else:
2419 return_schema = self.generate_schema(return_type)
2420
2421 if serializer.info.mode == 'wrap':
2422 schema['serialization'] = core_schema.wrap_serializer_function_ser_schema(
2423 serializer.func,
2424 is_field_serializer=is_field_serializer,
2425 info_arg=info_arg,
2426 return_schema=return_schema,
2427 when_used=serializer.info.when_used,
2428 )
2429 else:
2430 assert serializer.info.mode == 'plain'
2431 schema['serialization'] = core_schema.plain_serializer_function_ser_schema(
2432 serializer.func,
2433 is_field_serializer=is_field_serializer,
2434 info_arg=info_arg,
2435 return_schema=return_schema,
2436 when_used=serializer.info.when_used,
2437 )
2438 return schema
2439
2440 def _apply_model_serializers(
2441 self, schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]]
2442 ) -> core_schema.CoreSchema:
2443 """Apply model serializers to a schema."""
2444 ref: str | None = schema.pop('ref', None) # type: ignore
2445 if serializers:
2446 serializer = list(serializers)[-1]
2447 info_arg = inspect_model_serializer(serializer.func, serializer.info.mode)
2448
2449 if serializer.info.return_type is not PydanticUndefined:
2450 return_type = serializer.info.return_type
2451 else:
2452 try:
2453 # Do not pass in globals as the function could be defined in a different module.
2454 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2455 # in locals that may contain a parent/rebuild namespace:
2456 return_type = _decorators.get_callable_return_type(
2457 serializer.func, localns=self._types_namespace.locals
2458 )
2459 except NameError as e:
2460 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2461
2462 if return_type is PydanticUndefined:
2463 return_schema = None
2464 else:
2465 return_schema = self.generate_schema(return_type)
2466
2467 if serializer.info.mode == 'wrap':
2468 ser_schema: core_schema.SerSchema = core_schema.wrap_serializer_function_ser_schema(
2469 serializer.func,
2470 info_arg=info_arg,
2471 return_schema=return_schema,
2472 when_used=serializer.info.when_used,
2473 )
2474 else:
2475 # plain
2476 ser_schema = core_schema.plain_serializer_function_ser_schema(
2477 serializer.func,
2478 info_arg=info_arg,
2479 return_schema=return_schema,
2480 when_used=serializer.info.when_used,
2481 )
2482 schema['serialization'] = ser_schema
2483 if ref:
2484 schema['ref'] = ref # type: ignore
2485 return schema
2486
2487
2488_VALIDATOR_F_MATCH: Mapping[
2489 tuple[FieldValidatorModes, Literal['no-info', 'with-info']],
2490 Callable[[Callable[..., Any], core_schema.CoreSchema, str | None], core_schema.CoreSchema],
2491] = {
2492 ('before', 'no-info'): lambda f, schema, _: core_schema.no_info_before_validator_function(f, schema),
2493 ('after', 'no-info'): lambda f, schema, _: core_schema.no_info_after_validator_function(f, schema),
2494 ('plain', 'no-info'): lambda f, _1, _2: core_schema.no_info_plain_validator_function(f),
2495 ('wrap', 'no-info'): lambda f, schema, _: core_schema.no_info_wrap_validator_function(f, schema),
2496 ('before', 'with-info'): lambda f, schema, field_name: core_schema.with_info_before_validator_function(
2497 f, schema, field_name=field_name
2498 ),
2499 ('after', 'with-info'): lambda f, schema, field_name: core_schema.with_info_after_validator_function(
2500 f, schema, field_name=field_name
2501 ),
2502 ('plain', 'with-info'): lambda f, _, field_name: core_schema.with_info_plain_validator_function(
2503 f, field_name=field_name
2504 ),
2505 ('wrap', 'with-info'): lambda f, schema, field_name: core_schema.with_info_wrap_validator_function(
2506 f, schema, field_name=field_name
2507 ),
2508}
2509
2510
2511# TODO V3: this function is only used for deprecated decorators. It should
2512# be removed once we drop support for those.
2513def apply_validators(
2514 schema: core_schema.CoreSchema,
2515 validators: Iterable[Decorator[RootValidatorDecoratorInfo]]
2516 | Iterable[Decorator[ValidatorDecoratorInfo]]
2517 | Iterable[Decorator[FieldValidatorDecoratorInfo]],
2518 field_name: str | None,
2519) -> core_schema.CoreSchema:
2520 """Apply validators to a schema.
2521
2522 Args:
2523 schema: The schema to apply validators on.
2524 validators: An iterable of validators.
2525 field_name: The name of the field if validators are being applied to a model field.
2526
2527 Returns:
2528 The updated schema.
2529 """
2530 for validator in validators:
2531 info_arg = inspect_validator(validator.func, validator.info.mode)
2532 val_type = 'with-info' if info_arg else 'no-info'
2533
2534 schema = _VALIDATOR_F_MATCH[(validator.info.mode, val_type)](validator.func, schema, field_name)
2535 return schema
2536
2537
2538def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool:
2539 """In v1, if any of the validators for a field had `always=True`, the default value would be validated.
2540
2541 This serves as an auxiliary function for re-implementing that logic, by looping over a provided
2542 collection of (v1-style) ValidatorDecoratorInfo's and checking if any of them have `always=True`.
2543
2544 We should be able to drop this function and the associated logic calling it once we drop support
2545 for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent
2546 to the v1-validator `always` kwarg to `field_validator`.)
2547 """
2548 for validator in validators:
2549 if validator.info.always:
2550 return True
2551 return False
2552
2553
2554def apply_model_validators(
2555 schema: core_schema.CoreSchema,
2556 validators: Iterable[Decorator[ModelValidatorDecoratorInfo]],
2557 mode: Literal['inner', 'outer', 'all'],
2558) -> core_schema.CoreSchema:
2559 """Apply model validators to a schema.
2560
2561 If mode == 'inner', only "before" validators are applied
2562 If mode == 'outer', validators other than "before" are applied
2563 If mode == 'all', all validators are applied
2564
2565 Args:
2566 schema: The schema to apply validators on.
2567 validators: An iterable of validators.
2568 mode: The validator mode.
2569
2570 Returns:
2571 The updated schema.
2572 """
2573 ref: str | None = schema.pop('ref', None) # type: ignore
2574 for validator in validators:
2575 if mode == 'inner' and validator.info.mode != 'before':
2576 continue
2577 if mode == 'outer' and validator.info.mode == 'before':
2578 continue
2579 info_arg = inspect_validator(validator.func, validator.info.mode)
2580 if validator.info.mode == 'wrap':
2581 if info_arg:
2582 schema = core_schema.with_info_wrap_validator_function(function=validator.func, schema=schema)
2583 else:
2584 schema = core_schema.no_info_wrap_validator_function(function=validator.func, schema=schema)
2585 elif validator.info.mode == 'before':
2586 if info_arg:
2587 schema = core_schema.with_info_before_validator_function(function=validator.func, schema=schema)
2588 else:
2589 schema = core_schema.no_info_before_validator_function(function=validator.func, schema=schema)
2590 else:
2591 assert validator.info.mode == 'after'
2592 if info_arg:
2593 schema = core_schema.with_info_after_validator_function(function=validator.func, schema=schema)
2594 else:
2595 schema = core_schema.no_info_after_validator_function(function=validator.func, schema=schema)
2596 if ref:
2597 schema['ref'] = ref # type: ignore
2598 return schema
2599
2600
2601def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema:
2602 """Wrap schema with default schema if default value or `default_factory` are available.
2603
2604 Args:
2605 field_info: The field info object.
2606 schema: The schema to apply default on.
2607
2608 Returns:
2609 Updated schema by default value or `default_factory`.
2610 """
2611 if field_info.default_factory:
2612 return core_schema.with_default_schema(
2613 schema,
2614 default_factory=field_info.default_factory,
2615 default_factory_takes_data=takes_validated_data_argument(field_info.default_factory),
2616 validate_default=field_info.validate_default,
2617 )
2618 elif field_info.default is not PydanticUndefined:
2619 return core_schema.with_default_schema(
2620 schema, default=field_info.default, validate_default=field_info.validate_default
2621 )
2622 else:
2623 return schema
2624
2625
2626def _extract_get_pydantic_json_schema(tp: Any) -> GetJsonSchemaFunction | None:
2627 """Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`."""
2628 js_modify_function = getattr(tp, '__get_pydantic_json_schema__', None)
2629
2630 if hasattr(tp, '__modify_schema__'):
2631 BaseModel = import_cached_base_model()
2632
2633 has_custom_v2_modify_js_func = (
2634 js_modify_function is not None
2635 and BaseModel.__get_pydantic_json_schema__.__func__ # type: ignore
2636 not in (js_modify_function, getattr(js_modify_function, '__func__', None))
2637 )
2638
2639 if not has_custom_v2_modify_js_func:
2640 cls_name = getattr(tp, '__name__', None)
2641 raise PydanticUserError(
2642 f'The `__modify_schema__` method is not supported in Pydantic v2. '
2643 f'Use `__get_pydantic_json_schema__` instead{f" in class `{cls_name}`" if cls_name else ""}.',
2644 code='custom-json-schema',
2645 )
2646
2647 if (origin := get_origin(tp)) is not None:
2648 # Generic aliases proxy attribute access to the origin, *except* dunder attributes,
2649 # such as `__get_pydantic_json_schema__`, hence the explicit check.
2650 return _extract_get_pydantic_json_schema(origin)
2651
2652 if js_modify_function is None:
2653 return None
2654
2655 return js_modify_function
2656
2657
2658class _CommonField(TypedDict):
2659 schema: core_schema.CoreSchema
2660 validation_alias: str | list[str | int] | list[list[str | int]] | None
2661 serialization_alias: str | None
2662 serialization_exclude: bool | None
2663 frozen: bool | None
2664 metadata: dict[str, Any]
2665
2666
2667def _common_field(
2668 schema: core_schema.CoreSchema,
2669 *,
2670 validation_alias: str | list[str | int] | list[list[str | int]] | None = None,
2671 serialization_alias: str | None = None,
2672 serialization_exclude: bool | None = None,
2673 frozen: bool | None = None,
2674 metadata: Any = None,
2675) -> _CommonField:
2676 return {
2677 'schema': schema,
2678 'validation_alias': validation_alias,
2679 'serialization_alias': serialization_alias,
2680 'serialization_exclude': serialization_exclude,
2681 'frozen': frozen,
2682 'metadata': metadata,
2683 }
2684
2685
2686def resolve_original_schema(schema: CoreSchema, definitions: _Definitions) -> CoreSchema | None:
2687 if schema['type'] == 'definition-ref':
2688 return definitions.get_schema_from_ref(schema['schema_ref'])
2689 elif schema['type'] == 'definitions':
2690 return schema['schema']
2691 else:
2692 return schema
2693
2694
2695def _inlining_behavior(
2696 def_ref: core_schema.DefinitionReferenceSchema,
2697) -> Literal['inline', 'keep', 'preserve_metadata']:
2698 """Determine the inlining behavior of the `'definition-ref'` schema.
2699
2700 - If no `'serialization'` schema and no metadata is attached, the schema can safely be inlined.
2701 - If it has metadata but only related to the deferred discriminator application, it can be inlined
2702 provided that such metadata is kept.
2703 - Otherwise, the schema should not be inlined. Doing so would remove the `'serialization'` schema or metadata.
2704 """
2705 if 'serialization' in def_ref:
2706 return 'keep'
2707 metadata = def_ref.get('metadata')
2708 if not metadata:
2709 return 'inline'
2710 if len(metadata) == 1 and 'pydantic_internal_union_discriminator' in metadata:
2711 return 'preserve_metadata'
2712 return 'keep'
2713
2714
2715class _Definitions:
2716 """Keeps track of references and definitions."""
2717
2718 _recursively_seen: set[str]
2719 """A set of recursively seen references.
2720
2721 When a referenceable type is encountered, the `get_schema_or_ref` context manager is
2722 entered to compute the reference. If the type references itself by some way (e.g. for
2723 a dataclass a Pydantic model, the class can be referenced as a field annotation),
2724 entering the context manager again will yield a `'definition-ref'` schema that should
2725 short-circuit the normal generation process, as the reference was already in this set.
2726 """
2727
2728 _definitions: dict[str, core_schema.CoreSchema]
2729 """A mapping of references to their corresponding schema.
2730
2731 When a schema for a referenceable type is generated, it is stored in this mapping. If the
2732 same type is encountered again, the reference is yielded by the `get_schema_or_ref` context
2733 manager.
2734 """
2735
2736 def __init__(self) -> None:
2737 self._recursively_seen = set()
2738 self._definitions = {}
2739
2740 @contextmanager
2741 def get_schema_or_ref(self, tp: Any, /) -> Generator[tuple[str, core_schema.DefinitionReferenceSchema | None]]:
2742 """Get a definition for `tp` if one exists.
2743
2744 If a definition exists, a tuple of `(ref_string, CoreSchema)` is returned.
2745 If no definition exists yet, a tuple of `(ref_string, None)` is returned.
2746
2747 Note that the returned `CoreSchema` will always be a `DefinitionReferenceSchema`,
2748 not the actual definition itself.
2749
2750 This should be called for any type that can be identified by reference.
2751 This includes any recursive types.
2752
2753 At present the following types can be named/recursive:
2754
2755 - Pydantic model
2756 - Pydantic and stdlib dataclasses
2757 - Typed dictionaries
2758 - Named tuples
2759 - `TypeAliasType` instances
2760 - Enums
2761 """
2762 ref = get_type_ref(tp)
2763 # return the reference if we're either (1) in a cycle or (2) it the reference was already encountered:
2764 if ref in self._recursively_seen or ref in self._definitions:
2765 yield (ref, core_schema.definition_reference_schema(ref))
2766 else:
2767 self._recursively_seen.add(ref)
2768 try:
2769 yield (ref, None)
2770 finally:
2771 self._recursively_seen.discard(ref)
2772
2773 def get_schema_from_ref(self, ref: str) -> CoreSchema | None:
2774 """Resolve the schema from the given reference."""
2775 return self._definitions.get(ref)
2776
2777 def create_definition_reference_schema(self, schema: CoreSchema) -> core_schema.DefinitionReferenceSchema:
2778 """Store the schema as a definition and return a `'definition-reference'` schema pointing to it.
2779
2780 The schema must have a reference attached to it.
2781 """
2782 ref = schema['ref'] # pyright: ignore
2783 self._definitions[ref] = schema
2784 return core_schema.definition_reference_schema(ref)
2785
2786 def unpack_definitions(self, schema: core_schema.DefinitionsSchema) -> CoreSchema:
2787 """Store the definitions of the `'definitions'` core schema and return the inner core schema."""
2788 for def_schema in schema['definitions']:
2789 self._definitions[def_schema['ref']] = def_schema # pyright: ignore
2790 return schema['schema']
2791
2792 def finalize_schema(self, schema: CoreSchema) -> CoreSchema:
2793 """Finalize the core schema.
2794
2795 This traverses the core schema and referenced definitions, replaces `'definition-ref'` schemas
2796 by the referenced definition if possible, and applies deferred discriminators.
2797 """
2798 definitions = self._definitions
2799 try:
2800 gather_result = gather_schemas_for_cleaning(
2801 schema,
2802 definitions=definitions,
2803 )
2804 except MissingDefinitionError as e:
2805 raise InvalidSchemaError from e
2806
2807 remaining_defs: dict[str, CoreSchema] = {}
2808
2809 # Note: this logic doesn't play well when core schemas with deferred discriminator metadata
2810 # and references are encountered. See the `test_deferred_discriminated_union_and_references()` test.
2811 for ref, inlinable_def_ref in gather_result['collected_references'].items():
2812 if inlinable_def_ref is not None and (inlining_behavior := _inlining_behavior(inlinable_def_ref)) != 'keep':
2813 if inlining_behavior == 'inline':
2814 # `ref` was encountered, and only once:
2815 # - `inlinable_def_ref` is a `'definition-ref'` schema and is guaranteed to be
2816 # the only one. Transform it into the definition it points to.
2817 # - Do not store the definition in the `remaining_defs`.
2818 inlinable_def_ref.clear() # pyright: ignore[reportAttributeAccessIssue]
2819 inlinable_def_ref.update(self._resolve_definition(ref, definitions)) # pyright: ignore
2820 elif inlining_behavior == 'preserve_metadata':
2821 # `ref` was encountered, and only once, but contains discriminator metadata.
2822 # We will do the same thing as if `inlining_behavior` was `'inline'`, but make
2823 # sure to keep the metadata for the deferred discriminator application logic below.
2824 meta = inlinable_def_ref.pop('metadata')
2825 inlinable_def_ref.clear() # pyright: ignore[reportAttributeAccessIssue]
2826 inlinable_def_ref.update(self._resolve_definition(ref, definitions)) # pyright: ignore
2827 inlinable_def_ref['metadata'] = meta
2828 else:
2829 # `ref` was encountered, at least two times (or only once, but with metadata or a serialization schema):
2830 # - Do not inline the `'definition-ref'` schemas (they are not provided in the gather result anyway).
2831 # - Store the the definition in the `remaining_defs`
2832 remaining_defs[ref] = self._resolve_definition(ref, definitions)
2833
2834 for cs in gather_result['deferred_discriminator_schemas']:
2835 discriminator: str | None = cs['metadata'].pop('pydantic_internal_union_discriminator', None) # pyright: ignore[reportTypedDictNotRequiredAccess]
2836 if discriminator is None:
2837 # This can happen in rare scenarios, when a deferred schema is present multiple times in the
2838 # gather result (e.g. when using the `Sequence` type -- see `test_sequence_discriminated_union()`).
2839 # In this case, a previous loop iteration applied the discriminator and so we can just skip it here.
2840 continue
2841 applied = _discriminated_union.apply_discriminator(cs.copy(), discriminator, remaining_defs)
2842 # Mutate the schema directly to have the discriminator applied
2843 cs.clear() # pyright: ignore[reportAttributeAccessIssue]
2844 cs.update(applied) # pyright: ignore
2845
2846 if remaining_defs:
2847 schema = core_schema.definitions_schema(schema=schema, definitions=[*remaining_defs.values()])
2848 return schema
2849
2850 def _resolve_definition(self, ref: str, definitions: dict[str, CoreSchema]) -> CoreSchema:
2851 definition = definitions[ref]
2852 if definition['type'] != 'definition-ref':
2853 return definition
2854
2855 # Some `'definition-ref'` schemas might act as "intermediate" references (e.g. when using
2856 # a PEP 695 type alias (which is referenceable) that references another PEP 695 type alias):
2857 visited: set[str] = set()
2858 while definition['type'] == 'definition-ref' and _inlining_behavior(definition) == 'inline':
2859 schema_ref = definition['schema_ref']
2860 if schema_ref in visited:
2861 raise PydanticUserError(
2862 f'{ref} contains a circular reference to itself.', code='circular-reference-schema'
2863 )
2864 visited.add(schema_ref)
2865 definition = definitions[schema_ref]
2866 return {**definition, 'ref': ref} # pyright: ignore[reportReturnType]
2867
2868
2869class _FieldNameStack:
2870 __slots__ = ('_stack',)
2871
2872 def __init__(self) -> None:
2873 self._stack: list[str] = []
2874
2875 @contextmanager
2876 def push(self, field_name: str) -> Iterator[None]:
2877 self._stack.append(field_name)
2878 yield
2879 self._stack.pop()
2880
2881 def get(self) -> str | None:
2882 if self._stack:
2883 return self._stack[-1]
2884 else:
2885 return None
2886
2887
2888class _ModelTypeStack:
2889 __slots__ = ('_stack',)
2890
2891 def __init__(self) -> None:
2892 self._stack: list[type] = []
2893
2894 @contextmanager
2895 def push(self, type_obj: type) -> Iterator[None]:
2896 self._stack.append(type_obj)
2897 yield
2898 self._stack.pop()
2899
2900 def get(self) -> type | None:
2901 if self._stack:
2902 return self._stack[-1]
2903 else:
2904 return None