Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pydantic/_internal/_generate_schema.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1328 statements  

1"""Convert python types to pydantic-core schema.""" 

2 

3from __future__ import annotations as _annotations 

4 

5import collections.abc 

6import dataclasses 

7import datetime 

8import inspect 

9import os 

10import pathlib 

11import re 

12import sys 

13import typing 

14import warnings 

15from collections.abc import Generator, Iterable, Iterator, Mapping 

16from contextlib import contextmanager 

17from copy import copy 

18from decimal import Decimal 

19from enum import Enum 

20from fractions import Fraction 

21from functools import partial 

22from inspect import Parameter, _ParameterKind 

23from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network 

24from itertools import chain 

25from operator import attrgetter 

26from types import FunctionType, GenericAlias, LambdaType, MethodType 

27from typing import ( 

28 TYPE_CHECKING, 

29 Any, 

30 Callable, 

31 Final, 

32 ForwardRef, 

33 Literal, 

34 TypeVar, 

35 Union, 

36 cast, 

37 overload, 

38) 

39from uuid import UUID 

40from zoneinfo import ZoneInfo 

41 

42import typing_extensions 

43from pydantic_core import ( 

44 MISSING, 

45 CoreSchema, 

46 MultiHostUrl, 

47 PydanticCustomError, 

48 PydanticSerializationUnexpectedValue, 

49 PydanticUndefined, 

50 Url, 

51 core_schema, 

52 to_jsonable_python, 

53) 

54from typing_extensions import TypeAlias, TypeAliasType, get_args, get_origin, is_typeddict 

55from typing_inspection import typing_objects 

56from typing_inspection.introspection import AnnotationSource, get_literal_values, is_union_origin 

57 

58from ..aliases import AliasChoices, AliasPath 

59from ..annotated_handlers import GetCoreSchemaHandler, GetJsonSchemaHandler 

60from ..config import ConfigDict, JsonDict, JsonEncoder, JsonSchemaExtraCallable 

61from ..errors import ( 

62 PydanticForbiddenQualifier, 

63 PydanticInvalidForJsonSchema, 

64 PydanticSchemaGenerationError, 

65 PydanticUndefinedAnnotation, 

66 PydanticUserError, 

67) 

68from ..functional_validators import AfterValidator, BeforeValidator, FieldValidatorModes, PlainValidator, WrapValidator 

69from ..json_schema import JsonSchemaValue 

70from ..version import version_short 

71from ..warnings import ( 

72 ArbitraryTypeWarning, 

73 PydanticDeprecatedSince20, 

74 TypedDictExtraConfigWarning, 

75 UnsupportedFieldAttributeWarning, 

76) 

77from . import _decorators, _discriminated_union, _known_annotated_metadata, _repr, _typing_extra 

78from ._config import ConfigWrapper, ConfigWrapperStack 

79from ._core_metadata import CoreMetadata, update_core_metadata 

80from ._core_utils import ( 

81 get_ref, 

82 get_type_ref, 

83 is_list_like_schema_with_items_schema, 

84) 

85from ._decorators import ( 

86 Decorator, 

87 DecoratorInfos, 

88 FieldSerializerDecoratorInfo, 

89 FieldValidatorDecoratorInfo, 

90 ModelSerializerDecoratorInfo, 

91 ModelValidatorDecoratorInfo, 

92 RootValidatorDecoratorInfo, 

93 ValidatorDecoratorInfo, 

94 get_attribute_from_bases, 

95 inspect_field_serializer, 

96 inspect_model_serializer, 

97 inspect_validator, 

98) 

99from ._docs_extraction import extract_docstrings_from_cls 

100from ._fields import ( 

101 collect_dataclass_fields, 

102 rebuild_dataclass_fields, 

103 rebuild_model_fields, 

104 takes_validated_data_argument, 

105 update_field_from_config, 

106) 

107from ._forward_ref import PydanticRecursiveRef 

108from ._generics import get_standard_typevars_map, replace_types 

109from ._import_utils import import_cached_base_model, import_cached_field_info 

110from ._mock_val_ser import MockCoreSchema 

111from ._namespace_utils import NamespacesTuple, NsResolver 

112from ._schema_gather import MissingDefinitionError, gather_schemas_for_cleaning 

113from ._schema_generation_shared import CallbackGetCoreSchemaHandler 

114from ._utils import lenient_issubclass, smart_deepcopy 

115 

116if TYPE_CHECKING: 

117 from ..fields import ComputedFieldInfo, FieldInfo 

118 from ..main import BaseModel 

119 from ..types import Discriminator 

120 from ._dataclasses import StandardDataclass 

121 from ._schema_generation_shared import GetJsonSchemaFunction 

122 

# `True` on Python 3.12+, where the stdlib `typing.TypedDict` is supported.
_SUPPORTS_TYPEDDICT = sys.version_info >= (3, 12)

# Decorator-info variants that target individual fields (rather than the whole model):
FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo]
FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo)
AnyFieldDecorator = Union[
    Decorator[ValidatorDecoratorInfo],
    Decorator[FieldValidatorDecoratorInfo],
    Decorator[FieldSerializerDecoratorInfo],
]

ModifyCoreSchemaWrapHandler: TypeAlias = GetCoreSchemaHandler
# `(source_type, handler) -> CoreSchema` callback shape used by custom schema hooks:
GetCoreSchemaFunction: TypeAlias = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema]
ParametersCallback: TypeAlias = "Callable[[int, str, Any], Literal['skip'] | None]"

136 

# Runtime origins recognized for each container kind. Both the `typing` aliases and the
# builtin / `collections`(.abc) forms are listed so either spelling matches.
TUPLE_TYPES: list[type] = [typing.Tuple, tuple]  # noqa: UP006
LIST_TYPES: list[type] = [typing.List, list, collections.abc.MutableSequence]  # noqa: UP006
SET_TYPES: list[type] = [typing.Set, set, collections.abc.MutableSet]  # noqa: UP006
FROZEN_SET_TYPES: list[type] = [typing.FrozenSet, frozenset, collections.abc.Set]  # noqa: UP006
DICT_TYPES: list[type] = [typing.Dict, dict]  # noqa: UP006
IP_TYPES: list[type] = [IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network]
SEQUENCE_TYPES: list[type] = [typing.Sequence, collections.abc.Sequence]
ITERABLE_TYPES: list[type] = [typing.Iterable, collections.abc.Iterable, typing.Generator, collections.abc.Generator]
TYPE_TYPES: list[type] = [typing.Type, type]  # noqa: UP006
PATTERN_TYPES: list[type] = [typing.Pattern, re.Pattern]
PATH_TYPES: list[type] = [
    os.PathLike,
    pathlib.Path,
    pathlib.PurePath,
    pathlib.PosixPath,
    pathlib.PurePosixPath,
    pathlib.PureWindowsPath,
]
MAPPING_TYPES = [
    typing.Mapping,
    typing.MutableMapping,
    collections.abc.Mapping,
    collections.abc.MutableMapping,
    collections.OrderedDict,
    typing_extensions.OrderedDict,
    typing.DefaultDict,  # noqa: UP006
    collections.defaultdict,
]
COUNTER_TYPES = [collections.Counter, typing.Counter]
DEQUE_TYPES: list[type] = [collections.deque, typing.Deque]  # noqa: UP006

167 

# Note: This does not play very well with type checkers. For example,
# `a: LambdaType = lambda x: x` will raise a type error by Pyright.
ValidateCallSupportedTypes = Union[
    LambdaType,
    FunctionType,
    MethodType,
    partial,
]

# Runtime tuple of the members of `ValidateCallSupportedTypes`, for `isinstance` checks.
VALIDATE_CALL_SUPPORTED_TYPES = get_args(ValidateCallSupportedTypes)

# Pairs of `(attribute name, its default value)`:
UNSUPPORTED_STANDALONE_FIELDINFO_ATTRIBUTES: list[tuple[str, Any]] = [
    ('alias', None),
    ('validation_alias', None),
    ('serialization_alias', None),
    # will be set if any alias is set, so disable it to avoid double warnings:
    # 'alias_priority',
    ('default', PydanticUndefined),
    ('default_factory', None),
    ('exclude', None),
    ('deprecated', None),
    ('repr', True),
    ('validate_default', None),
    ('frozen', None),
    ('init', None),
    ('init_var', None),
    ('kw_only', None),
]
"""`FieldInfo` attributes (and their default value) that can't be used outside of a model (e.g. in a type adapter or a PEP 695 type alias)."""

196 

# Maps a validator `mode` string to the corresponding functional validator class.
_mode_to_validator: dict[
    FieldValidatorModes, type[BeforeValidator | AfterValidator | PlainValidator | WrapValidator]
] = {'before': BeforeValidator, 'after': AfterValidator, 'plain': PlainValidator, 'wrap': WrapValidator}

200 

201 

def check_validator_fields_against_field_name(
    info: FieldDecoratorInfo,
    field: str,
) -> bool:
    """Whether the decorator described by `info` applies to the field named `field`.

    A decorator applies when it explicitly lists the field, or when it targets
    every field via the `'*'` wildcard.

    Args:
        info: The field info.
        field: The field name to check.

    Returns:
        `True` if field name is in validator fields, `False` otherwise.
    """
    target_fields = info.fields
    return '*' in target_fields or field in target_fields

217 

218 

def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None:
    """Verify that every field a decorator targets actually exists.

    Decorators using the `'*'` wildcard or declared with `check_fields=False`
    are exempt from the check.

    Args:
        decorators: An iterable of decorators.
        fields: An iterable of fields name.

    Raises:
        PydanticUserError: If one of the field names does not exist in `fields` param.
    """
    known_fields = set(fields)
    for dec in decorators:
        info = dec.info
        if '*' in info.fields or info.check_fields is False:
            continue
        if any(field not in known_fields for field in info.fields):
            raise PydanticUserError(
                f'Decorators defined with incorrect fields: {dec.cls_ref}.{dec.cls_var_name}'
                " (use check_fields=False if you're inheriting from the model and intended this)",
                code='decorator-missing-field',
            )

244 

245 

def filter_field_decorator_info_by_field(
    validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str
) -> list[Decorator[FieldDecoratorInfoType]]:
    """Return only the decorators whose field list matches `field` (incl. the `'*'` wildcard)."""
    matching: list[Decorator[FieldDecoratorInfoType]] = []
    for dec in validator_functions:
        if check_validator_fields_against_field_name(dec.info, field):
            matching.append(dec)
    return matching

250 

251 

def apply_each_item_validators(
    schema: core_schema.CoreSchema,
    each_item_validators: list[Decorator[ValidatorDecoratorInfo]],
) -> core_schema.CoreSchema:
    """Apply V1-style `@validator(..., each_item=True)` validators to a container schema.

    The validators are pushed down onto the container's per-item schema: a list-like
    schema's items, a tuple's variadic item, or a dict's values. `schema` is mutated
    in place and also returned.

    Raises:
        TypeError: If `schema` is not a container type supporting per-item validation.
    """
    # This V1 compatibility shim should eventually be removed

    # fail early if each_item_validators is empty
    if not each_item_validators:
        return schema

    # push down any `each_item=True` validators
    # note that this won't work for any Annotated types that get wrapped by a function validator
    # but that's okay because that didn't exist in V1
    if schema['type'] == 'nullable':
        # recurse through the nullable wrapper onto the wrapped container schema
        schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators)
        return schema
    elif schema['type'] == 'tuple':
        # only the variadic (`*args`-like) item of a tuple gets per-item validators
        if (variadic_item_index := schema.get('variadic_item_index')) is not None:
            schema['items_schema'][variadic_item_index] = apply_validators(
                schema['items_schema'][variadic_item_index],
                each_item_validators,
            )
    elif is_list_like_schema_with_items_schema(schema):
        inner_schema = schema.get('items_schema', core_schema.any_schema())
        schema['items_schema'] = apply_validators(inner_schema, each_item_validators)
    elif schema['type'] == 'dict':
        # for dicts, "each item" means each *value* (keys are untouched)
        inner_schema = schema.get('values_schema', core_schema.any_schema())
        schema['values_schema'] = apply_validators(inner_schema, each_item_validators)
    else:
        raise TypeError(
            f'`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema["type"]}'
        )
    return schema

285 

286 

def _extract_json_schema_info_from_field_info(
    info: FieldInfo | ComputedFieldInfo,
) -> tuple[JsonDict | None, JsonDict | JsonSchemaExtraCallable | None]:
    """Collect the JSON schema overrides declared on a field.

    Returns:
        A tuple of (direct JSON schema updates, or `None` if there are none;
        the field's `json_schema_extra` value).
    """
    deprecated = info.deprecated
    raw_updates = {
        'title': info.title,
        'description': info.description,
        # an empty-string deprecation message still counts as deprecated;
        # a falsy/absent value becomes `None` and is dropped below
        'deprecated': bool(deprecated) or deprecated == '' or None,
        'examples': to_jsonable_python(info.examples),
    }
    updates = {key: value for key, value in raw_updates.items() if value is not None}
    return (updates or None, info.json_schema_extra)

298 

299 

# Shape of the (deprecated) `json_encoders` config value: type -> encoder callable.
JsonEncoders = dict[type[Any], JsonEncoder]

301 

302 

303def _add_custom_serialization_from_json_encoders( 

304 json_encoders: JsonEncoders | None, tp: Any, schema: CoreSchema 

305) -> CoreSchema: 

306 """Iterate over the json_encoders and add the first matching encoder to the schema. 

307 

308 Args: 

309 json_encoders: A dictionary of types and their encoder functions. 

310 tp: The type to check for a matching encoder. 

311 schema: The schema to add the encoder to. 

312 """ 

313 if not json_encoders: 

314 return schema 

315 if 'serialization' in schema: 

316 return schema 

317 # Check the class type and its superclasses for a matching encoder 

318 # Decimal.__class__.__mro__ (and probably other cases) doesn't include Decimal itself 

319 # if the type is a GenericAlias (e.g. from list[int]) we need to use __class__ instead of .__mro__ 

320 for base in (tp, *getattr(tp, '__mro__', tp.__class__.__mro__)[:-1]): 

321 encoder = json_encoders.get(base) 

322 if encoder is None: 

323 continue 

324 

325 warnings.warn( 

326 f'`json_encoders` is deprecated. See https://docs.pydantic.dev/{version_short()}/concepts/serialization/#custom-serializers for alternatives', 

327 PydanticDeprecatedSince20, 

328 ) 

329 

330 # TODO: in theory we should check that the schema accepts a serialization key 

331 schema['serialization'] = core_schema.plain_serializer_function_ser_schema(encoder, when_used='json') 

332 return schema 

333 

334 return schema 

335 

336 

# Grouped in a tuple so all generation-time failures can be caught in one `except` clause.
GENERATE_SCHEMA_ERRORS = (
    PydanticForbiddenQualifier,
    PydanticInvalidForJsonSchema,
    PydanticSchemaGenerationError,
    PydanticUndefinedAnnotation,
)
"""Errors raised during core schema generation. This does *not* include `InvalidSchemaError`, which is raised during schema cleaning."""

344 

345 

class InvalidSchemaError(Exception):
    """The core schema is invalid. Raised during schema cleaning (not generation; see `GENERATE_SCHEMA_ERRORS`)."""

348 

349 

class GenerateSchema:
    """Generate core schema for a Pydantic model, dataclass and types like `str`, `datetime`, ... ."""

    __slots__ = (
        # per-instance state; stacks are used while recursing into nested models/types
        '_config_wrapper_stack',
        '_ns_resolver',
        '_typevars_map',
        'field_name_stack',
        'model_type_stack',
        'defs',
    )

361 

    def __init__(
        self,
        config_wrapper: ConfigWrapper,
        ns_resolver: NsResolver | None = None,
        typevars_map: Mapping[TypeVar, Any] | None = None,
    ) -> None:
        """Initialize the schema generator.

        Args:
            config_wrapper: The configuration to start from (bottom of the config stack).
            ns_resolver: Resolver for the namespaces used to evaluate annotations;
                a fresh `NsResolver` is created when omitted.
            typevars_map: Mapping of `TypeVar`s to the types they should be replaced with,
                or `None` if not generating a schema for a parametrized generic.
        """
        # we need a stack for recursing into nested models
        self._config_wrapper_stack = ConfigWrapperStack(config_wrapper)
        self._ns_resolver = ns_resolver or NsResolver()
        self._typevars_map = typevars_map
        self.field_name_stack = _FieldNameStack()
        self.model_type_stack = _ModelTypeStack()
        self.defs = _Definitions()

375 

    def __init_subclass__(cls) -> None:
        # Warn as soon as a subclass is *created* (not instantiated): subclassing is unsupported.
        super().__init_subclass__()
        warnings.warn(
            'Subclassing `GenerateSchema` is not supported. The API is highly subject to change in minor versions.',
            UserWarning,
            stacklevel=2,
        )

383 

    @property
    def _config_wrapper(self) -> ConfigWrapper:
        """The currently active configuration (the tail of the config wrapper stack)."""
        return self._config_wrapper_stack.tail

387 

    @property
    def _types_namespace(self) -> NamespacesTuple:
        """The namespaces currently used to resolve annotations (from the namespace resolver)."""
        return self._ns_resolver.types_namespace

391 

    @property
    def _arbitrary_types(self) -> bool:
        """Whether `arbitrary_types_allowed` is set in the currently active configuration."""
        return self._config_wrapper.arbitrary_types_allowed

395 

396 # the following methods can be overridden but should be considered 

397 # unstable / private APIs 

398 def _list_schema(self, items_type: Any) -> CoreSchema: 

399 return core_schema.list_schema(self.generate_schema(items_type)) 

400 

401 def _dict_schema(self, keys_type: Any, values_type: Any) -> CoreSchema: 

402 return core_schema.dict_schema(self.generate_schema(keys_type), self.generate_schema(values_type)) 

403 

404 def _set_schema(self, items_type: Any) -> CoreSchema: 

405 return core_schema.set_schema(self.generate_schema(items_type)) 

406 

407 def _frozenset_schema(self, items_type: Any) -> CoreSchema: 

408 return core_schema.frozenset_schema(self.generate_schema(items_type)) 

409 

    def _enum_schema(self, enum_type: type[Enum]) -> CoreSchema:
        """Schema for an `Enum` subclass.

        Enums with members get a dedicated enum schema (optionally unwrapping to the
        member value when `use_enum_values` is configured); an enum with no members
        falls back to a plain `isinstance` check.
        """
        cases: list[Any] = list(enum_type.__members__.values())

        enum_ref = get_type_ref(enum_type)
        description = None if not enum_type.__doc__ else inspect.cleandoc(enum_type.__doc__)
        if (
            description == 'An enumeration.'
        ):  # This is the default value provided by enum.EnumMeta.__new__; don't use it
            description = None
        # JSON schema updates applied on top of the generated schema (`None` values dropped):
        js_updates = {'title': enum_type.__name__, 'description': description}
        js_updates = {k: v for k, v in js_updates.items() if v is not None}

        sub_type: Literal['str', 'int', 'float'] | None = None
        if issubclass(enum_type, int):
            sub_type = 'int'
            value_ser_type: core_schema.SerSchema = core_schema.simple_ser_schema('int')
        elif issubclass(enum_type, str):
            # this handles `StrEnum` (3.11 only), and also `Foobar(str, Enum)`
            sub_type = 'str'
            value_ser_type = core_schema.simple_ser_schema('str')
        elif issubclass(enum_type, float):
            sub_type = 'float'
            value_ser_type = core_schema.simple_ser_schema('float')
        else:
            # TODO this is an ugly hack, how do we trigger an Any schema for serialization?
            value_ser_type = core_schema.plain_serializer_function_ser_schema(lambda x: x)

        if cases:

            def get_json_schema(schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
                # apply title/description onto the resolved (non-ref) schema
                json_schema = handler(schema)
                original_schema = handler.resolve_ref_schema(json_schema)
                original_schema.update(js_updates)
                return json_schema

            # we don't want to add the missing to the schema if it's the default one
            default_missing = getattr(enum_type._missing_, '__func__', None) is Enum._missing_.__func__  # pyright: ignore[reportFunctionMemberAccess]
            enum_schema = core_schema.enum_schema(
                enum_type,
                cases,
                sub_type=sub_type,
                missing=None if default_missing else enum_type._missing_,
                ref=enum_ref,
                metadata={'pydantic_js_functions': [get_json_schema]},
            )

            if self._config_wrapper.use_enum_values:
                # unwrap members to their `.value` after validation
                enum_schema = core_schema.no_info_after_validator_function(
                    attrgetter('value'), enum_schema, serialization=value_ser_type
                )

            return enum_schema

        else:

            def get_json_schema_no_cases(_, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
                json_schema = handler(core_schema.enum_schema(enum_type, cases, sub_type=sub_type, ref=enum_ref))
                original_schema = handler.resolve_ref_schema(json_schema)
                original_schema.update(js_updates)
                return json_schema

            # Use an isinstance check for enums with no cases.
            # The most important use case for this is creating TypeVar bounds for generics that should
            # be restricted to enums. This is more consistent than it might seem at first, since you can only
            # subclass enum.Enum (or subclasses of enum.Enum) if all parent classes have no cases.
            # We use the get_json_schema function when an Enum subclass has been declared with no cases
            # so that we can still generate a valid json schema.
            return core_schema.is_instance_schema(
                enum_type,
                metadata={'pydantic_js_functions': [get_json_schema_no_cases]},
            )

481 

    def _ip_schema(self, tp: Any) -> CoreSchema:
        """Schema for the `ipaddress` address/network/interface types.

        Lax mode parses via the per-type validator from `IP_VALIDATOR_LOOKUP`; strict
        mode accepts only instances of `tp` (or, from JSON, a string passed to `tp`).
        Serialization returns the instance in python mode and `str(ip)` in JSON mode.
        """
        from ._validators import IP_VALIDATOR_LOOKUP, IpType

        # JSON schema `format` value for each supported IP type:
        ip_type_json_schema_format: dict[type[IpType], str] = {
            IPv4Address: 'ipv4',
            IPv4Network: 'ipv4network',
            IPv4Interface: 'ipv4interface',
            IPv6Address: 'ipv6',
            IPv6Network: 'ipv6network',
            IPv6Interface: 'ipv6interface',
        }

        def ser_ip(ip: Any, info: core_schema.SerializationInfo) -> str | IpType:
            # reject values of an unexpected type rather than serializing garbage
            if not isinstance(ip, (tp, str)):
                raise PydanticSerializationUnexpectedValue(
                    f"Expected `{tp}` but got `{type(ip)}` with value `'{ip}'` - serialized value may not be as expected."
                )
            if info.mode == 'python':
                return ip
            return str(ip)

        return core_schema.lax_or_strict_schema(
            lax_schema=core_schema.no_info_plain_validator_function(IP_VALIDATOR_LOOKUP[tp]),
            strict_schema=core_schema.json_or_python_schema(
                json_schema=core_schema.no_info_after_validator_function(tp, core_schema.str_schema()),
                python_schema=core_schema.is_instance_schema(tp),
            ),
            serialization=core_schema.plain_serializer_function_ser_schema(ser_ip, info_arg=True, when_used='always'),
            metadata={
                'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': ip_type_json_schema_format[tp]}]
            },
        )

514 

    def _path_schema(self, tp: Any, path_type: Any) -> CoreSchema:
        """Schema for `pathlib` path classes and `os.PathLike`.

        Args:
            tp: The path class itself (e.g. `pathlib.Path`, or `os.PathLike`).
            path_type: What `os.PathLike` was parametrized with (`str`, `bytes` or `Any`);
                selects the inner string/bytes schema used when parsing.

        Raises:
            PydanticUserError: If `os.PathLike` is parametrized with anything other
                than `str`, `bytes` or `Any`.
        """
        if tp is os.PathLike and (path_type not in {str, bytes} and not typing_objects.is_any(path_type)):
            raise PydanticUserError(
                '`os.PathLike` can only be used with `str`, `bytes` or `Any`', code='schema-for-unknown-type'
            )

        # `os.PathLike` can't be instantiated directly, so build instances via `pathlib.PurePath`:
        path_constructor = pathlib.PurePath if tp is os.PathLike else tp
        strict_inner_schema = (
            core_schema.bytes_schema(strict=True) if (path_type is bytes) else core_schema.str_schema(strict=True)
        )
        lax_inner_schema = core_schema.bytes_schema() if (path_type is bytes) else core_schema.str_schema()

        def path_validator(input_value: str | bytes) -> os.PathLike[Any]:  # type: ignore
            # Convert a validated `str`/`bytes` value into a path instance.
            try:
                if path_type is bytes:
                    if isinstance(input_value, bytes):
                        try:
                            input_value = input_value.decode()
                        except UnicodeDecodeError as e:
                            raise PydanticCustomError('bytes_type', 'Input must be valid bytes') from e
                    else:
                        raise PydanticCustomError('bytes_type', 'Input must be bytes')
                elif not isinstance(input_value, str):
                    raise PydanticCustomError('path_type', 'Input is not a valid path')

                return path_constructor(input_value)  # type: ignore
            except TypeError as e:
                raise PydanticCustomError('path_type', 'Input is not a valid path') from e

        def ser_path(path: Any, info: core_schema.SerializationInfo) -> str | os.PathLike[Any]:
            # Serialize as-is in python mode, as `str(path)` in JSON mode.
            if not isinstance(path, (tp, str)):
                raise PydanticSerializationUnexpectedValue(
                    f"Expected `{tp}` but got `{type(path)}` with value `'{path}'` - serialized value may not be as expected."
                )
            if info.mode == 'python':
                return path
            return str(path)

        instance_schema = core_schema.json_or_python_schema(
            json_schema=core_schema.no_info_after_validator_function(path_validator, lax_inner_schema),
            python_schema=core_schema.is_instance_schema(tp),
        )

        schema = core_schema.lax_or_strict_schema(
            # lax: accept an existing instance, or parse from str/bytes
            lax_schema=core_schema.union_schema(
                [
                    instance_schema,
                    core_schema.no_info_after_validator_function(path_validator, strict_inner_schema),
                ],
                custom_error_type='path_type',
                custom_error_message=f'Input is not a valid path for {tp}',
            ),
            strict_schema=instance_schema,
            serialization=core_schema.plain_serializer_function_ser_schema(ser_path, info_arg=True, when_used='always'),
            metadata={'pydantic_js_functions': [lambda source, handler: {**handler(source), 'format': 'path'}]},
        )
        return schema

572 

    def _deque_schema(self, items_type: Any) -> CoreSchema:
        """Schema for `collections.deque` with items of type `items_type`."""
        from ._serializers import serialize_sequence_via_list
        from ._validators import deque_validator

        item_type_schema = self.generate_schema(items_type)

        # we have to use a lax list schema here, because we need to validate the deque's
        # items via a list schema, but it's ok if the deque itself is not a list
        list_schema = core_schema.list_schema(item_type_schema, strict=False)

        check_instance = core_schema.json_or_python_schema(
            json_schema=list_schema,
            python_schema=core_schema.is_instance_schema(collections.deque, cls_repr='Deque'),
        )

        lax_schema = core_schema.no_info_wrap_validator_function(deque_validator, list_schema)

        return core_schema.lax_or_strict_schema(
            lax_schema=lax_schema,
            # strict: require an actual `deque` (in python mode), then validate the items
            strict_schema=core_schema.chain_schema([check_instance, lax_schema]),
            serialization=core_schema.wrap_serializer_function_ser_schema(
                serialize_sequence_via_list, schema=item_type_schema, info_arg=True
            ),
        )

597 

    def _mapping_schema(self, tp: Any, keys_type: Any, values_type: Any) -> CoreSchema:
        """Schema for mapping types (`dict`, `Mapping`, `OrderedDict`, `defaultdict`, ...).

        Args:
            tp: The mapping origin (must be a key of `MAPPING_ORIGIN_MAP`).
            keys_type: The key type.
            values_type: The value type; for `defaultdict`, it may also carry the
                default factory (see `get_defaultdict_default_default_factory`).
        """
        from ._validators import MAPPING_ORIGIN_MAP, defaultdict_validator, get_defaultdict_default_default_factory

        mapped_origin = MAPPING_ORIGIN_MAP[tp]
        keys_schema = self.generate_schema(keys_type)
        with warnings.catch_warnings():
            # We kind of abused `Field()` default factories to be able to specify
            # the `defaultdict`'s `default_factory`. As a consequence, we get warnings
            # as normally `FieldInfo.default_factory` is unsupported in the context where
            # `Field()` is used and our only solution is to ignore them (note that this might
            # wrongfully ignore valid warnings, e.g. if the `value_type` is a PEP 695 type alias
            # with unsupported metadata).
            warnings.simplefilter('ignore', category=UnsupportedFieldAttributeWarning)
            values_schema = self.generate_schema(values_type)
        dict_schema = core_schema.dict_schema(keys_schema, values_schema, strict=False)

        if mapped_origin is dict:
            # plain dicts need no coercion wrapper
            schema = dict_schema
        else:
            check_instance = core_schema.json_or_python_schema(
                json_schema=dict_schema,
                python_schema=core_schema.is_instance_schema(mapped_origin),
            )

            if tp is collections.defaultdict:
                default_default_factory = get_defaultdict_default_default_factory(values_type)
                coerce_instance_wrap = partial(
                    core_schema.no_info_wrap_validator_function,
                    partial(defaultdict_validator, default_default_factory=default_default_factory),
                )
            else:
                # coerce the validated dict into the target mapping type
                coerce_instance_wrap = partial(core_schema.no_info_after_validator_function, mapped_origin)

            lax_schema = coerce_instance_wrap(dict_schema)
            strict_schema = core_schema.chain_schema([check_instance, lax_schema])

            schema = core_schema.lax_or_strict_schema(
                lax_schema=lax_schema,
                strict_schema=strict_schema,
                serialization=core_schema.wrap_serializer_function_ser_schema(
                    lambda v, h: h(v), schema=dict_schema, info_arg=False
                ),
            )

        return schema

643 

    def _fraction_schema(self) -> CoreSchema:
        """Support for [`fractions.Fraction`][fractions.Fraction].

        Values are serialized as strings to guarantee round-trip behavior.
        """
        from ._validators import fraction_validator

        # TODO: note, this is a fairly common pattern, re lax / strict for attempted type coercion,
        # can we use a helper function to reduce boilerplate?
        return core_schema.lax_or_strict_schema(
            lax_schema=core_schema.no_info_plain_validator_function(fraction_validator),
            strict_schema=core_schema.json_or_python_schema(
                json_schema=core_schema.no_info_plain_validator_function(fraction_validator),
                python_schema=core_schema.is_instance_schema(Fraction),
            ),
            # use str serialization to guarantee round trip behavior
            serialization=core_schema.to_string_ser_schema(when_used='always'),
            metadata={'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'fraction'}]},
        )

660 

661 def _arbitrary_type_schema(self, tp: Any) -> CoreSchema: 

662 if not isinstance(tp, type): 

663 warnings.warn( 

664 f'{tp!r} is not a Python type (it may be an instance of an object),' 

665 ' Pydantic will allow any object with no validation since we cannot even' 

666 ' enforce that the input is an instance of the given type.' 

667 ' To get rid of this error wrap the type with `pydantic.SkipValidation`.', 

668 ArbitraryTypeWarning, 

669 ) 

670 return core_schema.any_schema() 

671 return core_schema.is_instance_schema(tp) 

672 

    def _unknown_type_schema(self, obj: Any) -> CoreSchema:
        """Last-resort handler for a type nothing else could handle: always raises.

        Raises:
            PydanticSchemaGenerationError: Always, with guidance on how to support `obj`.
        """
        raise PydanticSchemaGenerationError(
            f'Unable to generate pydantic-core schema for {obj!r}. '
            'Set `arbitrary_types_allowed=True` in the model_config to ignore this error'
            ' or implement `__get_pydantic_core_schema__` on your type to fully support it.'
            '\n\nIf you got this error by calling handler(<some type>) within'
            ' `__get_pydantic_core_schema__` then you likely need to call'
            ' `handler.generate_schema(<some type>)` since we do not call'
            ' `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.'
        )

683 

    def _apply_discriminator_to_union(
        self, schema: CoreSchema, discriminator: str | Discriminator | None
    ) -> CoreSchema:
        """Turn a union schema into a discriminated (tagged) union, if a discriminator is given.

        If some union member definitions are not available yet, the discriminator is
        stored in the schema's metadata so it can be applied once definitions resolve.
        """
        if discriminator is None:
            return schema
        try:
            return _discriminated_union.apply_discriminator(
                schema,
                discriminator,
                self.defs._definitions,
            )
        except _discriminated_union.MissingDefinitionForUnionRef:
            # defer until defs are resolved
            _discriminated_union.set_discriminator_in_metadata(
                schema,
                discriminator,
            )
            return schema

702 

    def clean_schema(self, schema: CoreSchema) -> CoreSchema:
        """Finalize `schema`, resolving the definitions collected during generation."""
        return self.defs.finalize_schema(schema)

705 

706 def _add_js_function(self, metadata_schema: CoreSchema, js_function: Callable[..., Any]) -> None: 

707 metadata = metadata_schema.get('metadata', {}) 

708 pydantic_js_functions = metadata.setdefault('pydantic_js_functions', []) 

709 # because of how we generate core schemas for nested generic models 

710 # we can end up adding `BaseModel.__get_pydantic_json_schema__` multiple times 

711 # this check may fail to catch duplicates if the function is a `functools.partial` 

712 # or something like that, but if it does it'll fail by inserting the duplicate 

713 if js_function not in pydantic_js_functions: 

714 pydantic_js_functions.append(js_function) 

715 metadata_schema['metadata'] = metadata 

716 

    def generate_schema(
        self,
        obj: Any,
    ) -> core_schema.CoreSchema:
        """Generate core schema.

        Args:
            obj: The object to generate core schema for.

        Returns:
            The generated core schema.

        Raises:
            PydanticUndefinedAnnotation:
                If it is not possible to evaluate forward reference.
            PydanticSchemaGenerationError:
                If it is not possible to generate pydantic-core schema.
            TypeError:
                - If `alias_generator` returns a disallowed type (must be str, AliasPath or AliasChoices).
                - If V1 style validator with `each_item=True` applied on a wrong field.
            PydanticUserError:
                - If `typing.TypedDict` is used instead of `typing_extensions.TypedDict` on Python < 3.12.
                - If `__modify_schema__` method is used instead of `__get_pydantic_json_schema__`.
        """
        # A schema provided by `obj` itself (via its get-schema method) takes precedence;
        # otherwise fall back to the built-in generation logic.
        schema = self._generate_schema_from_get_schema_method(obj, obj)

        if schema is None:
            schema = self._generate_schema_inner(obj)

        # Attach any `__get_pydantic_json_schema__`-derived function to the schema's metadata:
        metadata_js_function = _extract_get_pydantic_json_schema(obj)
        if metadata_js_function is not None:
            metadata_schema = resolve_original_schema(schema, self.defs)
            if metadata_schema:
                self._add_js_function(metadata_schema, metadata_js_function)

        # Apply any matching (deprecated) `json_encoders` serializer from the config:
        schema = _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, obj, schema)

        return schema

755 

    def _model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema:
        """Generate schema for a Pydantic model."""
        BaseModel_ = import_cached_base_model()

        with self.defs.get_schema_or_ref(cls) as (model_ref, maybe_schema):
            if maybe_schema is not None:
                # Schema already generated (or in progress) -- reuse the reference:
                return maybe_schema

            # Only honor a core schema set on the class itself (not inherited):
            schema = cls.__dict__.get('__pydantic_core_schema__')
            if schema is not None and not isinstance(schema, MockCoreSchema):
                if schema['type'] == 'definitions':
                    schema = self.defs.unpack_definitions(schema)
                ref = get_ref(schema)
                if ref:
                    return self.defs.create_definition_reference_schema(schema)
                else:
                    return schema

            config_wrapper = ConfigWrapper(cls.model_config, check=False)

            with self._config_wrapper_stack.push(config_wrapper), self._ns_resolver.push(cls):
                core_config = self._config_wrapper.core_config(title=cls.__name__)

                if cls.__pydantic_fields_complete__ or cls is BaseModel_:
                    fields = getattr(cls, '__pydantic_fields__', {})
                    extra_info = getattr(cls, '__pydantic_extra_info__', None)
                else:
                    if '__pydantic_fields__' not in cls.__dict__:
                        # This happens when we have a loop in the schema generation:
                        # class Base[T](BaseModel):
                        #     t: T
                        #
                        # class Other(BaseModel):
                        #     b: 'Base[Other]'
                        # When we build fields for `Other`, we evaluate the forward annotation.
                        # At this point, `Other` doesn't have the model fields set. We create
                        # `Base[Other]`; model fields are successfully built, and we try to generate
                        # a schema for `t: Other`. As `Other.__pydantic_fields__` aren't set, we abort.
                        raise PydanticUndefinedAnnotation(
                            name=cls.__name__,
                            message=f'Class {cls.__name__!r} is not defined',
                        )
                    # Fields were not fully built at class creation; retry now that
                    # more names may be resolvable:
                    try:
                        fields, extra_info = rebuild_model_fields(
                            cls,
                            config_wrapper=self._config_wrapper,
                            ns_resolver=self._ns_resolver,
                            typevars_map=self._typevars_map or {},
                        )
                    except NameError as e:
                        raise PydanticUndefinedAnnotation.from_name_error(e) from e

                decorators = cls.__pydantic_decorators__
                computed_fields = decorators.computed_fields
                # Fail early if any field validator/serializer targets a field that
                # doesn't exist on the model:
                check_decorator_fields_exist(
                    chain(
                        decorators.field_validators.values(),
                        decorators.field_serializers.values(),
                        decorators.validators.values(),
                    ),
                    {*fields.keys(), *computed_fields.keys()},
                )

                model_validators = decorators.model_validators.values()

                extras_schema = None
                extras_keys_schema = None
                if core_config.get('extra_fields_behavior') == 'allow' and extra_info is not None:
                    tp = get_origin(extra_info.annotation)
                    if tp not in DICT_TYPES:
                        raise PydanticSchemaGenerationError(
                            'The type annotation for `__pydantic_extra__` must be `dict[str, ...]`'
                        )
                    # See the comments in `_get_args_resolving_forward_refs()` for why we need
                    # to re-evaluate the annotation:
                    extra_keys_type, extra_items_type = self._get_args_resolving_forward_refs(
                        extra_info.annotation,
                        required=True,
                    )
                    # Only build explicit schemas when stricter than the defaults
                    # (`str` keys / `Any` values):
                    if extra_keys_type is not str:
                        extras_keys_schema = self.generate_schema(extra_keys_type)
                    if not typing_objects.is_any(extra_items_type):
                        extras_schema = self.generate_schema(extra_items_type)

                generic_origin: type[BaseModel] | None = getattr(cls, '__pydantic_generic_metadata__', {}).get('origin')

                if cls.__pydantic_root_model__:
                    # FIXME: should the common field metadata be used here?
                    inner_schema, _ = self._common_field_schema('root', fields['root'], decorators)
                    inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
                    model_schema = core_schema.model_schema(
                        cls,
                        inner_schema,
                        generic_origin=generic_origin,
                        custom_init=getattr(cls, '__pydantic_custom_init__', None),
                        root_model=True,
                        post_init=getattr(cls, '__pydantic_post_init__', None),
                        config=core_config,
                        ref=model_ref,
                    )
                else:
                    fields_schema: core_schema.CoreSchema = core_schema.model_fields_schema(
                        {k: self._generate_md_field_schema(k, v, decorators) for k, v in fields.items()},
                        computed_fields=[
                            self._computed_field_schema(d, decorators.field_serializers)
                            for d in computed_fields.values()
                        ],
                        extras_schema=extras_schema,
                        extras_keys_schema=extras_keys_schema,
                        model_name=cls.__name__,
                    )
                    inner_schema = apply_validators(fields_schema, decorators.root_validators.values())
                    inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')

                    model_schema = core_schema.model_schema(
                        cls,
                        inner_schema,
                        generic_origin=generic_origin,
                        custom_init=getattr(cls, '__pydantic_custom_init__', None),
                        root_model=False,
                        post_init=getattr(cls, '__pydantic_post_init__', None),
                        config=core_config,
                        ref=model_ref,
                    )

                # Serializers wrap the model schema; 'outer' model validators wrap everything:
                schema = self._apply_model_serializers(model_schema, decorators.model_serializers.values())
                schema = apply_model_validators(schema, model_validators, 'outer')
                return self.defs.create_definition_reference_schema(schema)

884 

885 def _resolve_self_type(self, obj: Any) -> Any: 

886 obj = self.model_type_stack.get() 

887 if obj is None: 

888 raise PydanticUserError('`typing.Self` is invalid in this context', code='invalid-self-type') 

889 return obj 

890 

    def _generate_schema_from_get_schema_method(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
        """Generate a core schema via `obj`'s `__get_pydantic_core_schema__` hook, if usable.

        Falls back to the deprecated V1-style `__get_validators__` hook when
        `__get_pydantic_core_schema__` isn't defined.

        Args:
            obj: The object on which the hook is looked up.
            source: The source type forwarded to the hook (differs from `obj` for
                parametrized generics, where `obj` is the origin).

        Returns:
            The resulting core schema, or `None` when no usable hook exists.
        """
        BaseModel_ = import_cached_base_model()

        get_schema = getattr(obj, '__get_pydantic_core_schema__', None)
        is_base_model_get_schema = (
            getattr(get_schema, '__func__', None) is BaseModel_.__get_pydantic_core_schema__.__func__  # pyright: ignore[reportFunctionMemberAccess]
        )

        if (
            get_schema is not None
            # BaseModel.__get_pydantic_core_schema__ is defined for backwards compatibility,
            # to allow existing code to call `super().__get_pydantic_core_schema__` in Pydantic
            # model that overrides `__get_pydantic_core_schema__`. However, it raises a deprecation
            # warning stating that the method will be removed, and during the core schema gen we actually
            # don't call the method:
            and not is_base_model_get_schema
        ):
            # Some referenceable types might have a `__get_pydantic_core_schema__` method
            # defined on it by users (e.g. on a dataclass). This generally doesn't play well
            # as these types are already recognized by the `GenerateSchema` class and isn't ideal
            # as we might end up calling `get_schema_or_ref` (expensive) on types that are actually
            # not referenceable:
            with self.defs.get_schema_or_ref(obj) as (_, maybe_schema):
                if maybe_schema is not None:
                    return maybe_schema

            if obj is source:
                ref_mode = 'unpack'
            else:
                ref_mode = 'to-def'
            schema = get_schema(
                source, CallbackGetCoreSchemaHandler(self._generate_schema_inner, self, ref_mode=ref_mode)
            )
            if schema['type'] == 'definitions':
                schema = self.defs.unpack_definitions(schema)

            ref = get_ref(schema)
            if ref:
                return self.defs.create_definition_reference_schema(schema)

            # Note: if schema is of type `'definition-ref'`, we might want to copy it as a
            # safety measure (because these are inlined in place -- i.e. mutated directly)
            return schema

        if get_schema is None and (validators := getattr(obj, '__get_validators__', None)) is not None:
            from pydantic.v1 import BaseModel as BaseModelV1

            if issubclass(obj, BaseModelV1):
                # `__get_validators__` here means `obj` is a V1 model -- mixing is unsupported:
                warnings.warn(
                    f'Mixing V1 models and V2 models (or constructs, like `TypeAdapter`) is not supported. Please upgrade `{obj.__name__}` to V2.',
                    UserWarning,
                )
            else:
                warnings.warn(
                    '`__get_validators__` is deprecated and will be removed, use `__get_pydantic_core_schema__` instead.',
                    PydanticDeprecatedSince20,
                )
            # Chain the V1 validators in order, each wrapped as a plain validator function:
            return core_schema.chain_schema([core_schema.with_info_plain_validator_function(v) for v in validators()])

949 

950 def _resolve_forward_ref(self, obj: Any) -> Any: 

951 # we assume that types_namespace has the target of forward references in its scope, 

952 # but this could fail, for example, if calling Validator on an imported type which contains 

953 # forward references to other types only defined in the module from which it was imported 

954 # `Validator(SomeImportedTypeAliasWithAForwardReference)` 

955 # or the equivalent for BaseModel 

956 # class Model(BaseModel): 

957 # x: SomeImportedTypeAliasWithAForwardReference 

958 try: 

959 obj = _typing_extra.eval_type_backport(obj, *self._types_namespace) 

960 except NameError as e: 

961 raise PydanticUndefinedAnnotation.from_name_error(e) from e 

962 

963 # if obj is still a ForwardRef, it means we can't evaluate it, raise PydanticUndefinedAnnotation 

964 if isinstance(obj, ForwardRef): 

965 raise PydanticUndefinedAnnotation(obj.__forward_arg__, f'Unable to evaluate forward reference {obj}') 

966 

967 if self._typevars_map: 

968 obj = replace_types(obj, self._typevars_map) 

969 

970 return obj 

971 

972 @overload 

973 def _get_args_resolving_forward_refs(self, obj: Any, required: Literal[True]) -> tuple[Any, ...]: ... 

974 

975 @overload 

976 def _get_args_resolving_forward_refs(self, obj: Any) -> tuple[Any, ...] | None: ... 

977 

978 def _get_args_resolving_forward_refs(self, obj: Any, required: bool = False) -> tuple[Any, ...] | None: 

979 args = get_args(obj) 

980 if args: 

981 if isinstance(obj, GenericAlias): 

982 # PEP 585 generic aliases don't convert args to ForwardRefs, unlike `typing.List/Dict` etc. 

983 # This was fixed in https://github.com/python/cpython/pull/30900 (Python 3.11). 

984 # TODO: this shouldn't be necessary (probably even this `_get_args_resolving_forward_refs()` function) 

985 # once we drop support for Python 3.10 *or* if we implement our own `typing._eval_type()` implementation. 

986 args = (_typing_extra._make_forward_ref(a) if isinstance(a, str) else a for a in args) 

987 args = tuple(self._resolve_forward_ref(a) if isinstance(a, ForwardRef) else a for a in args) 

988 elif required: # pragma: no cover 

989 raise TypeError(f'Expected {obj} to have generic parameters but it had none') 

990 return args 

991 

992 def _get_first_arg_or_any(self, obj: Any) -> Any: 

993 args = self._get_args_resolving_forward_refs(obj) 

994 if not args: 

995 return Any 

996 return args[0] 

997 

998 def _get_first_two_args_or_any(self, obj: Any) -> tuple[Any, Any]: 

999 args = self._get_args_resolving_forward_refs(obj) 

1000 if not args: 

1001 return (Any, Any) 

1002 if len(args) < 2: 

1003 origin = get_origin(obj) 

1004 raise TypeError(f'Expected two type arguments for {origin}, got 1') 

1005 return args[0], args[1] 

1006 

1007 def _generate_schema_inner(self, obj: Any) -> core_schema.CoreSchema: 

1008 if typing_objects.is_self(obj): 

1009 obj = self._resolve_self_type(obj) 

1010 

1011 if typing_objects.is_annotated(get_origin(obj)): 

1012 return self._annotated_schema(obj) 

1013 

1014 if isinstance(obj, dict): 

1015 # we assume this is already a valid schema 

1016 return obj # type: ignore[return-value] 

1017 

1018 if isinstance(obj, str): 

1019 obj = ForwardRef(obj) 

1020 

1021 if isinstance(obj, ForwardRef): 

1022 return self.generate_schema(self._resolve_forward_ref(obj)) 

1023 

1024 BaseModel = import_cached_base_model() 

1025 

1026 if lenient_issubclass(obj, BaseModel): 

1027 with self.model_type_stack.push(obj): 

1028 return self._model_schema(obj) 

1029 

1030 if isinstance(obj, PydanticRecursiveRef): 

1031 return core_schema.definition_reference_schema(schema_ref=obj.type_ref) 

1032 

1033 return self.match_type(obj) 

1034 

1035 def match_type(self, obj: Any) -> core_schema.CoreSchema: # noqa: C901 

1036 """Main mapping of types to schemas. 

1037 

1038 The general structure is a series of if statements starting with the simple cases 

1039 (non-generic primitive types) and then handling generics and other more complex cases. 

1040 

1041 Each case either generates a schema directly, calls into a public user-overridable method 

1042 (like `GenerateSchema.tuple_variable_schema`) or calls into a private method that handles some 

1043 boilerplate before calling into the user-facing method (e.g. `GenerateSchema._tuple_schema`). 

1044 

1045 The idea is that we'll evolve this into adding more and more user facing methods over time 

1046 as they get requested and we figure out what the right API for them is. 

1047 """ 

1048 if obj is str: 

1049 return core_schema.str_schema() 

1050 elif obj is bytes: 

1051 return core_schema.bytes_schema() 

1052 elif obj is int: 

1053 return core_schema.int_schema() 

1054 elif obj is float: 

1055 return core_schema.float_schema() 

1056 elif obj is bool: 

1057 return core_schema.bool_schema() 

1058 elif obj is complex: 

1059 return core_schema.complex_schema() 

1060 elif typing_objects.is_any(obj) or obj is object: 

1061 return core_schema.any_schema() 

1062 elif obj is datetime.date: 

1063 return core_schema.date_schema() 

1064 elif obj is datetime.datetime: 

1065 return core_schema.datetime_schema() 

1066 elif obj is datetime.time: 

1067 return core_schema.time_schema() 

1068 elif obj is datetime.timedelta: 

1069 return core_schema.timedelta_schema() 

1070 elif obj is Decimal: 

1071 return core_schema.decimal_schema() 

1072 elif obj is UUID: 

1073 return core_schema.uuid_schema() 

1074 elif obj is Url: 

1075 return core_schema.url_schema() 

1076 elif obj is Fraction: 

1077 return self._fraction_schema() 

1078 elif obj is MultiHostUrl: 

1079 return core_schema.multi_host_url_schema() 

1080 elif obj is None or obj is _typing_extra.NoneType: 

1081 return core_schema.none_schema() 

1082 if obj is MISSING: 

1083 return core_schema.missing_sentinel_schema() 

1084 elif obj in IP_TYPES: 

1085 return self._ip_schema(obj) 

1086 elif obj in TUPLE_TYPES: 

1087 return self._tuple_schema(obj) 

1088 elif obj in LIST_TYPES: 

1089 return self._list_schema(Any) 

1090 elif obj in SET_TYPES: 

1091 return self._set_schema(Any) 

1092 elif obj in FROZEN_SET_TYPES: 

1093 return self._frozenset_schema(Any) 

1094 elif obj in SEQUENCE_TYPES: 

1095 return self._sequence_schema(Any) 

1096 elif obj in ITERABLE_TYPES: 

1097 return self._iterable_schema(obj) 

1098 elif obj in DICT_TYPES: 

1099 return self._dict_schema(Any, Any) 

1100 elif obj in PATH_TYPES: 

1101 return self._path_schema(obj, Any) 

1102 elif obj in DEQUE_TYPES: 

1103 return self._deque_schema(Any) 

1104 elif obj in MAPPING_TYPES: 

1105 return self._mapping_schema(obj, Any, Any) 

1106 elif obj in COUNTER_TYPES: 

1107 return self._mapping_schema(obj, Any, int) 

1108 elif typing_objects.is_typealiastype(obj): 

1109 return self._type_alias_type_schema(obj) 

1110 elif obj is type: 

1111 return self._type_schema() 

1112 elif _typing_extra.is_callable(obj): 

1113 return core_schema.callable_schema() 

1114 elif typing_objects.is_literal(get_origin(obj)): 

1115 return self._literal_schema(obj) 

1116 elif is_typeddict(obj): 

1117 return self._typed_dict_schema(obj, None) 

1118 elif inspect.isclass(obj) and issubclass(obj, Enum): 

1119 # NOTE: this must come before the `is_namedtuple()` check as enums values 

1120 # can be namedtuples: 

1121 return self._enum_schema(obj) 

1122 elif _typing_extra.is_namedtuple(obj): 

1123 return self._namedtuple_schema(obj, None) 

1124 elif typing_objects.is_newtype(obj): 

1125 # NewType, can't use isinstance because it fails <3.10 

1126 return self.generate_schema(obj.__supertype__) 

1127 elif obj in PATTERN_TYPES: 

1128 return self._pattern_schema(obj) 

1129 elif _typing_extra.is_hashable(obj): 

1130 return self._hashable_schema() 

1131 elif isinstance(obj, typing.TypeVar): 

1132 return self._unsubstituted_typevar_schema(obj) 

1133 elif _typing_extra.is_finalvar(obj): 

1134 if obj is Final: 

1135 return core_schema.any_schema() 

1136 return self.generate_schema( 

1137 self._get_first_arg_or_any(obj), 

1138 ) 

1139 elif isinstance(obj, VALIDATE_CALL_SUPPORTED_TYPES): 

1140 return self._call_schema(obj) # pyright: ignore[reportArgumentType] 

1141 elif obj is ZoneInfo: 

1142 return self._zoneinfo_schema() 

1143 

1144 # dataclasses.is_dataclass coerces dc instances to types, but we only handle 

1145 # the case of a dc type here 

1146 if dataclasses.is_dataclass(obj): 

1147 return self._dataclass_schema(obj, None) # pyright: ignore[reportArgumentType] 

1148 

1149 origin = get_origin(obj) 

1150 if origin is not None: 

1151 return self._match_generic_type(obj, origin) 

1152 

1153 if self._arbitrary_types: 

1154 return self._arbitrary_type_schema(obj) 

1155 return self._unknown_type_schema(obj) 

1156 

    def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema:  # noqa: C901
        """Map a parametrized generic type to a core schema, dispatching on its origin."""
        # Need to handle generic dataclasses before looking for the schema properties because attribute accesses
        # on _GenericAlias delegate to the origin type, so lose the information about the concrete parametrization
        # As a result, currently, there is no way to cache the schema for generic dataclasses. This may be possible
        # to resolve by modifying the value returned by `Generic.__class_getitem__`, but that is a dangerous game.
        if dataclasses.is_dataclass(origin):
            return self._dataclass_schema(obj, origin)  # pyright: ignore[reportArgumentType]
        if _typing_extra.is_namedtuple(origin):
            return self._namedtuple_schema(obj, origin)

        # A `__get_pydantic_core_schema__` hook on the origin takes precedence over built-in handling:
        schema = self._generate_schema_from_get_schema_method(origin, obj)
        if schema is not None:
            return schema

        if typing_objects.is_typealiastype(origin):
            return self._type_alias_type_schema(obj)
        elif is_union_origin(origin):
            return self._union_schema(obj)
        elif origin in TUPLE_TYPES:
            return self._tuple_schema(obj)
        elif origin in LIST_TYPES:
            return self._list_schema(self._get_first_arg_or_any(obj))
        elif origin in SET_TYPES:
            return self._set_schema(self._get_first_arg_or_any(obj))
        elif origin in FROZEN_SET_TYPES:
            return self._frozenset_schema(self._get_first_arg_or_any(obj))
        elif origin in DICT_TYPES:
            return self._dict_schema(*self._get_first_two_args_or_any(obj))
        elif origin in PATH_TYPES:
            return self._path_schema(origin, self._get_first_arg_or_any(obj))
        elif origin in DEQUE_TYPES:
            return self._deque_schema(self._get_first_arg_or_any(obj))
        elif origin in MAPPING_TYPES:
            return self._mapping_schema(origin, *self._get_first_two_args_or_any(obj))
        elif origin in COUNTER_TYPES:
            # `Counter` values are always `int`:
            return self._mapping_schema(origin, self._get_first_arg_or_any(obj), int)
        elif is_typeddict(origin):
            return self._typed_dict_schema(obj, origin)
        elif origin in TYPE_TYPES:
            return self._subclass_schema(obj)
        elif origin in SEQUENCE_TYPES:
            return self._sequence_schema(self._get_first_arg_or_any(obj))
        elif origin in ITERABLE_TYPES:
            return self._iterable_schema(obj)
        elif origin in PATTERN_TYPES:
            return self._pattern_schema(obj)

        # No built-in handling matched; fall back per the `arbitrary_types_allowed` config:
        if self._arbitrary_types:
            return self._arbitrary_type_schema(origin)
        return self._unknown_type_schema(obj)

1207 

1208 def _generate_td_field_schema( 

1209 self, 

1210 name: str, 

1211 field_info: FieldInfo, 

1212 decorators: DecoratorInfos, 

1213 *, 

1214 required: bool = True, 

1215 ) -> core_schema.TypedDictField: 

1216 """Prepare a TypedDictField to represent a model or typeddict field.""" 

1217 schema, metadata = self._common_field_schema(name, field_info, decorators) 

1218 return core_schema.typed_dict_field( 

1219 schema, 

1220 required=False if not field_info.is_required() else required, 

1221 serialization_exclude=field_info.exclude, 

1222 validation_alias=_convert_to_aliases(field_info.validation_alias), 

1223 serialization_alias=field_info.serialization_alias, 

1224 serialization_exclude_if=field_info.exclude_if, 

1225 metadata=metadata, 

1226 ) 

1227 

1228 def _generate_md_field_schema( 

1229 self, 

1230 name: str, 

1231 field_info: FieldInfo, 

1232 decorators: DecoratorInfos, 

1233 ) -> core_schema.ModelField: 

1234 """Prepare a ModelField to represent a model field.""" 

1235 schema, metadata = self._common_field_schema(name, field_info, decorators) 

1236 return core_schema.model_field( 

1237 schema, 

1238 serialization_exclude=field_info.exclude, 

1239 validation_alias=_convert_to_aliases(field_info.validation_alias), 

1240 serialization_alias=field_info.serialization_alias, 

1241 serialization_exclude_if=field_info.exclude_if, 

1242 frozen=field_info.frozen, 

1243 metadata=metadata, 

1244 ) 

1245 

1246 def _generate_dc_field_schema( 

1247 self, 

1248 name: str, 

1249 field_info: FieldInfo, 

1250 decorators: DecoratorInfos, 

1251 ) -> core_schema.DataclassField: 

1252 """Prepare a DataclassField to represent the parameter/field, of a dataclass.""" 

1253 schema, metadata = self._common_field_schema(name, field_info, decorators) 

1254 return core_schema.dataclass_field( 

1255 name, 

1256 schema, 

1257 init=field_info.init, 

1258 init_only=field_info.init_var or None, 

1259 kw_only=None if field_info.kw_only else False, 

1260 serialization_exclude=field_info.exclude, 

1261 validation_alias=_convert_to_aliases(field_info.validation_alias), 

1262 serialization_alias=field_info.serialization_alias, 

1263 serialization_exclude_if=field_info.exclude_if, 

1264 frozen=field_info.frozen, 

1265 metadata=metadata, 

1266 ) 

1267 

    def _common_field_schema(  # C901
        self, name: str, field_info: FieldInfo, decorators: DecoratorInfos
    ) -> tuple[CoreSchema, dict[str, Any]]:
        """Build the inner schema and core metadata shared by model/dataclass/typeddict fields.

        Args:
            name: The field name.
            field_info: The `FieldInfo` describing the field's annotation and metadata.
            decorators: The decorator infos collected on the owning class.

        Returns:
            A two-tuple of the field's core schema and its core metadata dict.
        """
        source_type, annotations = field_info.annotation, field_info.metadata

        def set_discriminator(schema: CoreSchema) -> CoreSchema:
            # Applied to the *inner* schema so the discriminator sees the raw union:
            schema = self._apply_discriminator_to_union(schema, field_info.discriminator)
            return schema

        # Convert `@field_validator` decorators to `Before/After/Plain/WrapValidator` instances:
        validators_from_decorators = [
            _mode_to_validator[decorator.info.mode]._from_decorator(decorator)
            for decorator in filter_field_decorator_info_by_field(decorators.field_validators.values(), name)
        ]

        with self.field_name_stack.push(name):
            if field_info.discriminator is not None:
                schema = self._apply_annotations(
                    source_type, annotations + validators_from_decorators, transform_inner_schema=set_discriminator
                )
            else:
                schema = self._apply_annotations(
                    source_type,
                    annotations + validators_from_decorators,
                )

        # This V1 compatibility shim should eventually be removed
        # push down any `each_item=True` validators
        # note that this won't work for any Annotated types that get wrapped by a function validator
        # but that's okay because that didn't exist in V1
        this_field_validators = filter_field_decorator_info_by_field(decorators.validators.values(), name)
        if _validators_require_validate_default(this_field_validators):
            field_info.validate_default = True
        each_item_validators = [v for v in this_field_validators if v.info.each_item is True]
        this_field_validators = [v for v in this_field_validators if v not in each_item_validators]
        schema = apply_each_item_validators(schema, each_item_validators)

        schema = apply_validators(schema, this_field_validators)

        # the default validator needs to go outside of any other validators
        # so that it is the topmost validator for the field validator
        # which uses it to check if the field has a default value or not
        if not field_info.is_required():
            schema = wrap_default(field_info, schema)

        schema = self._apply_field_serializers(
            schema, filter_field_decorator_info_by_field(decorators.field_serializers.values(), name)
        )

        # Collect JSON-schema customizations from the `FieldInfo` into core metadata:
        pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(field_info)
        core_metadata: dict[str, Any] = {}
        update_core_metadata(
            core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
        )

        return schema, core_metadata

1324 

1325 def _union_schema(self, union_type: Any) -> core_schema.CoreSchema: 

1326 """Generate schema for a Union.""" 

1327 args = self._get_args_resolving_forward_refs(union_type, required=True) 

1328 choices: list[CoreSchema] = [] 

1329 nullable = False 

1330 for arg in args: 

1331 if arg is None or arg is _typing_extra.NoneType: 

1332 nullable = True 

1333 else: 

1334 choices.append(self.generate_schema(arg)) 

1335 

1336 if len(choices) == 1: 

1337 s = choices[0] 

1338 else: 

1339 choices_with_tags: list[CoreSchema | tuple[CoreSchema, str]] = [] 

1340 for choice in choices: 

1341 tag = cast(CoreMetadata, choice.get('metadata', {})).get('pydantic_internal_union_tag_key') 

1342 if tag is not None: 

1343 choices_with_tags.append((choice, tag)) 

1344 else: 

1345 choices_with_tags.append(choice) 

1346 s = core_schema.union_schema(choices_with_tags) 

1347 

1348 if nullable: 

1349 s = core_schema.nullable_schema(s) 

1350 return s 

1351 

1352 def _type_alias_type_schema(self, obj: TypeAliasType) -> CoreSchema: 

1353 with self.defs.get_schema_or_ref(obj) as (ref, maybe_schema): 

1354 if maybe_schema is not None: 

1355 return maybe_schema 

1356 

1357 origin: TypeAliasType = get_origin(obj) or obj 

1358 typevars_map = get_standard_typevars_map(obj) 

1359 

1360 with self._ns_resolver.push(origin): 

1361 try: 

1362 annotation = _typing_extra.eval_type(origin.__value__, *self._types_namespace) 

1363 except NameError as e: 

1364 raise PydanticUndefinedAnnotation.from_name_error(e) from e 

1365 annotation = replace_types(annotation, typevars_map) 

1366 schema = self.generate_schema(annotation) 

1367 assert schema['type'] != 'definitions' 

1368 schema['ref'] = ref # type: ignore 

1369 return self.defs.create_definition_reference_schema(schema) 

1370 

1371 def _literal_schema(self, literal_type: Any) -> CoreSchema: 

1372 """Generate schema for a Literal.""" 

1373 expected = list(get_literal_values(literal_type, type_check=False, unpack_type_aliases='eager')) 

1374 assert expected, f'literal "expected" cannot be empty, obj={literal_type}' 

1375 schema = core_schema.literal_schema(expected) 

1376 

1377 if self._config_wrapper.use_enum_values and any(isinstance(v, Enum) for v in expected): 

1378 schema = core_schema.no_info_after_validator_function( 

1379 lambda v: v.value if isinstance(v, Enum) else v, schema 

1380 ) 

1381 

1382 return schema 

1383 

    def _typed_dict_schema(self, typed_dict_cls: Any, origin: Any) -> core_schema.CoreSchema:
        """Generate a core schema for a `TypedDict` class.

        To be able to build a `DecoratorInfos` instance for the `TypedDict` class (which will include
        validators, serializers, etc.), we need to have access to the original bases of the class
        (see https://docs.python.org/3/library/types.html#types.get_original_bases).
        However, the `__orig_bases__` attribute was only added in 3.12 (https://github.com/python/cpython/pull/103698).

        For this reason, we require Python 3.12 (or using the `typing_extensions` backport).
        """
        FieldInfo = import_cached_field_info()

        with (
            self.model_type_stack.push(typed_dict_cls),
            self.defs.get_schema_or_ref(typed_dict_cls) as (
                typed_dict_ref,
                maybe_schema,
            ),
        ):
            if maybe_schema is not None:
                # Schema already generated (or in progress) -- reuse the reference:
                return maybe_schema

            typevars_map = get_standard_typevars_map(typed_dict_cls)
            if origin is not None:
                # Parametrized TypedDict: work on the origin class, applying `typevars_map` below:
                typed_dict_cls = origin

            if not _SUPPORTS_TYPEDDICT and type(typed_dict_cls).__module__ == 'typing':
                raise PydanticUserError(
                    'Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.12.',
                    code='typed-dict-version',
                )

            try:
                # if a typed dictionary class doesn't have config, we use the parent's config, hence a default of `None`
                # see https://github.com/pydantic/pydantic/issues/10917
                config: ConfigDict | None = get_attribute_from_bases(typed_dict_cls, '__pydantic_config__')
            except AttributeError:
                config = None

            with self._config_wrapper_stack.push(config):
                core_config = self._config_wrapper.core_config(title=typed_dict_cls.__name__)

                required_keys: frozenset[str] = typed_dict_cls.__required_keys__

                fields: dict[str, core_schema.TypedDictField] = {}

                decorators = DecoratorInfos.build(typed_dict_cls, replace_wrapped_methods=False)
                decorators.update_from_config(self._config_wrapper)

                if self._config_wrapper.use_attribute_docstrings:
                    field_docstrings = extract_docstrings_from_cls(typed_dict_cls, use_inspect=True)
                else:
                    field_docstrings = None

                try:
                    annotations = _typing_extra.get_cls_type_hints(typed_dict_cls, ns_resolver=self._ns_resolver)
                except NameError as e:
                    raise PydanticUndefinedAnnotation.from_name_error(e) from e

                readonly_fields: list[str] = []

                for field_name, annotation in annotations.items():
                    field_info = FieldInfo.from_annotation(annotation, _source=AnnotationSource.TYPED_DICT)
                    field_info.annotation = replace_types(field_info.annotation, typevars_map)

                    # `Required`/`NotRequired` qualifiers take precedence over `__required_keys__` membership:
                    required = (
                        field_name in required_keys or 'required' in field_info._qualifiers
                    ) and 'not_required' not in field_info._qualifiers
                    if 'read_only' in field_info._qualifiers:
                        readonly_fields.append(field_name)

                    if (
                        field_docstrings is not None
                        and field_info.description is None
                        and field_name in field_docstrings
                    ):
                        field_info.description = field_docstrings[field_name]
                    update_field_from_config(self._config_wrapper, field_name, field_info)

                    fields[field_name] = self._generate_td_field_schema(
                        field_name, field_info, decorators, required=required
                    )

                if readonly_fields:
                    # `ReadOnly` can't be enforced on plain dict instances; warn the user:
                    fields_repr = ', '.join(repr(f) for f in readonly_fields)
                    plural = len(readonly_fields) >= 2
                    warnings.warn(
                        f'Item{"s" if plural else ""} {fields_repr} on TypedDict class {typed_dict_cls.__name__!r} '
                        f'{"are" if plural else "is"} using the `ReadOnly` qualifier. Pydantic will not protect items '
                        'from any mutation on dictionary instances.',
                        UserWarning,
                    )

                extra_behavior: core_schema.ExtraBehavior = 'ignore'
                extras_schema: CoreSchema | None = None  # For 'allow', equivalent to `Any` - no validation performed.

                # `__closed__` is `None` when not specified (equivalent to `False`):
                is_closed = bool(getattr(typed_dict_cls, '__closed__', False))
                extra_items = getattr(typed_dict_cls, '__extra_items__', typing_extensions.NoExtraItems)
                if is_closed:
                    extra_behavior = 'forbid'
                    extras_schema = None
                elif not typing_objects.is_noextraitems(extra_items):
                    extra_behavior = 'allow'
                    extras_schema = self.generate_schema(replace_types(extra_items, typevars_map))

                # A class-level `closed`/`extra_items` specification wins over the model config:
                if (config_extra := self._config_wrapper.extra) in ('allow', 'forbid'):
                    if is_closed and config_extra == 'allow':
                        warnings.warn(
                            f"TypedDict class {typed_dict_cls.__qualname__!r} is closed, but 'extra' configuration "
                            "is set to `'allow'`. The 'extra' configuration value will be ignored.",
                            category=TypedDictExtraConfigWarning,
                        )
                    elif not typing_objects.is_noextraitems(extra_items) and config_extra == 'forbid':
                        warnings.warn(
                            f"TypedDict class {typed_dict_cls.__qualname__!r} allows extra items, but 'extra' configuration "
                            "is set to `'forbid'`. The 'extra' configuration value will be ignored.",
                            category=TypedDictExtraConfigWarning,
                        )
                    else:
                        extra_behavior = config_extra

                td_schema = core_schema.typed_dict_schema(
                    fields,
                    cls=typed_dict_cls,
                    computed_fields=[
                        self._computed_field_schema(d, decorators.field_serializers)
                        for d in decorators.computed_fields.values()
                    ],
                    extra_behavior=extra_behavior,
                    extras_schema=extras_schema,
                    ref=typed_dict_ref,
                    config=core_config,
                )

                # Serializers wrap the typed-dict schema; model validators wrap everything:
                schema = self._apply_model_serializers(td_schema, decorators.model_serializers.values())
                schema = apply_model_validators(schema, decorators.model_validators.values(), 'all')
                return self.defs.create_definition_reference_schema(schema)

1522 

    def _namedtuple_schema(self, namedtuple_cls: Any, origin: Any) -> core_schema.CoreSchema:
        """Generate schema for a NamedTuple.

        Validation is expressed as an `'arguments'` schema wrapped in a `'call'` schema that
        invokes the NamedTuple class itself, so positional and keyword construction both work.
        """
        with (
            self.model_type_stack.push(namedtuple_cls),
            # Register a (ref, maybe_schema) pair so recursive references to this
            # NamedTuple resolve to a definition reference instead of recursing forever:
            self.defs.get_schema_or_ref(namedtuple_cls) as (
                namedtuple_ref,
                maybe_schema,
            ),
        ):
            if maybe_schema is not None:
                # Already being (or been) generated — reuse the existing schema/ref.
                return maybe_schema
            typevars_map = get_standard_typevars_map(namedtuple_cls)
            if origin is not None:
                # For a parametrized generic NamedTuple, generate from the origin class
                # and substitute type vars below.
                namedtuple_cls = origin

            try:
                annotations = _typing_extra.get_cls_type_hints(namedtuple_cls, ns_resolver=self._ns_resolver)
            except NameError as e:
                # An unresolvable forward reference in the annotations:
                raise PydanticUndefinedAnnotation.from_name_error(e) from e

            # Filter annotations to only include fields that are actually in the NamedTuple
            # (as subclassing an existing NamedTuple is not supported yet - see https://github.com/python/typing/issues/427)
            # and use `Any` if no annotation exist (i.e. when using `collections.namedtuple()`).
            annotations = {field_name: annotations.get(field_name, Any) for field_name in namedtuple_cls._fields}

            if typevars_map:
                annotations = {
                    field_name: replace_types(annotation, typevars_map)
                    for field_name, annotation in annotations.items()
                }

            arguments_schema = core_schema.arguments_schema(
                [
                    self._generate_parameter_schema(
                        field_name,
                        annotation,
                        source=AnnotationSource.NAMED_TUPLE,
                        # `_field_defaults` only contains fields that actually have a default:
                        default=namedtuple_cls._field_defaults.get(field_name, Parameter.empty),
                    )
                    for field_name, annotation in annotations.items()
                ],
                metadata={'pydantic_js_prefer_positional_arguments': True},
            )
            schema = core_schema.call_schema(arguments_schema, namedtuple_cls, ref=namedtuple_ref)
            return self.defs.create_definition_reference_schema(schema)

1568 

    def _generate_parameter_schema(
        self,
        name: str,
        annotation: type[Any],
        source: AnnotationSource,
        default: Any = Parameter.empty,
        mode: Literal['positional_only', 'positional_or_keyword', 'keyword_only'] | None = None,
    ) -> core_schema.ArgumentsParameter:
        """Generate the definition of a field in a namedtuple or a parameter in a function signature.

        This definition is meant to be used for the `'arguments'` core schema, which will be replaced
        in V3 by the `'arguments-v3`'.

        Args:
            name: The parameter/field name.
            annotation: The type annotation of the parameter.
            source: Where the annotation comes from (function signature, NamedTuple, ...).
            default: The default value, or `Parameter.empty` when the parameter is required.
            mode: The parameter kind for the `'arguments'` schema, or `None` to let pydantic-core default it.
        """
        FieldInfo = import_cached_field_info()

        # Build a `FieldInfo` so that `Field(...)` metadata inside `Annotated` is honored:
        if default is Parameter.empty:
            field = FieldInfo.from_annotation(annotation, _source=source)
        else:
            field = FieldInfo.from_annotated_attribute(annotation, default, _source=source)

        assert field.annotation is not None, 'field.annotation should not be None when generating a schema'
        update_field_from_config(self._config_wrapper, name, field)

        with self.field_name_stack.push(name):
            schema = self._apply_annotations(
                field.annotation,
                [field],
                # Because we pass `field` as metadata above (required for attributes relevant for
                # JSON Scheme generation), we need to ignore the potential warnings about `FieldInfo`
                # attributes that will not be used:
                check_unsupported_field_info_attributes=False,
            )

        if not field.is_required():
            # Attach the default last so `TypeAdapter.get_default_value()` sees it:
            schema = wrap_default(field, schema)

        parameter_schema = core_schema.arguments_parameter(
            name,
            schema,
            mode=mode,
            alias=_convert_to_aliases(field.validation_alias),
        )

        return parameter_schema

1613 

    def _generate_parameter_v3_schema(
        self,
        name: str,
        annotation: Any,
        source: AnnotationSource,
        mode: Literal[
            'positional_only',
            'positional_or_keyword',
            'keyword_only',
            'var_args',
            'var_kwargs_uniform',
            'var_kwargs_unpacked_typed_dict',
        ],
        default: Any = Parameter.empty,
    ) -> core_schema.ArgumentsV3Parameter:
        """Generate the definition of a parameter in a function signature.

        This definition is meant to be used for the `'arguments-v3'` core schema, which will replace
        the `'arguments`' schema in V3.

        Args:
            name: The parameter name.
            annotation: The type annotation of the parameter.
            source: Where the annotation comes from (e.g. a function signature).
            mode: The parameter kind, including the variadic modes specific to `'arguments-v3'`.
            default: The default value, or `Parameter.empty` when the parameter is required.
        """
        FieldInfo = import_cached_field_info()

        # Build a `FieldInfo` so that `Field(...)` metadata inside `Annotated` is honored:
        if default is Parameter.empty:
            field = FieldInfo.from_annotation(annotation, _source=source)
        else:
            field = FieldInfo.from_annotated_attribute(annotation, default, _source=source)
        update_field_from_config(self._config_wrapper, name, field)

        with self.field_name_stack.push(name):
            schema = self._apply_annotations(
                field.annotation,
                [field],
                # Because we pass `field` as metadata above (required for attributes relevant for
                # JSON Scheme generation), we need to ignore the potential warnings about `FieldInfo`
                # attributes that will not be used:
                check_unsupported_field_info_attributes=False,
            )

        if not field.is_required():
            # Attach the default last so `TypeAdapter.get_default_value()` sees it:
            schema = wrap_default(field, schema)

        parameter_schema = core_schema.arguments_v3_parameter(
            name=name,
            schema=schema,
            mode=mode,
            alias=_convert_to_aliases(field.validation_alias),
        )

        return parameter_schema

1663 

1664 def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema: 

1665 """Generate schema for a Tuple, e.g. `tuple[int, str]` or `tuple[int, ...]`.""" 

1666 # TODO: do we really need to resolve type vars here? 

1667 typevars_map = get_standard_typevars_map(tuple_type) 

1668 params = self._get_args_resolving_forward_refs(tuple_type) 

1669 

1670 if typevars_map and params: 

1671 params = tuple(replace_types(param, typevars_map) for param in params) 

1672 

1673 # NOTE: subtle difference: `tuple[()]` gives `params=()`, whereas `typing.Tuple[()]` gives `params=((),)` 

1674 # This is only true for <3.11, on Python 3.11+ `typing.Tuple[()]` gives `params=()` 

1675 if not params: 

1676 if tuple_type in TUPLE_TYPES: 

1677 return core_schema.tuple_schema([core_schema.any_schema()], variadic_item_index=0) 

1678 else: 

1679 # special case for `tuple[()]` which means `tuple[]` - an empty tuple 

1680 return core_schema.tuple_schema([]) 

1681 elif params[-1] is Ellipsis: 

1682 if len(params) == 2: 

1683 return core_schema.tuple_schema([self.generate_schema(params[0])], variadic_item_index=0) 

1684 else: 

1685 # TODO: something like https://github.com/pydantic/pydantic/issues/5952 

1686 raise ValueError('Variable tuples can only have one type') 

1687 elif len(params) == 1 and params[0] == (): 

1688 # special case for `tuple[()]` which means `tuple[]` - an empty tuple 

1689 # NOTE: This conditional can be removed when we drop support for Python 3.10. 

1690 return core_schema.tuple_schema([]) 

1691 else: 

1692 return core_schema.tuple_schema([self.generate_schema(param) for param in params]) 

1693 

1694 def _type_schema(self) -> core_schema.CoreSchema: 

1695 return core_schema.custom_error_schema( 

1696 core_schema.is_instance_schema(type), 

1697 custom_error_type='is_type', 

1698 custom_error_message='Input should be a type', 

1699 ) 

1700 

1701 def _zoneinfo_schema(self) -> core_schema.CoreSchema: 

1702 """Generate schema for a zone_info.ZoneInfo object""" 

1703 from ._validators import validate_str_is_valid_iana_tz 

1704 

1705 metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'zoneinfo'}]} 

1706 return core_schema.no_info_plain_validator_function( 

1707 validate_str_is_valid_iana_tz, 

1708 serialization=core_schema.to_string_ser_schema(), 

1709 metadata=metadata, 

1710 ) 

1711 

1712 def _union_is_subclass_schema(self, union_type: Any) -> core_schema.CoreSchema: 

1713 """Generate schema for `type[Union[X, ...]]`.""" 

1714 args = self._get_args_resolving_forward_refs(union_type, required=True) 

1715 return core_schema.union_schema([self.generate_schema(type[args]) for args in args]) 

1716 

    def _subclass_schema(self, type_: Any) -> core_schema.CoreSchema:
        """Generate schema for a type, e.g. `type[int]`.

        The single type argument may be `Any`, a `TypeAliasType`, a `TypeVar`, a union,
        `Self`, or a plain class; each case maps to a different core schema.
        """
        type_param = self._get_first_arg_or_any(type_)

        # Assume `type[Annotated[<typ>, ...]]` is equivalent to `type[<typ>]`:
        type_param = _typing_extra.annotated_type(type_param) or type_param

        if typing_objects.is_any(type_param):
            # `type[Any]` accepts any type object:
            return self._type_schema()
        elif typing_objects.is_typealiastype(type_param):
            # Unwrap PEP 695 type aliases and recurse on the alias value:
            return self.generate_schema(type[type_param.__value__])
        elif typing_objects.is_typevar(type_param):
            if type_param.__bound__:
                if is_union_origin(get_origin(type_param.__bound__)):
                    # `T` bound to a union: accept subclasses of any union member.
                    return self._union_is_subclass_schema(type_param.__bound__)
                return core_schema.is_subclass_schema(type_param.__bound__)
            elif type_param.__constraints__:
                # Constrained `T`: accept `type[C]` for any constraint `C`.
                return core_schema.union_schema([self.generate_schema(type[c]) for c in type_param.__constraints__])
            else:
                # Unbound, unconstrained `T` behaves like `type[Any]`:
                return self._type_schema()
        elif is_union_origin(get_origin(type_param)):
            return self._union_is_subclass_schema(type_param)
        else:
            if typing_objects.is_self(type_param):
                type_param = self._resolve_self_type(type_param)
            if _typing_extra.is_generic_alias(type_param):
                raise PydanticUserError(
                    'Subscripting `type[]` with an already parametrized type is not supported. '
                    f'Instead of using type[{type_param!r}], use type[{_repr.display_as_type(get_origin(type_param))}].',
                    code=None,
                )
            if not inspect.isclass(type_param):
                # when using type[None], this doesn't type convert to type[NoneType], and None isn't a class
                # so we handle it manually here
                if type_param is None:
                    return core_schema.is_subclass_schema(_typing_extra.NoneType)
                raise TypeError(f'Expected a class, got {type_param!r}')
            return core_schema.is_subclass_schema(type_param)

1755 

1756 def _sequence_schema(self, items_type: Any) -> core_schema.CoreSchema: 

1757 """Generate schema for a Sequence, e.g. `Sequence[int]`.""" 

1758 from ._serializers import serialize_sequence_via_list 

1759 

1760 item_type_schema = self.generate_schema(items_type) 

1761 list_schema = core_schema.list_schema(item_type_schema) 

1762 

1763 json_schema = smart_deepcopy(list_schema) 

1764 python_schema = core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence') 

1765 if not typing_objects.is_any(items_type): 

1766 from ._validators import sequence_validator 

1767 

1768 python_schema = core_schema.chain_schema( 

1769 [python_schema, core_schema.no_info_wrap_validator_function(sequence_validator, list_schema)], 

1770 ) 

1771 

1772 serialization = core_schema.wrap_serializer_function_ser_schema( 

1773 serialize_sequence_via_list, schema=item_type_schema, info_arg=True 

1774 ) 

1775 return core_schema.json_or_python_schema( 

1776 json_schema=json_schema, python_schema=python_schema, serialization=serialization 

1777 ) 

1778 

1779 def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema: 

1780 """Generate a schema for an `Iterable`.""" 

1781 item_type = self._get_first_arg_or_any(type_) 

1782 

1783 return core_schema.generator_schema(self.generate_schema(item_type)) 

1784 

1785 def _pattern_schema(self, pattern_type: Any) -> core_schema.CoreSchema: 

1786 from . import _validators 

1787 

1788 metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'regex'}]} 

1789 ser = core_schema.plain_serializer_function_ser_schema( 

1790 attrgetter('pattern'), when_used='json', return_schema=core_schema.str_schema() 

1791 ) 

1792 if pattern_type is typing.Pattern or pattern_type is re.Pattern: 

1793 # bare type 

1794 return core_schema.no_info_plain_validator_function( 

1795 _validators.pattern_either_validator, serialization=ser, metadata=metadata 

1796 ) 

1797 

1798 param = self._get_args_resolving_forward_refs( 

1799 pattern_type, 

1800 required=True, 

1801 )[0] 

1802 if param is str: 

1803 return core_schema.no_info_plain_validator_function( 

1804 _validators.pattern_str_validator, serialization=ser, metadata=metadata 

1805 ) 

1806 elif param is bytes: 

1807 return core_schema.no_info_plain_validator_function( 

1808 _validators.pattern_bytes_validator, serialization=ser, metadata=metadata 

1809 ) 

1810 else: 

1811 raise PydanticSchemaGenerationError(f'Unable to generate pydantic-core schema for {pattern_type!r}.') 

1812 

1813 def _hashable_schema(self) -> core_schema.CoreSchema: 

1814 return core_schema.custom_error_schema( 

1815 schema=core_schema.json_or_python_schema( 

1816 json_schema=core_schema.chain_schema( 

1817 [core_schema.any_schema(), core_schema.is_instance_schema(collections.abc.Hashable)] 

1818 ), 

1819 python_schema=core_schema.is_instance_schema(collections.abc.Hashable), 

1820 ), 

1821 custom_error_type='is_hashable', 

1822 custom_error_message='Input should be hashable', 

1823 ) 

1824 

    def _dataclass_schema(
        self, dataclass: type[StandardDataclass], origin: type[StandardDataclass] | None
    ) -> core_schema.CoreSchema:
        """Generate schema for a dataclass.

        Handles both Pydantic dataclasses (using their collected `__pydantic_fields__`)
        and vanilla dataclasses (collecting fields on the fly), plus generic
        parametrization via `origin`/typevar substitution.
        """
        with (
            self.model_type_stack.push(dataclass),
            # Register a (ref, maybe_schema) pair so recursive dataclasses resolve
            # to a definition reference instead of recursing forever:
            self.defs.get_schema_or_ref(dataclass) as (
                dataclass_ref,
                maybe_schema,
            ),
        ):
            if maybe_schema is not None:
                return maybe_schema

            # Reuse an already-built core schema stored on the class, unless it is a mock
            # (i.e. a placeholder from a failed/deferred build):
            schema = dataclass.__dict__.get('__pydantic_core_schema__')
            if schema is not None and not isinstance(schema, MockCoreSchema):
                if schema['type'] == 'definitions':
                    schema = self.defs.unpack_definitions(schema)
                ref = get_ref(schema)
                if ref:
                    return self.defs.create_definition_reference_schema(schema)
                else:
                    return schema

            typevars_map = get_standard_typevars_map(dataclass)
            if origin is not None:
                # For a parametrized generic dataclass, build from the origin class and
                # substitute type vars below.
                dataclass = origin

            # if (plain) dataclass doesn't have config, we use the parent's config, hence a default of `None`
            # (Pydantic dataclasses have an empty dict config by default).
            # see https://github.com/pydantic/pydantic/issues/10917
            config = getattr(dataclass, '__pydantic_config__', None)

            from ..dataclasses import is_pydantic_dataclass

            with self._ns_resolver.push(dataclass), self._config_wrapper_stack.push(config):
                if is_pydantic_dataclass(dataclass):
                    if dataclass.__pydantic_fields_complete__():
                        # Copy the field info instances to avoid mutating the `FieldInfo` instances
                        # of the generic dataclass generic origin (e.g. `apply_typevars_map` below).
                        # Note that we don't apply `deepcopy` on `__pydantic_fields__` because we
                        # don't want to copy the `FieldInfo` attributes:
                        fields = {
                            f_name: copy(field_info) for f_name, field_info in dataclass.__pydantic_fields__.items()
                        }
                        if typevars_map:
                            for field in fields.values():
                                field.apply_typevars_map(typevars_map, *self._types_namespace)
                    else:
                        # Fields had unresolved forward references at class creation; retry now:
                        try:
                            fields = rebuild_dataclass_fields(
                                dataclass,
                                config_wrapper=self._config_wrapper,
                                ns_resolver=self._ns_resolver,
                                typevars_map=typevars_map or {},
                            )
                        except NameError as e:
                            raise PydanticUndefinedAnnotation.from_name_error(e) from e
                else:
                    # Vanilla (stdlib) dataclass — collect fields from `dataclasses.fields`:
                    fields = collect_dataclass_fields(
                        dataclass,
                        typevars_map=typevars_map,
                        config_wrapper=self._config_wrapper,
                    )

                if self._config_wrapper.extra == 'allow':
                    # disallow combination of init=False on a dataclass field and extra='allow' on a dataclass
                    for field_name, field in fields.items():
                        if field.init is False:
                            raise PydanticUserError(
                                f'Field {field_name} has `init=False` and dataclass has config setting `extra="allow"`. '
                                f'This combination is not allowed.',
                                code='dataclass-init-false-extra-allow',
                            )

                decorators = dataclass.__dict__.get('__pydantic_decorators__')
                if decorators is None:
                    decorators = DecoratorInfos.build(dataclass, replace_wrapped_methods=False)
                decorators.update_from_config(self._config_wrapper)
                # Move kw_only=False args to the start of the list, as this is how vanilla dataclasses work.
                # Note that when kw_only is missing or None, it is treated as equivalent to kw_only=True
                args = sorted(
                    (self._generate_dc_field_schema(k, v, decorators) for k, v in fields.items()),
                    key=lambda a: a.get('kw_only') is not False,
                )
                has_post_init = hasattr(dataclass, '__post_init__')
                has_slots = hasattr(dataclass, '__slots__')

                args_schema = core_schema.dataclass_args_schema(
                    dataclass.__name__,
                    args,
                    computed_fields=[
                        self._computed_field_schema(d, decorators.field_serializers)
                        for d in decorators.computed_fields.values()
                    ],
                    collect_init_only=has_post_init,
                )

                inner_schema = apply_validators(args_schema, decorators.root_validators.values())

                model_validators = decorators.model_validators.values()
                inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')

                core_config = self._config_wrapper.core_config(title=dataclass.__name__)

                dc_schema = core_schema.dataclass_schema(
                    dataclass,
                    inner_schema,
                    generic_origin=origin,
                    post_init=has_post_init,
                    ref=dataclass_ref,
                    fields=[field.name for field in dataclasses.fields(dataclass)],
                    slots=has_slots,
                    config=core_config,
                    # we don't use a custom __setattr__ for dataclasses, so we must
                    # pass along the frozen config setting to the pydantic-core schema
                    frozen=self._config_wrapper_stack.tail.frozen,
                )
                schema = self._apply_model_serializers(dc_schema, decorators.model_serializers.values())
                schema = apply_model_validators(schema, model_validators, 'outer')
                return self.defs.create_definition_reference_schema(schema)

1946 

1947 def _call_schema(self, function: ValidateCallSupportedTypes) -> core_schema.CallSchema: 

1948 """Generate schema for a Callable. 

1949 

1950 TODO support functional validators once we support them in Config 

1951 """ 

1952 arguments_schema = self._arguments_schema(function) 

1953 

1954 return_schema: core_schema.CoreSchema | None = None 

1955 config_wrapper = self._config_wrapper 

1956 if config_wrapper.validate_return: 

1957 sig = _typing_extra.signature_no_eval(function) 

1958 return_hint = sig.return_annotation 

1959 if return_hint is not sig.empty: 

1960 globalns, localns = self._types_namespace 

1961 type_hints = _typing_extra.get_function_type_hints( 

1962 function, globalns=globalns, localns=localns, include_keys={'return'} 

1963 ) 

1964 return_schema = self.generate_schema(type_hints['return']) 

1965 

1966 return core_schema.call_schema( 

1967 arguments_schema, 

1968 function, 

1969 return_schema=return_schema, 

1970 ) 

1971 

    def _arguments_schema(
        self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
    ) -> core_schema.ArgumentsSchema:
        """Generate schema for a Signature.

        Args:
            function: The callable whose signature is inspected.
            parameters_callback: Optional hook called per parameter; returning `'skip'`
                excludes that parameter from the schema.
        """
        mode_lookup: dict[_ParameterKind, Literal['positional_only', 'positional_or_keyword', 'keyword_only']] = {
            Parameter.POSITIONAL_ONLY: 'positional_only',
            Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
            Parameter.KEYWORD_ONLY: 'keyword_only',
        }

        sig = _typing_extra.signature_no_eval(function)
        globalns, localns = self._types_namespace
        type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)

        arguments_list: list[core_schema.ArgumentsParameter] = []
        var_args_schema: core_schema.CoreSchema | None = None
        var_kwargs_schema: core_schema.CoreSchema | None = None
        var_kwargs_mode: core_schema.VarKwargsMode | None = None

        for i, (name, p) in enumerate(sig.parameters.items()):
            if p.annotation is sig.empty:
                # Unannotated parameters validate as `Any`:
                annotation = typing.cast(Any, Any)
            else:
                annotation = type_hints[name]

            if parameters_callback is not None:
                result = parameters_callback(i, name, annotation)
                if result == 'skip':
                    continue

            parameter_mode = mode_lookup.get(p.kind)
            if parameter_mode is not None:
                # Plain (non-variadic) parameter:
                arg_schema = self._generate_parameter_schema(
                    name, annotation, AnnotationSource.FUNCTION, p.default, parameter_mode
                )
                arguments_list.append(arg_schema)
            elif p.kind == Parameter.VAR_POSITIONAL:
                var_args_schema = self.generate_schema(annotation)
            else:
                assert p.kind == Parameter.VAR_KEYWORD, p.kind

                # `**kwargs: Unpack[SomeTypedDict]` validates kwargs against the TypedDict;
                # otherwise kwargs are validated uniformly against the annotation.
                unpack_type = _typing_extra.unpack_type(annotation)
                if unpack_type is not None:
                    origin = get_origin(unpack_type) or unpack_type
                    if not is_typeddict(origin):
                        raise PydanticUserError(
                            f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
                            code='unpack-typed-dict',
                        )
                    # A TypedDict key that collides with a named (non positional-only)
                    # parameter would be ambiguous at call time:
                    non_pos_only_param_names = {
                        name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
                    }
                    overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
                    if overlapping_params:
                        raise PydanticUserError(
                            f'Typed dictionary {origin.__name__!r} overlaps with parameter'
                            f'{"s" if len(overlapping_params) >= 2 else ""} '
                            f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
                            code='overlapping-unpack-typed-dict',
                        )

                    var_kwargs_mode = 'unpacked-typed-dict'
                    var_kwargs_schema = self._typed_dict_schema(unpack_type, get_origin(unpack_type))
                else:
                    var_kwargs_mode = 'uniform'
                    var_kwargs_schema = self.generate_schema(annotation)

        return core_schema.arguments_schema(
            arguments_list,
            var_args_schema=var_args_schema,
            var_kwargs_mode=var_kwargs_mode,
            var_kwargs_schema=var_kwargs_schema,
            validate_by_name=self._config_wrapper.validate_by_name,
        )

2046 

    def _arguments_v3_schema(
        self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
    ) -> core_schema.ArgumentsV3Schema:
        """Generate an `'arguments-v3'` schema for a Signature.

        Unlike `_arguments_schema`, variadic parameters are expressed as parameter modes
        (`'var_args'`, `'var_kwargs_uniform'`, `'var_kwargs_unpacked_typed_dict'`) instead
        of dedicated schema slots.

        Args:
            function: The callable whose signature is inspected.
            parameters_callback: Optional hook called per parameter; returning `'skip'`
                excludes that parameter from the schema.
        """
        mode_lookup: dict[
            _ParameterKind, Literal['positional_only', 'positional_or_keyword', 'var_args', 'keyword_only']
        ] = {
            Parameter.POSITIONAL_ONLY: 'positional_only',
            Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
            Parameter.VAR_POSITIONAL: 'var_args',
            Parameter.KEYWORD_ONLY: 'keyword_only',
        }

        sig = _typing_extra.signature_no_eval(function)
        globalns, localns = self._types_namespace
        type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)

        parameters_list: list[core_schema.ArgumentsV3Parameter] = []

        for i, (name, p) in enumerate(sig.parameters.items()):
            if parameters_callback is not None:
                result = parameters_callback(i, name, p.annotation)
                if result == 'skip':
                    continue

            if p.annotation is Parameter.empty:
                # Unannotated parameters validate as `Any`:
                annotation = typing.cast(Any, Any)
            else:
                annotation = type_hints[name]

            parameter_mode = mode_lookup.get(p.kind)
            if parameter_mode is None:
                assert p.kind == Parameter.VAR_KEYWORD, p.kind

                # `**kwargs: Unpack[SomeTypedDict]` validates kwargs against the TypedDict;
                # otherwise kwargs are validated uniformly against the annotation.
                unpack_type = _typing_extra.unpack_type(annotation)
                if unpack_type is not None:
                    origin = get_origin(unpack_type) or unpack_type
                    if not is_typeddict(origin):
                        raise PydanticUserError(
                            f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
                            code='unpack-typed-dict',
                        )
                    # A TypedDict key that collides with a named (non positional-only)
                    # parameter would be ambiguous at call time:
                    non_pos_only_param_names = {
                        name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
                    }
                    overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
                    if overlapping_params:
                        raise PydanticUserError(
                            f'Typed dictionary {origin.__name__!r} overlaps with parameter'
                            f'{"s" if len(overlapping_params) >= 2 else ""} '
                            f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
                            code='overlapping-unpack-typed-dict',
                        )
                    parameter_mode = 'var_kwargs_unpacked_typed_dict'
                    annotation = unpack_type
                else:
                    parameter_mode = 'var_kwargs_uniform'

            parameters_list.append(
                self._generate_parameter_v3_schema(
                    name, annotation, AnnotationSource.FUNCTION, parameter_mode, default=p.default
                )
            )

        return core_schema.arguments_v3_schema(
            parameters_list,
            validate_by_name=self._config_wrapper.validate_by_name,
        )

2114 

2115 def _unsubstituted_typevar_schema(self, typevar: typing.TypeVar) -> core_schema.CoreSchema: 

2116 try: 

2117 has_default = typevar.has_default() # pyright: ignore[reportAttributeAccessIssue] 

2118 except AttributeError: 

2119 # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13 

2120 pass 

2121 else: 

2122 if has_default: 

2123 return self.generate_schema(typevar.__default__) # pyright: ignore[reportAttributeAccessIssue] 

2124 

2125 if constraints := typevar.__constraints__: 

2126 return self._union_schema(typing.Union[constraints]) 

2127 

2128 if bound := typevar.__bound__: 

2129 schema = self.generate_schema(bound) 

2130 schema['serialization'] = core_schema.simple_ser_schema('any') 

2131 return schema 

2132 

2133 return core_schema.any_schema() 

2134 

    def _computed_field_schema(
        self,
        d: Decorator[ComputedFieldInfo],
        field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]],
    ) -> core_schema.ComputedField:
        """Generate the core schema for a `@computed_field` decorated property.

        Args:
            d: The computed field decorator info.
            field_serializers: Field serializer decorators; those targeting this
                computed field are applied to its return schema.

        Raises:
            PydanticUserError: If no return type can be determined.
            PydanticUndefinedAnnotation: If the return annotation has an unresolvable name.
        """
        if d.info.return_type is not PydanticUndefined:
            # An explicit `return_type=` was passed to `@computed_field`:
            return_type = d.info.return_type
        else:
            try:
                # Do not pass in globals as the function could be defined in a different module.
                # Instead, let `get_callable_return_type` infer the globals to use, but still pass
                # in locals that may contain a parent/rebuild namespace:
                return_type = _decorators.get_callable_return_type(d.func, localns=self._types_namespace.locals)
            except NameError as e:
                raise PydanticUndefinedAnnotation.from_name_error(e) from e
        if return_type is PydanticUndefined:
            raise PydanticUserError(
                'Computed field is missing return type annotation or specifying `return_type`'
                ' to the `@computed_field` decorator (e.g. `@computed_field(return_type=int | str)`)',
                code='model-field-missing-annotation',
            )

        return_type = replace_types(return_type, self._typevars_map)
        # Create a new ComputedFieldInfo so that different type parametrizations of the same
        # generic model's computed field can have different return types.
        d.info = dataclasses.replace(d.info, return_type=return_type)
        return_type_schema = self.generate_schema(return_type)
        # Apply serializers to computed field if there exist
        return_type_schema = self._apply_field_serializers(
            return_type_schema,
            filter_field_decorator_info_by_field(field_serializers.values(), d.cls_var_name),
        )

        pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(d.info)
        core_metadata: dict[str, Any] = {}
        update_core_metadata(
            core_metadata,
            # Computed fields are serialization-only, hence always 'readOnly' in JSON schema:
            pydantic_js_updates={'readOnly': True, **(pydantic_js_updates if pydantic_js_updates else {})},
            pydantic_js_extra=pydantic_js_extra,
        )
        exclude_if = d.info.exclude_if
        # TODO: Should we support exclude_if from annotations?
        return core_schema.computed_field(
            d.cls_var_name,
            return_schema=return_type_schema,
            alias=d.info.alias,
            serialization_exclude_if=exclude_if,
            metadata=core_metadata,
        )

2184 

2185 def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema: 

2186 """Generate schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`.""" 

2187 FieldInfo = import_cached_field_info() 

2188 source_type, *annotations = self._get_args_resolving_forward_refs( 

2189 annotated_type, 

2190 required=True, 

2191 ) 

2192 schema = self._apply_annotations(source_type, annotations) 

2193 # put the default validator last so that TypeAdapter.get_default_value() works 

2194 # even if there are function validators involved 

2195 for annotation in annotations: 

2196 if isinstance(annotation, FieldInfo): 

2197 schema = wrap_default(annotation, schema) 

2198 return schema 

2199 

    def _apply_annotations(
        self,
        source_type: Any,
        annotations: list[Any],
        transform_inner_schema: Callable[[CoreSchema], CoreSchema] = lambda x: x,
        check_unsupported_field_info_attributes: bool = True,
    ) -> CoreSchema:
        """Apply arguments from `Annotated` or from `FieldInfo` to a schema.

        This gets called by `GenerateSchema._annotated_schema` but differs from it in that it does
        not expect `source_type` to be an `Annotated` object, it expects it to be the first argument of that
        (in other words, `GenerateSchema._annotated_schema` just unpacks `Annotated`, this process it).

        Args:
            source_type: The type the annotations apply to.
            annotations: The `Annotated` metadata (or `FieldInfo` objects) to apply, outermost last.
            transform_inner_schema: Optional transform applied to the innermost generated schema
                before any annotation wrapping.
            check_unsupported_field_info_attributes: Whether to warn about `FieldInfo` attributes
                that have no effect in this context.
        """
        # Flatten grouped metadata (e.g. a single object carrying several constraints):
        annotations = list(_known_annotated_metadata.expand_grouped_metadata(annotations))

        pydantic_js_annotation_functions: list[GetJsonSchemaFunction] = []

        def inner_handler(obj: Any) -> CoreSchema:
            # Innermost step: generate the base schema for the source type, preferring a
            # `__get_pydantic_core_schema__` method if the type defines one.
            schema = self._generate_schema_from_get_schema_method(obj, source_type)

            if schema is None:
                schema = self._generate_schema_inner(obj)

            metadata_js_function = _extract_get_pydantic_json_schema(obj)
            if metadata_js_function is not None:
                metadata_schema = resolve_original_schema(schema, self.defs)
                if metadata_schema is not None:
                    self._add_js_function(metadata_schema, metadata_js_function)
            return transform_inner_schema(schema)

        get_inner_schema = CallbackGetCoreSchemaHandler(inner_handler, self)

        # Wrap the handler once per annotation, innermost-first, so that calling the final
        # handler applies annotations in order around the base schema:
        for annotation in annotations:
            if annotation is None:
                continue
            get_inner_schema = self._get_wrapped_inner_schema(
                get_inner_schema,
                annotation,
                pydantic_js_annotation_functions,
                check_unsupported_field_info_attributes=check_unsupported_field_info_attributes,
            )

        schema = get_inner_schema(source_type)
        if pydantic_js_annotation_functions:
            core_metadata = schema.setdefault('metadata', {})
            update_core_metadata(core_metadata, pydantic_js_annotation_functions=pydantic_js_annotation_functions)
        return _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, source_type, schema)

2247 

2248 def _apply_single_annotation( 

2249 self, 

2250 schema: core_schema.CoreSchema, 

2251 metadata: Any, 

2252 check_unsupported_field_info_attributes: bool = True, 

2253 ) -> core_schema.CoreSchema: 

2254 FieldInfo = import_cached_field_info() 

2255 

2256 if isinstance(metadata, FieldInfo): 

2257 if ( 

2258 check_unsupported_field_info_attributes 

2259 # HACK: we don't want to emit the warning for `FieldInfo` subclasses, because FastAPI does weird manipulations 

2260 # with its subclasses and their annotations: 

2261 and type(metadata) is FieldInfo 

2262 ): 

2263 for attr, value in (unsupported_attributes := self._get_unsupported_field_info_attributes(metadata)): 

2264 warnings.warn( 

2265 f'The {attr!r} attribute with value {value!r} was provided to the `Field()` function, ' 

2266 f'which has no effect in the context it was used. {attr!r} is field-specific metadata, ' 

2267 'and can only be attached to a model field using `Annotated` metadata or by assignment. ' 

2268 'This may have happened because an `Annotated` type alias using the `type` statement was ' 

2269 'used, or if the `Field()` function was attached to a single member of a union type.', 

2270 category=UnsupportedFieldAttributeWarning, 

2271 ) 

2272 

2273 if ( 

2274 metadata.default_factory_takes_validated_data 

2275 and self.model_type_stack.get() is None 

2276 and 'defaut_factory' not in unsupported_attributes 

2277 ): 

2278 warnings.warn( 

2279 "A 'default_factory' taking validated data as an argument was provided to the `Field()` function, " 

2280 'but no validated data is available in the context it was used.', 

2281 category=UnsupportedFieldAttributeWarning, 

2282 ) 

2283 

2284 for field_metadata in metadata.metadata: 

2285 schema = self._apply_single_annotation(schema, field_metadata) 

2286 

2287 if metadata.discriminator is not None: 

2288 schema = self._apply_discriminator_to_union(schema, metadata.discriminator) 

2289 return schema 

2290 

2291 if schema['type'] == 'nullable': 

2292 # for nullable schemas, metadata is automatically applied to the inner schema 

2293 inner = schema.get('schema', core_schema.any_schema()) 

2294 inner = self._apply_single_annotation(inner, metadata) 

2295 if inner: 

2296 schema['schema'] = inner 

2297 return schema 

2298 

2299 if schema['type'] == 'union' and any( 

2300 choice['type'] == 'missing-sentinel' for choice in core_schema.iter_union_choices(schema) 

2301 ): 

2302 # Same behavior as for nullable schemas. This is a bit gross, but we have to support the same pattern 

2303 filtered_choices = [ 

2304 choice 

2305 for choice in schema['choices'] 

2306 if (choice[0] if isinstance(choice, tuple) else choice)['type'] != 'missing-sentinel' 

2307 ] 

2308 if len(filtered_choices) >= 2: 

2309 # e.g. `Annotated[int | str | MISSING, Constraint(...)]`. We apply `Constraint(...)` to `int | str`, 

2310 # and create a new union semantically equivalent to `Annotated[int | str, Constraint(...)] | MISSING`: 

2311 filtered_union = core_schema.union_schema(filtered_choices) 

2312 filtered_union = self._apply_single_annotation(filtered_union, metadata) 

2313 new_union = schema.copy() 

2314 new_union['choices'] = [ 

2315 filtered_union, 

2316 next( 

2317 choice 

2318 for choice in schema['choices'] 

2319 if (choice[0] if isinstance(choice, tuple) else choice)['type'] == 'missing-sentinel' 

2320 ), 

2321 ] 

2322 return new_union 

2323 elif len(filtered_choices) == 1: 

2324 # e.g. `Annotated[int | MISSING, Constraint(...)]`. We apply `Constraint(...)` to `int`, and reconstruct 

2325 # a new union preserving the order. 

2326 inner = filtered_choices[0][0] if isinstance(filtered_choices[0], tuple) else filtered_choices[0] 

2327 inner = self._apply_single_annotation(inner, metadata) 

2328 

2329 # Create a new union schema, preserving the order of the union: 

2330 new_union = schema.copy() 

2331 new_union['choices'] = [ 

2332 (inner, choice[1]) 

2333 if isinstance(choice, tuple) and choice[0]['type'] != 'missing-sentinel' 

2334 else inner 

2335 if not isinstance(choice, tuple) and choice['type'] != 'missing-sentinel' 

2336 else choice 

2337 for choice in schema['choices'] 

2338 ] 

2339 return new_union 

2340 

2341 original_schema = schema 

2342 ref = schema.get('ref') 

2343 if ref is not None: 

2344 schema = schema.copy() 

2345 new_ref = ref + f'_{repr(metadata)}' 

2346 if (existing := self.defs.get_schema_from_ref(new_ref)) is not None: 

2347 return existing 

2348 schema['ref'] = new_ref # pyright: ignore[reportGeneralTypeIssues] 

2349 elif schema['type'] == 'definition-ref': 

2350 ref = schema['schema_ref'] 

2351 if (referenced_schema := self.defs.get_schema_from_ref(ref)) is not None: 

2352 schema = referenced_schema.copy() 

2353 new_ref = ref + f'_{repr(metadata)}' 

2354 if (existing := self.defs.get_schema_from_ref(new_ref)) is not None: 

2355 return existing 

2356 schema['ref'] = new_ref # pyright: ignore[reportGeneralTypeIssues] 

2357 

2358 maybe_updated_schema = _known_annotated_metadata.apply_known_metadata(metadata, schema) 

2359 

2360 if maybe_updated_schema is not None: 

2361 return maybe_updated_schema 

2362 return original_schema 

2363 

2364 def _apply_single_annotation_json_schema( 

2365 self, schema: core_schema.CoreSchema, metadata: Any 

2366 ) -> core_schema.CoreSchema: 

2367 FieldInfo = import_cached_field_info() 

2368 

2369 if isinstance(metadata, FieldInfo): 

2370 for field_metadata in metadata.metadata: 

2371 schema = self._apply_single_annotation_json_schema(schema, field_metadata) 

2372 

2373 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(metadata) 

2374 core_metadata = schema.setdefault('metadata', {}) 

2375 update_core_metadata( 

2376 core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra 

2377 ) 

2378 return schema 

2379 

2380 def _get_unsupported_field_info_attributes(self, field_info: FieldInfo) -> list[tuple[str, Any]]: 

2381 """Get the list of unsupported `FieldInfo` attributes when not directly used in `Annotated` for field annotations.""" 

2382 unused_metadata: list[tuple[str, Any]] = [] 

2383 for unused_metadata_name, unset_value in UNSUPPORTED_STANDALONE_FIELDINFO_ATTRIBUTES: 

2384 if ( 

2385 (unused_metadata_value := getattr(field_info, unused_metadata_name)) is not unset_value 

2386 # `default` and `default_factory` can still be used with a type adapter, so only include them 

2387 # if used with a model-like class: 

2388 and ( 

2389 unused_metadata_name not in ('default', 'default_factory') 

2390 or self.model_type_stack.get() is not None 

2391 ) 

2392 # Setting `alias` will set `validation/serialization_alias` as well, so we want to avoid duplicate warnings: 

2393 and ( 

2394 unused_metadata_name not in ('validation_alias', 'serialization_alias') 

2395 or 'alias' not in field_info._attributes_set 

2396 ) 

2397 ): 

2398 unused_metadata.append((unused_metadata_name, unused_metadata_value)) 

2399 

2400 return unused_metadata 

2401 

2402 def _get_wrapped_inner_schema( 

2403 self, 

2404 get_inner_schema: GetCoreSchemaHandler, 

2405 annotation: Any, 

2406 pydantic_js_annotation_functions: list[GetJsonSchemaFunction], 

2407 check_unsupported_field_info_attributes: bool = False, 

2408 ) -> CallbackGetCoreSchemaHandler: 

2409 annotation_get_schema: GetCoreSchemaFunction | None = getattr(annotation, '__get_pydantic_core_schema__', None) 

2410 

2411 def new_handler(source: Any) -> core_schema.CoreSchema: 

2412 if annotation_get_schema is not None: 

2413 schema = annotation_get_schema(source, get_inner_schema) 

2414 else: 

2415 schema = get_inner_schema(source) 

2416 schema = self._apply_single_annotation( 

2417 schema, 

2418 annotation, 

2419 check_unsupported_field_info_attributes=check_unsupported_field_info_attributes, 

2420 ) 

2421 schema = self._apply_single_annotation_json_schema(schema, annotation) 

2422 

2423 metadata_js_function = _extract_get_pydantic_json_schema(annotation) 

2424 if metadata_js_function is not None: 

2425 pydantic_js_annotation_functions.append(metadata_js_function) 

2426 return schema 

2427 

2428 return CallbackGetCoreSchemaHandler(new_handler, self) 

2429 

    def _apply_field_serializers(
        self,
        schema: core_schema.CoreSchema,
        serializers: list[Decorator[FieldSerializerDecoratorInfo]],
    ) -> core_schema.CoreSchema:
        """Apply field serializers to a schema.

        Only the last serializer in `serializers` is attached (see the override
        comment below); earlier entries are ignored.
        """
        if serializers:
            # Work on a shallow copy so the cached/original schema isn't mutated.
            schema = copy(schema)
            if schema['type'] == 'definitions':
                # The serializer belongs on the wrapped inner schema, not on the
                # `'definitions'` container — recurse and return.
                inner_schema = schema['schema']
                schema['schema'] = self._apply_field_serializers(inner_schema, serializers)
                return schema
            elif 'ref' in schema:
                # Referenceable schema: register it as a shared definition and attach the
                # serializer to a `'definition-ref'` schema instead, keeping the shared
                # definition serializer-free.
                schema = self.defs.create_definition_reference_schema(schema)

            # use the last serializer to make it easy to override a serializer set on a parent model
            serializer = serializers[-1]
            is_field_serializer, info_arg = inspect_field_serializer(serializer.func, serializer.info.mode)

            if serializer.info.return_type is not PydanticUndefined:
                # An explicit return type was given on the decorator; prefer it.
                return_type = serializer.info.return_type
            else:
                try:
                    # Do not pass in globals as the function could be defined in a different module.
                    # Instead, let `get_callable_return_type` infer the globals to use, but still pass
                    # in locals that may contain a parent/rebuild namespace:
                    return_type = _decorators.get_callable_return_type(
                        serializer.func, localns=self._types_namespace.locals
                    )
                except NameError as e:
                    raise PydanticUndefinedAnnotation.from_name_error(e) from e

            if return_type is PydanticUndefined:
                # No return annotation available: let pydantic-core infer the serialized type.
                return_schema = None
            else:
                return_schema = self.generate_schema(return_type)

            if serializer.info.mode == 'wrap':
                schema['serialization'] = core_schema.wrap_serializer_function_ser_schema(
                    serializer.func,
                    is_field_serializer=is_field_serializer,
                    info_arg=info_arg,
                    return_schema=return_schema,
                    when_used=serializer.info.when_used,
                )
            else:
                assert serializer.info.mode == 'plain'
                schema['serialization'] = core_schema.plain_serializer_function_ser_schema(
                    serializer.func,
                    is_field_serializer=is_field_serializer,
                    info_arg=info_arg,
                    return_schema=return_schema,
                    when_used=serializer.info.when_used,
                )
        return schema

2485 

    def _apply_model_serializers(
        self, schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]]
    ) -> core_schema.CoreSchema:
        """Apply model serializers to a schema.

        The schema's ref (if any) is detached up front and re-attached at the end,
        so the ref always ends up on the outermost (serializer-carrying) schema.
        """
        ref: str | None = schema.pop('ref', None)  # type: ignore
        if serializers:
            # Use the last serializer so a serializer defined on a subclass overrides
            # one inherited from a parent model.
            serializer = list(serializers)[-1]
            info_arg = inspect_model_serializer(serializer.func, serializer.info.mode)

            if serializer.info.return_type is not PydanticUndefined:
                # An explicit return type was given on the decorator; prefer it.
                return_type = serializer.info.return_type
            else:
                try:
                    # Do not pass in globals as the function could be defined in a different module.
                    # Instead, let `get_callable_return_type` infer the globals to use, but still pass
                    # in locals that may contain a parent/rebuild namespace:
                    return_type = _decorators.get_callable_return_type(
                        serializer.func, localns=self._types_namespace.locals
                    )
                except NameError as e:
                    raise PydanticUndefinedAnnotation.from_name_error(e) from e

            if return_type is PydanticUndefined:
                # No return annotation available: let pydantic-core infer the serialized type.
                return_schema = None
            else:
                return_schema = self.generate_schema(return_type)

            if serializer.info.mode == 'wrap':
                ser_schema: core_schema.SerSchema = core_schema.wrap_serializer_function_ser_schema(
                    serializer.func,
                    info_arg=info_arg,
                    return_schema=return_schema,
                    when_used=serializer.info.when_used,
                )
            else:
                # plain
                ser_schema = core_schema.plain_serializer_function_ser_schema(
                    serializer.func,
                    info_arg=info_arg,
                    return_schema=return_schema,
                    when_used=serializer.info.when_used,
                )
            schema['serialization'] = ser_schema
        if ref:
            schema['ref'] = ref  # type: ignore
        return schema

2532 

2533 

# Dispatch table mapping a (validator mode, info-signature flavor) pair to the
# `core_schema` constructor that wraps a raw validator function of that shape.
# 'with-info' entries receive a `ValidationInfo` argument; 'no-info' entries don't.
# Consumed by `apply_validators` below (deprecated V1-style decorators only).
_VALIDATOR_F_MATCH: Mapping[
    tuple[FieldValidatorModes, Literal['no-info', 'with-info']],
    Callable[[Callable[..., Any], core_schema.CoreSchema], core_schema.CoreSchema],
] = {
    ('before', 'no-info'): lambda f, schema: core_schema.no_info_before_validator_function(f, schema),
    ('after', 'no-info'): lambda f, schema: core_schema.no_info_after_validator_function(f, schema),
    ('plain', 'no-info'): lambda f, _: core_schema.no_info_plain_validator_function(f),
    ('wrap', 'no-info'): lambda f, schema: core_schema.no_info_wrap_validator_function(f, schema),
    ('before', 'with-info'): lambda f, schema: core_schema.with_info_before_validator_function(f, schema),
    ('after', 'with-info'): lambda f, schema: core_schema.with_info_after_validator_function(f, schema),
    ('plain', 'with-info'): lambda f, _: core_schema.with_info_plain_validator_function(f),
    ('wrap', 'with-info'): lambda f, schema: core_schema.with_info_wrap_validator_function(f, schema),
}

2547 

2548 

# TODO V3: this function is only used for deprecated decorators. It should
# be removed once we drop support for those.
def apply_validators(
    schema: core_schema.CoreSchema,
    validators: Iterable[Decorator[RootValidatorDecoratorInfo]]
    | Iterable[Decorator[ValidatorDecoratorInfo]]
    | Iterable[Decorator[FieldValidatorDecoratorInfo]],
) -> core_schema.CoreSchema:
    """Apply validators to a schema.

    Each validator wraps the previous schema, so validators are applied in
    iteration order (the first validator ends up innermost).

    Args:
        schema: The schema to apply validators on.
        validators: An iterable of validators.

    Returns:
        The updated schema.
    """
    for validator in validators:
        # Actually, type could be 'field' or 'model', but this is only used for deprecated
        # decorators, so let's not worry about it.
        info_arg = inspect_validator(validator.func, mode=validator.info.mode, type='field')
        val_type = 'with-info' if info_arg else 'no-info'

        schema = _VALIDATOR_F_MATCH[(validator.info.mode, val_type)](validator.func, schema)
    return schema

2575 

2576 

def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool:
    """In v1, if any of the validators for a field had `always=True`, the default value would be validated.

    This serves as an auxiliary function for re-implementing that logic, by checking whether any of
    the provided (v1-style) `ValidatorDecoratorInfo`'s have `always=True`.

    We should be able to drop this function and the associated logic calling it once we drop support
    for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent
    to the v1-validator `always` kwarg to `field_validator`.)
    """
    # `any` short-circuits on the first validator declared with `always=True`.
    return any(validator.info.always for validator in validators)

2591 

2592 

def _convert_to_aliases(
    alias: str | AliasChoices | AliasPath | None,
) -> str | list[str | int] | list[list[str | int]] | None:
    """Normalize an alias into the plain-data form expected by pydantic-core."""
    if not isinstance(alias, (AliasChoices, AliasPath)):
        # A plain string (or `None`) is already in core form.
        return alias
    return alias.convert_to_aliases()

2600 

2601 

def apply_model_validators(
    schema: core_schema.CoreSchema,
    validators: Iterable[Decorator[ModelValidatorDecoratorInfo]],
    mode: Literal['inner', 'outer', 'all'],
) -> core_schema.CoreSchema:
    """Apply model validators to a schema.

    If mode == 'inner', only "before" validators are applied
    If mode == 'outer', validators other than "before" are applied
    If mode == 'all', all validators are applied

    Args:
        schema: The schema to apply validators on.
        validators: An iterable of validators.
        mode: The validator mode.

    Returns:
        The updated schema.
    """
    # Detach the ref so it can be re-attached to the outermost (wrapped) schema at the end.
    ref: str | None = schema.pop('ref', None)  # type: ignore
    # Constructor lookup keyed on (validator mode, whether the function takes a `ValidationInfo` arg).
    builders = {
        ('wrap', True): core_schema.with_info_wrap_validator_function,
        ('wrap', False): core_schema.no_info_wrap_validator_function,
        ('before', True): core_schema.with_info_before_validator_function,
        ('before', False): core_schema.no_info_before_validator_function,
        ('after', True): core_schema.with_info_after_validator_function,
        ('after', False): core_schema.no_info_after_validator_function,
    }
    for validator in validators:
        validator_mode = validator.info.mode
        # Filter by the requested phase:
        if mode == 'inner' and validator_mode != 'before':
            continue
        if mode == 'outer' and validator_mode == 'before':
            continue
        info_arg = inspect_validator(validator.func, mode=validator_mode, type='model')
        schema = builders[(validator_mode, bool(info_arg))](function=validator.func, schema=schema)
    if ref:
        schema['ref'] = ref  # type: ignore
    return schema

2647 

2648 

def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema:
    """Wrap schema with default schema if default value or `default_factory` are available.

    Args:
        field_info: The field info object.
        schema: The schema to apply default on.

    Returns:
        Updated schema by default value or `default_factory`.
    """
    factory = field_info.default_factory
    if factory:
        # A factory takes precedence over a plain default value.
        return core_schema.with_default_schema(
            schema,
            default_factory=factory,
            default_factory_takes_data=takes_validated_data_argument(factory),
            validate_default=field_info.validate_default,
        )
    if field_info.default is PydanticUndefined:
        # Neither a default nor a factory: nothing to wrap.
        return schema
    return core_schema.with_default_schema(
        schema, default=field_info.default, validate_default=field_info.validate_default
    )

2672 

2673 

def _extract_get_pydantic_json_schema(tp: Any) -> GetJsonSchemaFunction | None:
    """Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`."""
    js_function = getattr(tp, '__get_pydantic_json_schema__', None)

    if hasattr(tp, '__modify_schema__'):
        # The v1 hook is only tolerated when the type *also* overrides the v2 hook,
        # i.e. `js_function` is present and is not `BaseModel`'s inherited implementation.
        BaseModel = import_cached_base_model()

        base_impl = BaseModel.__get_pydantic_json_schema__.__func__  # type: ignore
        overrides_v2_hook = js_function is not None and base_impl not in (
            js_function,
            getattr(js_function, '__func__', None),
        )

        if not overrides_v2_hook:
            cls_name = getattr(tp, '__name__', None)
            raise PydanticUserError(
                f'The `__modify_schema__` method is not supported in Pydantic v2. '
                f'Use `__get_pydantic_json_schema__` instead{f" in class `{cls_name}`" if cls_name else ""}.',
                code='custom-json-schema',
            )

    origin = get_origin(tp)
    if origin is not None:
        # Generic aliases proxy attribute access to the origin, *except* dunder attributes,
        # such as `__get_pydantic_json_schema__`, hence the explicit check.
        return _extract_get_pydantic_json_schema(origin)

    # `None` when the type defines no JSON schema hook at all.
    return js_function

2704 

2705 

def resolve_original_schema(schema: CoreSchema, definitions: _Definitions) -> CoreSchema | None:
    """Unwrap `'definition-ref'`/`'definitions'` wrappers and return the underlying schema, if any."""
    schema_type = schema['type']
    if schema_type == 'definitions':
        return schema['schema']
    if schema_type == 'definition-ref':
        # May be `None` if the reference has no registered definition yet.
        return definitions.get_schema_from_ref(schema['schema_ref'])
    return schema

2713 

2714 

def _inlining_behavior(
    def_ref: core_schema.DefinitionReferenceSchema,
) -> Literal['inline', 'keep', 'preserve_metadata']:
    """Determine the inlining behavior of the `'definition-ref'` schema.

    - If no `'serialization'` schema and no metadata is attached, the schema can safely be inlined.
    - If it has metadata but only related to the deferred discriminator application, it can be inlined
      provided that such metadata is kept.
    - Otherwise, the schema should not be inlined. Doing so would remove the `'serialization'` schema or metadata.
    """
    if 'serialization' in def_ref:
        # Inlining would drop the serialization schema.
        return 'keep'
    metadata = def_ref.get('metadata')
    if not metadata:
        return 'inline'
    if set(metadata) == {'pydantic_internal_union_discriminator'}:
        # Only deferred-discriminator metadata: inlinable as long as the metadata is carried over.
        return 'preserve_metadata'
    return 'keep'

2733 

2734 

class _Definitions:
    """Keeps track of references and definitions."""

    _recursively_seen: set[str]
    """A set of recursively seen references.

    When a referenceable type is encountered, the `get_schema_or_ref` context manager is
    entered to compute the reference. If the type references itself by some way (e.g. for
    a dataclass or a Pydantic model, the class can be referenced as a field annotation),
    entering the context manager again will yield a `'definition-ref'` schema that should
    short-circuit the normal generation process, as the reference was already in this set.
    """

    _definitions: dict[str, core_schema.CoreSchema]
    """A mapping of references to their corresponding schema.

    When a schema for a referenceable type is generated, it is stored in this mapping. If the
    same type is encountered again, the reference is yielded by the `get_schema_or_ref` context
    manager.
    """

    def __init__(self) -> None:
        self._recursively_seen = set()
        self._definitions = {}

    @contextmanager
    def get_schema_or_ref(self, tp: Any, /) -> Generator[tuple[str, core_schema.DefinitionReferenceSchema | None]]:
        """Get a definition for `tp` if one exists.

        If a definition exists, a tuple of `(ref_string, CoreSchema)` is returned.
        If no definition exists yet, a tuple of `(ref_string, None)` is returned.

        Note that the returned `CoreSchema` will always be a `DefinitionReferenceSchema`,
        not the actual definition itself.

        This should be called for any type that can be identified by reference.
        This includes any recursive types.

        At present the following types can be named/recursive:

        - Pydantic model
        - Pydantic and stdlib dataclasses
        - Typed dictionaries
        - Named tuples
        - `TypeAliasType` instances
        - Enums
        """
        ref = get_type_ref(tp)
        # return the reference if we're either (1) in a cycle or (2) if the reference was already encountered:
        if ref in self._recursively_seen or ref in self._definitions:
            yield (ref, core_schema.definition_reference_schema(ref))
        else:
            self._recursively_seen.add(ref)
            try:
                yield (ref, None)
            finally:
                # Always unwind, even if schema generation for `tp` raised.
                self._recursively_seen.discard(ref)

    def get_schema_from_ref(self, ref: str) -> CoreSchema | None:
        """Resolve the schema from the given reference."""
        return self._definitions.get(ref)

    def create_definition_reference_schema(self, schema: CoreSchema) -> core_schema.DefinitionReferenceSchema:
        """Store the schema as a definition and return a `'definition-reference'` schema pointing to it.

        The schema must have a reference attached to it.
        """
        ref = schema['ref']  # pyright: ignore
        self._definitions[ref] = schema
        return core_schema.definition_reference_schema(ref)

    def unpack_definitions(self, schema: core_schema.DefinitionsSchema) -> CoreSchema:
        """Store the definitions of the `'definitions'` core schema and return the inner core schema."""
        for def_schema in schema['definitions']:
            self._definitions[def_schema['ref']] = def_schema  # pyright: ignore
        return schema['schema']

    def finalize_schema(self, schema: CoreSchema) -> CoreSchema:
        """Finalize the core schema.

        This traverses the core schema and referenced definitions, replaces `'definition-ref'` schemas
        by the referenced definition if possible, and applies deferred discriminators.
        """
        definitions = self._definitions
        try:
            gather_result = gather_schemas_for_cleaning(
                schema,
                definitions=definitions,
            )
        except MissingDefinitionError as e:
            # A `'definition-ref'` schema points at a ref that was never registered.
            raise InvalidSchemaError from e

        # Definitions that could not be inlined and must be emitted in a `'definitions'` wrapper.
        remaining_defs: dict[str, CoreSchema] = {}

        # Note: this logic doesn't play well when core schemas with deferred discriminator metadata
        # and references are encountered. See the `test_deferred_discriminated_union_and_references()` test.
        for ref, inlinable_def_ref in gather_result['collected_references'].items():
            if inlinable_def_ref is not None and (inlining_behavior := _inlining_behavior(inlinable_def_ref)) != 'keep':
                if inlining_behavior == 'inline':
                    # `ref` was encountered, and only once:
                    # - `inlinable_def_ref` is a `'definition-ref'` schema and is guaranteed to be
                    #   the only one. Transform it into the definition it points to.
                    # - Do not store the definition in the `remaining_defs`.
                    inlinable_def_ref.clear()  # pyright: ignore[reportAttributeAccessIssue]
                    inlinable_def_ref.update(self._resolve_definition(ref, definitions))  # pyright: ignore
                elif inlining_behavior == 'preserve_metadata':
                    # `ref` was encountered, and only once, but contains discriminator metadata.
                    # We will do the same thing as if `inlining_behavior` was `'inline'`, but make
                    # sure to keep the metadata for the deferred discriminator application logic below.
                    meta = inlinable_def_ref.pop('metadata')
                    inlinable_def_ref.clear()  # pyright: ignore[reportAttributeAccessIssue]
                    inlinable_def_ref.update(self._resolve_definition(ref, definitions))  # pyright: ignore
                    inlinable_def_ref['metadata'] = meta
            else:
                # `ref` was encountered, at least two times (or only once, but with metadata or a serialization schema):
                # - Do not inline the `'definition-ref'` schemas (they are not provided in the gather result anyway).
                # - Store the definition in the `remaining_defs`
                remaining_defs[ref] = self._resolve_definition(ref, definitions)

        for cs in gather_result['deferred_discriminator_schemas']:
            discriminator: str | None = cs['metadata'].pop('pydantic_internal_union_discriminator', None)  # pyright: ignore[reportTypedDictNotRequiredAccess]
            if discriminator is None:
                # This can happen in rare scenarios, when a deferred schema is present multiple times in the
                # gather result (e.g. when using the `Sequence` type -- see `test_sequence_discriminated_union()`).
                # In this case, a previous loop iteration applied the discriminator and so we can just skip it here.
                continue
            applied = _discriminated_union.apply_discriminator(cs.copy(), discriminator, remaining_defs)
            # Mutate the schema directly to have the discriminator applied
            cs.clear()  # pyright: ignore[reportAttributeAccessIssue]
            cs.update(applied)  # pyright: ignore

        if remaining_defs:
            schema = core_schema.definitions_schema(schema=schema, definitions=[*remaining_defs.values()])
        return schema

    def _resolve_definition(self, ref: str, definitions: dict[str, CoreSchema]) -> CoreSchema:
        """Resolve `ref` to its definition, following chains of inlinable `'definition-ref'` schemas."""
        definition = definitions[ref]
        if definition['type'] != 'definition-ref':
            return definition

        # Some `'definition-ref'` schemas might act as "intermediate" references (e.g. when using
        # a PEP 695 type alias (which is referenceable) that references another PEP 695 type alias):
        visited: set[str] = set()
        while definition['type'] == 'definition-ref' and _inlining_behavior(definition) == 'inline':
            schema_ref = definition['schema_ref']
            if schema_ref in visited:
                # A ref chain that loops back on itself can never resolve to a concrete schema.
                raise PydanticUserError(
                    f'{ref} contains a circular reference to itself.', code='circular-reference-schema'
                )
            visited.add(schema_ref)
            definition = definitions[schema_ref]
        # Re-attach the original ref to the resolved definition.
        return {**definition, 'ref': ref}  # pyright: ignore[reportReturnType]

2887 

2888 

class _FieldNameStack:
    """A stack of field names, tracking the field currently being processed."""

    __slots__ = ('_stack',)

    def __init__(self) -> None:
        self._stack: list[str] = []

    @contextmanager
    def push(self, field_name: str) -> Iterator[None]:
        """Push `field_name` for the duration of the `with` block.

        The name is popped in a `finally` clause so that an exception raised while
        generating the field's schema can't leave a stale name on the stack
        (same pattern as `_Definitions.get_schema_or_ref`).
        """
        self._stack.append(field_name)
        try:
            yield
        finally:
            self._stack.pop()

    def get(self) -> str | None:
        """Return the innermost field name, or `None` if no field is being processed."""
        if self._stack:
            return self._stack[-1]
        else:
            return None

2907 

class _ModelTypeStack:
    """A stack of model-like classes, tracking the type currently being processed."""

    __slots__ = ('_stack',)

    def __init__(self) -> None:
        self._stack: list[type] = []

    @contextmanager
    def push(self, type_obj: type) -> Iterator[None]:
        """Push `type_obj` for the duration of the `with` block.

        The type is popped in a `finally` clause so that an exception raised while
        generating the type's schema can't leave a stale entry on the stack
        (same pattern as `_Definitions.get_schema_or_ref`).
        """
        self._stack.append(type_obj)
        try:
            yield
        finally:
            self._stack.pop()

    def get(self) -> type | None:
        """Return the innermost type, or `None` if no type is being processed."""
        if self._stack:
            return self._stack[-1]
        else:
            return None