Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pydantic/_internal/_generate_schema.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1367 statements  

1"""Convert python types to pydantic-core schema.""" 

2 

3from __future__ import annotations as _annotations 

4 

5import collections.abc 

6import dataclasses 

7import datetime 

8import inspect 

9import os 

10import pathlib 

11import re 

12import sys 

13import typing 

14import warnings 

15from collections.abc import Generator, Iterable, Iterator, Mapping 

16from contextlib import contextmanager 

17from copy import copy 

18from decimal import Decimal 

19from enum import Enum 

20from fractions import Fraction 

21from functools import partial 

22from inspect import Parameter, _ParameterKind, signature 

23from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network 

24from itertools import chain 

25from operator import attrgetter 

26from types import FunctionType, GenericAlias, LambdaType, MethodType 

27from typing import ( 

28 TYPE_CHECKING, 

29 Any, 

30 Callable, 

31 Final, 

32 ForwardRef, 

33 Literal, 

34 TypeVar, 

35 Union, 

36 cast, 

37 overload, 

38) 

39from uuid import UUID 

40from warnings import warn 

41from zoneinfo import ZoneInfo 

42 

43import typing_extensions 

44from pydantic_core import ( 

45 CoreSchema, 

46 MultiHostUrl, 

47 PydanticCustomError, 

48 PydanticSerializationUnexpectedValue, 

49 PydanticUndefined, 

50 Url, 

51 core_schema, 

52 to_jsonable_python, 

53) 

54from typing_extensions import TypeAlias, TypeAliasType, TypedDict, get_args, get_origin, is_typeddict 

55from typing_inspection import typing_objects 

56from typing_inspection.introspection import AnnotationSource, get_literal_values, is_union_origin 

57 

58from ..aliases import AliasChoices, AliasGenerator, AliasPath 

59from ..annotated_handlers import GetCoreSchemaHandler, GetJsonSchemaHandler 

60from ..config import ConfigDict, JsonDict, JsonEncoder, JsonSchemaExtraCallable 

61from ..errors import PydanticSchemaGenerationError, PydanticUndefinedAnnotation, PydanticUserError 

62from ..functional_validators import AfterValidator, BeforeValidator, FieldValidatorModes, PlainValidator, WrapValidator 

63from ..json_schema import JsonSchemaValue 

64from ..version import version_short 

65from ..warnings import PydanticDeprecatedSince20 

66from . import _decorators, _discriminated_union, _known_annotated_metadata, _repr, _typing_extra 

67from ._config import ConfigWrapper, ConfigWrapperStack 

68from ._core_metadata import CoreMetadata, update_core_metadata 

69from ._core_utils import ( 

70 get_ref, 

71 get_type_ref, 

72 is_list_like_schema_with_items_schema, 

73 validate_core_schema, 

74) 

75from ._decorators import ( 

76 Decorator, 

77 DecoratorInfos, 

78 FieldSerializerDecoratorInfo, 

79 FieldValidatorDecoratorInfo, 

80 ModelSerializerDecoratorInfo, 

81 ModelValidatorDecoratorInfo, 

82 RootValidatorDecoratorInfo, 

83 ValidatorDecoratorInfo, 

84 get_attribute_from_bases, 

85 inspect_field_serializer, 

86 inspect_model_serializer, 

87 inspect_validator, 

88) 

89from ._docs_extraction import extract_docstrings_from_cls 

90from ._fields import ( 

91 collect_dataclass_fields, 

92 rebuild_dataclass_fields, 

93 rebuild_model_fields, 

94 takes_validated_data_argument, 

95) 

96from ._forward_ref import PydanticRecursiveRef 

97from ._generics import get_standard_typevars_map, replace_types 

98from ._import_utils import import_cached_base_model, import_cached_field_info 

99from ._mock_val_ser import MockCoreSchema 

100from ._namespace_utils import NamespacesTuple, NsResolver 

101from ._schema_gather import MissingDefinitionError, gather_schemas_for_cleaning 

102from ._schema_generation_shared import CallbackGetCoreSchemaHandler 

103from ._utils import lenient_issubclass, smart_deepcopy 

104 

105if TYPE_CHECKING: 

106 from ..fields import ComputedFieldInfo, FieldInfo 

107 from ..main import BaseModel 

108 from ..types import Discriminator 

109 from ._dataclasses import StandardDataclass 

110 from ._schema_generation_shared import GetJsonSchemaFunction 

111 

# Whether the stdlib `typing.TypedDict` can be used for schema generation
# (see `generate_schema`: on Python < 3.12, `typing_extensions.TypedDict` is required).
_SUPPORTS_TYPEDDICT = sys.version_info >= (3, 12)

# Aliases over the decorator-info variants that target individual fields
# (as opposed to whole-model validators/serializers).
FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo]
FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo)
AnyFieldDecorator = Union[
    Decorator[ValidatorDecoratorInfo],
    Decorator[FieldValidatorDecoratorInfo],
    Decorator[FieldSerializerDecoratorInfo],
]

# Signature of a `__get_pydantic_core_schema__`-style callback: takes the source
# type and a handler and returns a core schema.
ModifyCoreSchemaWrapHandler: TypeAlias = GetCoreSchemaHandler
GetCoreSchemaFunction: TypeAlias = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema]
ParametersCallback: TypeAlias = "Callable[[int, str, Any], Literal['skip'] | None]"

# Groups of origins that map to the same kind of core schema. Both the `typing`
# aliases and the concrete builtin/ABC types are listed so membership tests work
# regardless of how the annotation was spelled.
TUPLE_TYPES: list[type] = [typing.Tuple, tuple]  # noqa: UP006
LIST_TYPES: list[type] = [typing.List, list, collections.abc.MutableSequence]  # noqa: UP006
SET_TYPES: list[type] = [typing.Set, set, collections.abc.MutableSet]  # noqa: UP006
FROZEN_SET_TYPES: list[type] = [typing.FrozenSet, frozenset, collections.abc.Set]  # noqa: UP006
DICT_TYPES: list[type] = [typing.Dict, dict]  # noqa: UP006
IP_TYPES: list[type] = [IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network]
SEQUENCE_TYPES: list[type] = [typing.Sequence, collections.abc.Sequence]
ITERABLE_TYPES: list[type] = [typing.Iterable, collections.abc.Iterable, typing.Generator, collections.abc.Generator]
TYPE_TYPES: list[type] = [typing.Type, type]  # noqa: UP006
PATTERN_TYPES: list[type] = [typing.Pattern, re.Pattern]
PATH_TYPES: list[type] = [
    os.PathLike,
    pathlib.Path,
    pathlib.PurePath,
    pathlib.PosixPath,
    pathlib.PurePosixPath,
    pathlib.PureWindowsPath,
]
MAPPING_TYPES = [
    typing.Mapping,
    typing.MutableMapping,
    collections.abc.Mapping,
    collections.abc.MutableMapping,
    collections.OrderedDict,
    typing_extensions.OrderedDict,
    typing.DefaultDict,  # noqa: UP006
    collections.defaultdict,
]
COUNTER_TYPES = [collections.Counter, typing.Counter]
DEQUE_TYPES: list[type] = [collections.deque, typing.Deque]  # noqa: UP006

# Note: This does not play very well with type checkers. For example,
# `a: LambdaType = lambda x: x` will raise a type error by Pyright.
ValidateCallSupportedTypes = Union[
    LambdaType,
    FunctionType,
    MethodType,
    partial,
]

# Runtime tuple of the members of `ValidateCallSupportedTypes`, for isinstance checks.
VALIDATE_CALL_SUPPORTED_TYPES = get_args(ValidateCallSupportedTypes)

# Maps a field-validator `mode` string to the annotated-validator class implementing it.
_mode_to_validator: dict[
    FieldValidatorModes, type[BeforeValidator | AfterValidator | PlainValidator | WrapValidator]
] = {'before': BeforeValidator, 'after': AfterValidator, 'plain': PlainValidator, 'wrap': WrapValidator}

171 

172 

def check_validator_fields_against_field_name(
    info: FieldDecoratorInfo,
    field: str,
) -> bool:
    """Whether a field-targeting decorator applies to the given field name.

    Args:
        info: The field info.
        field: The field name to check.

    Returns:
        `True` if field name is in validator fields (or the decorator targets all
        fields via `'*'`), `False` otherwise.
    """
    targeted = info.fields
    if '*' in targeted:
        return True
    return field in targeted

188 

189 

def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None:
    """Check that every field referenced by the given decorators exists in `fields`.

    The check is skipped for a decorator that targets all fields (`'*'`) or that
    was declared with `check_fields=False`.

    Args:
        decorators: An iterable of decorators.
        fields: An iterable of fields name.

    Raises:
        PydanticUserError: If one of the field names does not exist in `fields` param.
    """
    known_fields = set(fields)
    for dec in decorators:
        info = dec.info
        if '*' in info.fields or info.check_fields is False:
            continue
        for field in info.fields:
            if field in known_fields:
                continue
            raise PydanticUserError(
                f'Decorators defined with incorrect fields: {dec.cls_ref}.{dec.cls_var_name}'
                " (use check_fields=False if you're inheriting from the model and intended this)",
                code='decorator-missing-field',
            )

215 

216 

def filter_field_decorator_info_by_field(
    validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str
) -> list[Decorator[FieldDecoratorInfoType]]:
    """Return only the decorators that apply to `field` (directly or via `'*'`)."""
    matching: list[Decorator[FieldDecoratorInfoType]] = []
    for dec in validator_functions:
        if check_validator_fields_against_field_name(dec.info, field):
            matching.append(dec)
    return matching

221 

222 

def apply_each_item_validators(
    schema: core_schema.CoreSchema,
    each_item_validators: list[Decorator[ValidatorDecoratorInfo]],
    field_name: str | None,
) -> core_schema.CoreSchema:
    """Apply V1-style `each_item=True` validators to the item schema of a container schema.

    This V1 compatibility shim should eventually be removed.

    Args:
        schema: The container schema whose item/value schema receives the validators.
        each_item_validators: The `each_item=True` validators to apply.
        field_name: The name of the field being validated, if any.

    Returns:
        The (mutated) schema.

    Raises:
        TypeError: If the schema type does not support per-item validation.
    """
    # fail early if each_item_validators is empty
    if not each_item_validators:
        return schema

    # Push down the `each_item=True` validators into the item schema.
    # Note that this won't work for any Annotated types that get wrapped by a
    # function validator, but that's okay because that didn't exist in V1.
    schema_type = schema['type']
    if schema_type == 'nullable':
        schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators, field_name)
    elif schema_type == 'tuple':
        variadic_item_index = schema.get('variadic_item_index')
        if variadic_item_index is not None:
            # Only the variadic (`*`) element is "each item"; fixed elements are untouched.
            schema['items_schema'][variadic_item_index] = apply_validators(
                schema['items_schema'][variadic_item_index],
                each_item_validators,
                field_name,
            )
    elif is_list_like_schema_with_items_schema(schema):
        schema['items_schema'] = apply_validators(
            schema.get('items_schema', core_schema.any_schema()), each_item_validators, field_name
        )
    elif schema_type == 'dict':
        schema['values_schema'] = apply_validators(
            schema.get('values_schema', core_schema.any_schema()), each_item_validators, field_name
        )
    else:
        raise TypeError(
            f'`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema_type}'
        )
    return schema

258 

259 

def _extract_json_schema_info_from_field_info(
    info: FieldInfo | ComputedFieldInfo,
) -> tuple[JsonDict | None, JsonDict | JsonSchemaExtraCallable | None]:
    """Split the JSON-schema-relevant attributes of a field info into updates and extra.

    Returns:
        A two-tuple of the direct JSON schema updates (or `None` if there are none)
        and the field's `json_schema_extra` value.
    """
    # `deprecated` may be a bool or a message string; an empty message string is
    # still "deprecated", hence the explicit `== ''` check (falls back to `None`
    # so the key is dropped below when not deprecated).
    deprecated = bool(info.deprecated) or info.deprecated == '' or None
    updates = {
        'title': info.title,
        'description': info.description,
        'deprecated': deprecated,
        'examples': to_jsonable_python(info.examples),
    }
    filtered = {key: value for key, value in updates.items() if value is not None}
    return (filtered or None, info.json_schema_extra)

271 

272 

# Mapping of type -> encoder callable, as supplied by the legacy `json_encoders` config.
JsonEncoders = dict[type[Any], JsonEncoder]

274 

275 

276def _add_custom_serialization_from_json_encoders( 

277 json_encoders: JsonEncoders | None, tp: Any, schema: CoreSchema 

278) -> CoreSchema: 

279 """Iterate over the json_encoders and add the first matching encoder to the schema. 

280 

281 Args: 

282 json_encoders: A dictionary of types and their encoder functions. 

283 tp: The type to check for a matching encoder. 

284 schema: The schema to add the encoder to. 

285 """ 

286 if not json_encoders: 

287 return schema 

288 if 'serialization' in schema: 

289 return schema 

290 # Check the class type and its superclasses for a matching encoder 

291 # Decimal.__class__.__mro__ (and probably other cases) doesn't include Decimal itself 

292 # if the type is a GenericAlias (e.g. from list[int]) we need to use __class__ instead of .__mro__ 

293 for base in (tp, *getattr(tp, '__mro__', tp.__class__.__mro__)[:-1]): 

294 encoder = json_encoders.get(base) 

295 if encoder is None: 

296 continue 

297 

298 warnings.warn( 

299 f'`json_encoders` is deprecated. See https://docs.pydantic.dev/{version_short()}/concepts/serialization/#custom-serializers for alternatives', 

300 PydanticDeprecatedSince20, 

301 ) 

302 

303 # TODO: in theory we should check that the schema accepts a serialization key 

304 schema['serialization'] = core_schema.plain_serializer_function_ser_schema(encoder, when_used='json') 

305 return schema 

306 

307 return schema 

308 

309 

310def _get_first_non_null(a: Any, b: Any) -> Any: 

311 """Return the first argument if it is not None, otherwise return the second argument. 

312 

313 Use case: serialization_alias (argument a) and alias (argument b) are both defined, and serialization_alias is ''. 

314 This function will return serialization_alias, which is the first argument, even though it is an empty string. 

315 """ 

316 return a if a is not None else b 

317 

318 

# NOTE(review): no raiser is visible in this chunk — presumably raised when a
# generated core schema fails validation/cleaning; confirm against the rest of the file.
class InvalidSchemaError(Exception):
    """The core schema is invalid."""

321 

322 

class GenerateSchema:
    """Generate core schema for a Pydantic model, dataclass and types like `str`, `datetime`, ... ."""

    # Slotted to avoid a per-instance `__dict__`; all attributes are assigned in `__init__`.
    __slots__ = (
        '_config_wrapper_stack',
        '_ns_resolver',
        '_typevars_map',
        'field_name_stack',
        'model_type_stack',
        'defs',
    )

334 

    def __init__(
        self,
        config_wrapper: ConfigWrapper,
        ns_resolver: NsResolver | None = None,
        typevars_map: Mapping[TypeVar, Any] | None = None,
    ) -> None:
        """Initialize the schema generator.

        Args:
            config_wrapper: The configuration used as the base of the config stack.
            ns_resolver: Resolver for the namespaces used to evaluate annotations;
                a fresh `NsResolver` is created when not provided.
            typevars_map: Mapping of type variables to their replacements, if any.
        """
        # we need a stack for recursing into nested models
        self._config_wrapper_stack = ConfigWrapperStack(config_wrapper)
        self._ns_resolver = ns_resolver or NsResolver()
        self._typevars_map = typevars_map
        self.field_name_stack = _FieldNameStack()
        self.model_type_stack = _ModelTypeStack()
        self.defs = _Definitions()

348 

349 def __init_subclass__(cls) -> None: 

350 super().__init_subclass__() 

351 warnings.warn( 

352 'Subclassing `GenerateSchema` is not supported. The API is highly subject to change in minor versions.', 

353 UserWarning, 

354 stacklevel=2, 

355 ) 

356 

    @property
    def _config_wrapper(self) -> ConfigWrapper:
        # The active config: the tail of the config stack.
        return self._config_wrapper_stack.tail

    @property
    def _types_namespace(self) -> NamespacesTuple:
        # The namespaces currently in scope for evaluating annotations.
        return self._ns_resolver.types_namespace

    @property
    def _arbitrary_types(self) -> bool:
        # Shorthand for the active config's `arbitrary_types_allowed` setting.
        return self._config_wrapper.arbitrary_types_allowed

    # the following methods can be overridden but should be considered
    # unstable / private APIs
    def _list_schema(self, items_type: Any) -> CoreSchema:
        # `list[items_type]`
        return core_schema.list_schema(self.generate_schema(items_type))

    def _dict_schema(self, keys_type: Any, values_type: Any) -> CoreSchema:
        # `dict[keys_type, values_type]`
        return core_schema.dict_schema(self.generate_schema(keys_type), self.generate_schema(values_type))

    def _set_schema(self, items_type: Any) -> CoreSchema:
        # `set[items_type]`
        return core_schema.set_schema(self.generate_schema(items_type))

    def _frozenset_schema(self, items_type: Any) -> CoreSchema:
        # `frozenset[items_type]`
        return core_schema.frozenset_schema(self.generate_schema(items_type))

382 

    def _enum_schema(self, enum_type: type[Enum]) -> CoreSchema:
        """Generate schema for an `enum.Enum` subclass, including str/int/float-mixin enums."""
        cases: list[Any] = list(enum_type.__members__.values())

        enum_ref = get_type_ref(enum_type)
        description = None if not enum_type.__doc__ else inspect.cleandoc(enum_type.__doc__)
        if (
            description == 'An enumeration.'
        ):  # This is the default value provided by enum.EnumMeta.__new__; don't use it
            description = None
        js_updates = {'title': enum_type.__name__, 'description': description}
        js_updates = {k: v for k, v in js_updates.items() if v is not None}

        # Mixed-in str/int/float enums get a typed sub-schema and matching value serialization.
        sub_type: Literal['str', 'int', 'float'] | None = None
        if issubclass(enum_type, int):
            sub_type = 'int'
            value_ser_type: core_schema.SerSchema = core_schema.simple_ser_schema('int')
        elif issubclass(enum_type, str):
            # this handles `StrEnum` (3.11 only), and also `Foobar(str, Enum)`
            sub_type = 'str'
            value_ser_type = core_schema.simple_ser_schema('str')
        elif issubclass(enum_type, float):
            sub_type = 'float'
            value_ser_type = core_schema.simple_ser_schema('float')
        else:
            # TODO this is an ugly hack, how do we trigger an Any schema for serialization?
            value_ser_type = core_schema.plain_serializer_function_ser_schema(lambda x: x)

        if cases:

            def get_json_schema(schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
                """Merge the title/description updates into the resolved JSON schema."""
                json_schema = handler(schema)
                original_schema = handler.resolve_ref_schema(json_schema)
                original_schema.update(js_updates)
                return json_schema

            # we don't want to add the missing to the schema if it's the default one
            default_missing = getattr(enum_type._missing_, '__func__', None) is Enum._missing_.__func__  # pyright: ignore[reportFunctionMemberAccess]
            enum_schema = core_schema.enum_schema(
                enum_type,
                cases,
                sub_type=sub_type,
                missing=None if default_missing else enum_type._missing_,
                ref=enum_ref,
                metadata={'pydantic_js_functions': [get_json_schema]},
            )

            if self._config_wrapper.use_enum_values:
                # Validate to the enum member, then unwrap to its `.value`.
                enum_schema = core_schema.no_info_after_validator_function(
                    attrgetter('value'), enum_schema, serialization=value_ser_type
                )

            return enum_schema

        else:

            def get_json_schema_no_cases(_, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
                """Build the JSON schema from a synthetic (empty) enum schema."""
                json_schema = handler(core_schema.enum_schema(enum_type, cases, sub_type=sub_type, ref=enum_ref))
                original_schema = handler.resolve_ref_schema(json_schema)
                original_schema.update(js_updates)
                return json_schema

            # Use an isinstance check for enums with no cases.
            # The most important use case for this is creating TypeVar bounds for generics that should
            # be restricted to enums. This is more consistent than it might seem at first, since you can only
            # subclass enum.Enum (or subclasses of enum.Enum) if all parent classes have no cases.
            # We use the get_json_schema function when an Enum subclass has been declared with no cases
            # so that we can still generate a valid json schema.
            return core_schema.is_instance_schema(
                enum_type,
                metadata={'pydantic_js_functions': [get_json_schema_no_cases]},
            )

454 

    def _ip_schema(self, tp: Any) -> CoreSchema:
        """Generate schema for the stdlib `ipaddress` address/network/interface types."""
        from ._validators import IP_VALIDATOR_LOOKUP, IpType

        ip_type_json_schema_format: dict[type[IpType], str] = {
            IPv4Address: 'ipv4',
            IPv4Network: 'ipv4network',
            IPv4Interface: 'ipv4interface',
            IPv6Address: 'ipv6',
            IPv6Network: 'ipv6network',
            IPv6Interface: 'ipv6interface',
        }

        def ser_ip(ip: Any, info: core_schema.SerializationInfo) -> str | IpType:
            """Serialize the value as-is in python mode, and as a string otherwise."""
            if not isinstance(ip, (tp, str)):
                raise PydanticSerializationUnexpectedValue(
                    f"Expected `{tp}` but got `{type(ip)}` with value `'{ip}'` - serialized value may not be as expected."
                )
            if info.mode == 'python':
                return ip
            return str(ip)

        return core_schema.lax_or_strict_schema(
            # Lax mode: coerce via the per-type validator.
            lax_schema=core_schema.no_info_plain_validator_function(IP_VALIDATOR_LOOKUP[tp]),
            # Strict mode: require an instance (python), or a string parsed by `tp` (JSON).
            strict_schema=core_schema.json_or_python_schema(
                json_schema=core_schema.no_info_after_validator_function(tp, core_schema.str_schema()),
                python_schema=core_schema.is_instance_schema(tp),
            ),
            serialization=core_schema.plain_serializer_function_ser_schema(ser_ip, info_arg=True, when_used='always'),
            metadata={
                'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': ip_type_json_schema_format[tp]}]
            },
        )

487 

    def _path_schema(self, tp: Any, path_type: Any) -> CoreSchema:
        """Generate schema for `pathlib` path types and `os.PathLike[path_type]`."""
        if tp is os.PathLike and (path_type not in {str, bytes} and not typing_objects.is_any(path_type)):
            raise PydanticUserError(
                '`os.PathLike` can only be used with `str`, `bytes` or `Any`', code='schema-for-unknown-type'
            )

        # `os.PathLike` is abstract; validate into a concrete `PurePath` instead.
        path_constructor = pathlib.PurePath if tp is os.PathLike else tp
        strict_inner_schema = (
            core_schema.bytes_schema(strict=True) if (path_type is bytes) else core_schema.str_schema(strict=True)
        )
        lax_inner_schema = core_schema.bytes_schema() if (path_type is bytes) else core_schema.str_schema()

        def path_validator(input_value: str | bytes) -> os.PathLike[Any]:  # type: ignore
            """Convert a validated str/bytes value into a path instance."""
            try:
                if path_type is bytes:
                    # bytes input is decoded to str before constructing the path.
                    if isinstance(input_value, bytes):
                        try:
                            input_value = input_value.decode()
                        except UnicodeDecodeError as e:
                            raise PydanticCustomError('bytes_type', 'Input must be valid bytes') from e
                    else:
                        raise PydanticCustomError('bytes_type', 'Input must be bytes')
                elif not isinstance(input_value, str):
                    raise PydanticCustomError('path_type', 'Input is not a valid path')

                return path_constructor(input_value)  # type: ignore
            except TypeError as e:
                raise PydanticCustomError('path_type', 'Input is not a valid path') from e

        def ser_path(path: Any, info: core_schema.SerializationInfo) -> str | os.PathLike[Any]:
            """Serialize the value as-is in python mode, and as a string otherwise."""
            if not isinstance(path, (tp, str)):
                raise PydanticSerializationUnexpectedValue(
                    f"Expected `{tp}` but got `{type(path)}` with value `'{path}'` - serialized value may not be as expected."
                )
            if info.mode == 'python':
                return path
            return str(path)

        instance_schema = core_schema.json_or_python_schema(
            json_schema=core_schema.no_info_after_validator_function(path_validator, lax_inner_schema),
            python_schema=core_schema.is_instance_schema(tp),
        )

        schema = core_schema.lax_or_strict_schema(
            # Lax mode: accept an instance, or build one from a str/bytes value.
            lax_schema=core_schema.union_schema(
                [
                    instance_schema,
                    core_schema.no_info_after_validator_function(path_validator, strict_inner_schema),
                ],
                custom_error_type='path_type',
                custom_error_message=f'Input is not a valid path for {tp}',
            ),
            strict_schema=instance_schema,
            serialization=core_schema.plain_serializer_function_ser_schema(ser_path, info_arg=True, when_used='always'),
            metadata={'pydantic_js_functions': [lambda source, handler: {**handler(source), 'format': 'path'}]},
        )
        return schema

545 

    def _deque_schema(self, items_type: Any) -> CoreSchema:
        """Generate schema for `collections.deque[items_type]`."""
        from ._serializers import serialize_sequence_via_list
        from ._validators import deque_validator

        item_type_schema = self.generate_schema(items_type)

        # we have to use a lax list schema here, because we need to validate the deque's
        # items via a list schema, but it's ok if the deque itself is not a list
        list_schema = core_schema.list_schema(item_type_schema, strict=False)

        check_instance = core_schema.json_or_python_schema(
            json_schema=list_schema,
            python_schema=core_schema.is_instance_schema(collections.deque, cls_repr='Deque'),
        )

        lax_schema = core_schema.no_info_wrap_validator_function(deque_validator, list_schema)

        return core_schema.lax_or_strict_schema(
            lax_schema=lax_schema,
            # Strict mode additionally requires the python input to already be a deque.
            strict_schema=core_schema.chain_schema([check_instance, lax_schema]),
            serialization=core_schema.wrap_serializer_function_ser_schema(
                serialize_sequence_via_list, schema=item_type_schema, info_arg=True
            ),
        )

570 

    def _mapping_schema(self, tp: Any, keys_type: Any, values_type: Any) -> CoreSchema:
        """Generate schema for mapping types (`dict`, `Mapping`, `OrderedDict`, `defaultdict`, ...)."""
        from ._validators import MAPPING_ORIGIN_MAP, defaultdict_validator, get_defaultdict_default_default_factory

        mapped_origin = MAPPING_ORIGIN_MAP[tp]
        keys_schema = self.generate_schema(keys_type)
        values_schema = self.generate_schema(values_type)
        dict_schema = core_schema.dict_schema(keys_schema, values_schema, strict=False)

        if mapped_origin is dict:
            # Plain dicts validate directly; no coercion to another mapping type is needed.
            schema = dict_schema
        else:
            check_instance = core_schema.json_or_python_schema(
                json_schema=dict_schema,
                python_schema=core_schema.is_instance_schema(mapped_origin),
            )

            if tp is collections.defaultdict:
                # `defaultdict` needs a default factory; a suitable default is derived
                # from the values type.
                default_default_factory = get_defaultdict_default_default_factory(values_type)
                coerce_instance_wrap = partial(
                    core_schema.no_info_wrap_validator_function,
                    partial(defaultdict_validator, default_default_factory=default_default_factory),
                )
            else:
                # Other mappings are built by calling the origin on the validated dict.
                coerce_instance_wrap = partial(core_schema.no_info_after_validator_function, mapped_origin)

            lax_schema = coerce_instance_wrap(dict_schema)
            # Strict mode additionally requires an instance of the mapping origin (python mode).
            strict_schema = core_schema.chain_schema([check_instance, lax_schema])

            schema = core_schema.lax_or_strict_schema(
                lax_schema=lax_schema,
                strict_schema=strict_schema,
                serialization=core_schema.wrap_serializer_function_ser_schema(
                    lambda v, h: h(v), schema=dict_schema, info_arg=False
                ),
            )

        return schema

608 

609 def _fraction_schema(self) -> CoreSchema: 

610 """Support for [`fractions.Fraction`][fractions.Fraction].""" 

611 from ._validators import fraction_validator 

612 

613 # TODO: note, this is a fairly common pattern, re lax / strict for attempted type coercion, 

614 # can we use a helper function to reduce boilerplate? 

615 return core_schema.lax_or_strict_schema( 

616 lax_schema=core_schema.no_info_plain_validator_function(fraction_validator), 

617 strict_schema=core_schema.json_or_python_schema( 

618 json_schema=core_schema.no_info_plain_validator_function(fraction_validator), 

619 python_schema=core_schema.is_instance_schema(Fraction), 

620 ), 

621 # use str serialization to guarantee round trip behavior 

622 serialization=core_schema.to_string_ser_schema(when_used='always'), 

623 metadata={'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'fraction'}]}, 

624 ) 

625 

626 def _arbitrary_type_schema(self, tp: Any) -> CoreSchema: 

627 if not isinstance(tp, type): 

628 warn( 

629 f'{tp!r} is not a Python type (it may be an instance of an object),' 

630 ' Pydantic will allow any object with no validation since we cannot even' 

631 ' enforce that the input is an instance of the given type.' 

632 ' To get rid of this error wrap the type with `pydantic.SkipValidation`.', 

633 UserWarning, 

634 ) 

635 return core_schema.any_schema() 

636 return core_schema.is_instance_schema(tp) 

637 

638 def _unknown_type_schema(self, obj: Any) -> CoreSchema: 

639 raise PydanticSchemaGenerationError( 

640 f'Unable to generate pydantic-core schema for {obj!r}. ' 

641 'Set `arbitrary_types_allowed=True` in the model_config to ignore this error' 

642 ' or implement `__get_pydantic_core_schema__` on your type to fully support it.' 

643 '\n\nIf you got this error by calling handler(<some type>) within' 

644 ' `__get_pydantic_core_schema__` then you likely need to call' 

645 ' `handler.generate_schema(<some type>)` since we do not call' 

646 ' `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.' 

647 ) 

648 

    def _apply_discriminator_to_union(
        self, schema: CoreSchema, discriminator: str | Discriminator | None
    ) -> CoreSchema:
        """Apply `discriminator` to a union schema, deferring when definitions are missing."""
        if discriminator is None:
            return schema
        try:
            return _discriminated_union.apply_discriminator(
                schema,
                discriminator,
                self.defs._definitions,
            )
        except _discriminated_union.MissingDefinitionForUnionRef:
            # defer until defs are resolved
            _discriminated_union.set_discriminator_in_metadata(
                schema,
                discriminator,
            )
            return schema

667 

    def clean_schema(self, schema: CoreSchema) -> CoreSchema:
        """Finalize the collected definitions into `schema` and validate the result."""
        schema = self.defs.finalize_schema(schema)
        schema = validate_core_schema(schema)
        return schema

672 

673 def _add_js_function(self, metadata_schema: CoreSchema, js_function: Callable[..., Any]) -> None: 

674 metadata = metadata_schema.get('metadata', {}) 

675 pydantic_js_functions = metadata.setdefault('pydantic_js_functions', []) 

676 # because of how we generate core schemas for nested generic models 

677 # we can end up adding `BaseModel.__get_pydantic_json_schema__` multiple times 

678 # this check may fail to catch duplicates if the function is a `functools.partial` 

679 # or something like that, but if it does it'll fail by inserting the duplicate 

680 if js_function not in pydantic_js_functions: 

681 pydantic_js_functions.append(js_function) 

682 metadata_schema['metadata'] = metadata 

683 

    def generate_schema(
        self,
        obj: Any,
    ) -> core_schema.CoreSchema:
        """Generate core schema.

        Args:
            obj: The object to generate core schema for.

        Returns:
            The generated core schema.

        Raises:
            PydanticUndefinedAnnotation:
                If it is not possible to evaluate forward reference.
            PydanticSchemaGenerationError:
                If it is not possible to generate pydantic-core schema.
            TypeError:
                - If `alias_generator` returns a disallowed type (must be str, AliasPath or AliasChoices).
                - If V1 style validator with `each_item=True` applied on a wrong field.
            PydanticUserError:
                - If `typing.TypedDict` is used instead of `typing_extensions.TypedDict` on Python < 3.12.
                - If `__modify_schema__` method is used instead of `__get_pydantic_json_schema__`.
        """
        # A `__get_pydantic_core_schema__`-style hook on `obj` takes precedence;
        # otherwise fall back to the built-in type handling.
        schema = self._generate_schema_from_get_schema_method(obj, obj)

        if schema is None:
            schema = self._generate_schema_inner(obj)

        # Attach any `__get_pydantic_json_schema__` hook found on `obj` to the
        # schema's metadata so it runs during JSON schema generation.
        metadata_js_function = _extract_get_pydantic_json_schema(obj)
        if metadata_js_function is not None:
            metadata_schema = resolve_original_schema(schema, self.defs)
            if metadata_schema:
                self._add_js_function(metadata_schema, metadata_js_function)

        # Legacy (deprecated) `json_encoders` support.
        schema = _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, obj, schema)

        return schema

722 

    def _model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema:
        """Generate schema for a Pydantic model.

        Handles, in order: reuse of a schema already registered in `self.defs` (recursive models),
        an explicit `__pydantic_core_schema__` set directly on the class, rebuilding incomplete
        fields, `extra='allow'` typed extras from a `__pydantic_extra__` annotation, root models,
        and model-level validators/serializers. The result is registered in `self.defs` and
        returned as a definition reference schema.
        """
        BaseModel_ = import_cached_base_model()

        with self.defs.get_schema_or_ref(cls) as (model_ref, maybe_schema):
            if maybe_schema is not None:
                # Already built (or currently being built, for recursive models): reuse the ref.
                return maybe_schema

            # An explicit schema set directly on the class (not inherited) short-circuits generation:
            schema = cls.__dict__.get('__pydantic_core_schema__')
            if schema is not None and not isinstance(schema, MockCoreSchema):
                if schema['type'] == 'definitions':
                    schema = self.defs.unpack_definitions(schema)
                ref = get_ref(schema)
                if ref:
                    return self.defs.create_definition_reference_schema(schema)
                else:
                    return schema

            config_wrapper = ConfigWrapper(cls.model_config, check=False)

            # The model's config and namespaces apply to everything generated below:
            with self._config_wrapper_stack.push(config_wrapper), self._ns_resolver.push(cls):
                core_config = self._config_wrapper.core_config(title=cls.__name__)

                if cls.__pydantic_fields_complete__ or cls is BaseModel_:
                    fields = getattr(cls, '__pydantic_fields__', {})
                else:
                    # Fields weren't fully built when the class was created (unresolved forward
                    # references); try rebuilding them with the namespaces available now.
                    if not hasattr(cls, '__pydantic_fields__'):
                        # This happens when we have a loop in the schema generation:
                        # class Base[T](BaseModel):
                        #     t: T
                        #
                        # class Other(BaseModel):
                        #     b: 'Base[Other]'
                        # When we build fields for `Other`, we evaluate the forward annotation.
                        # At this point, `Other` doesn't have the model fields set. We create
                        # `Base[Other]`; model fields are successfully built, and we try to generate
                        # a schema for `t: Other`. As `Other.__pydantic_fields__` aren't set, we abort.
                        raise PydanticUndefinedAnnotation(
                            name=cls.__name__,
                            message=f'Class {cls.__name__!r} is not defined',
                        )
                    try:
                        fields = rebuild_model_fields(
                            cls,
                            ns_resolver=self._ns_resolver,
                            typevars_map=self._typevars_map or {},
                        )
                    except NameError as e:
                        raise PydanticUndefinedAnnotation.from_name_error(e) from e

                decorators = cls.__pydantic_decorators__
                computed_fields = decorators.computed_fields
                # Fail early when a field validator/serializer targets a non-existent field:
                check_decorator_fields_exist(
                    chain(
                        decorators.field_validators.values(),
                        decorators.field_serializers.values(),
                        decorators.validators.values(),
                    ),
                    {*fields.keys(), *computed_fields.keys()},
                )

                model_validators = decorators.model_validators.values()

                extras_schema = None
                extras_keys_schema = None
                if core_config.get('extra_fields_behavior') == 'allow':
                    # Walk the MRO (excluding `object`) looking for a `__pydantic_extra__`
                    # annotation constraining extra keys/values; it must be a `dict[str, ...]` form.
                    assert cls.__mro__[0] is cls
                    assert cls.__mro__[-1] is object
                    for candidate_cls in cls.__mro__[:-1]:
                        extras_annotation = getattr(candidate_cls, '__annotations__', {}).get(
                            '__pydantic_extra__', None
                        )
                        if extras_annotation is not None:
                            if isinstance(extras_annotation, str):
                                # Stringified annotation: evaluate it against the current namespaces.
                                extras_annotation = _typing_extra.eval_type_backport(
                                    _typing_extra._make_forward_ref(
                                        extras_annotation, is_argument=False, is_class=True
                                    ),
                                    *self._types_namespace,
                                )
                            tp = get_origin(extras_annotation)
                            if tp not in DICT_TYPES:
                                raise PydanticSchemaGenerationError(
                                    'The type annotation for `__pydantic_extra__` must be `dict[str, ...]`'
                                )
                            extra_keys_type, extra_items_type = self._get_args_resolving_forward_refs(
                                extras_annotation,
                                required=True,
                            )
                            # `str` keys and `Any` values are the defaults, so they need no schema:
                            if extra_keys_type is not str:
                                extras_keys_schema = self.generate_schema(extra_keys_type)
                            if not typing_objects.is_any(extra_items_type):
                                extras_schema = self.generate_schema(extra_items_type)
                            if extras_keys_schema is not None or extras_schema is not None:
                                break

                generic_origin: type[BaseModel] | None = getattr(cls, '__pydantic_generic_metadata__', {}).get('origin')

                if cls.__pydantic_root_model__:
                    # Root models validate against the single `root` field's schema directly:
                    root_field = self._common_field_schema('root', fields['root'], decorators)
                    inner_schema = root_field['schema']
                    inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
                    model_schema = core_schema.model_schema(
                        cls,
                        inner_schema,
                        generic_origin=generic_origin,
                        custom_init=getattr(cls, '__pydantic_custom_init__', None),
                        root_model=True,
                        post_init=getattr(cls, '__pydantic_post_init__', None),
                        config=core_config,
                        ref=model_ref,
                    )
                else:
                    fields_schema: core_schema.CoreSchema = core_schema.model_fields_schema(
                        {k: self._generate_md_field_schema(k, v, decorators) for k, v in fields.items()},
                        computed_fields=[
                            self._computed_field_schema(d, decorators.field_serializers)
                            for d in computed_fields.values()
                        ],
                        extras_schema=extras_schema,
                        extras_keys_schema=extras_keys_schema,
                        model_name=cls.__name__,
                    )
                    inner_schema = apply_validators(fields_schema, decorators.root_validators.values(), None)
                    inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')

                    model_schema = core_schema.model_schema(
                        cls,
                        inner_schema,
                        generic_origin=generic_origin,
                        custom_init=getattr(cls, '__pydantic_custom_init__', None),
                        root_model=False,
                        post_init=getattr(cls, '__pydantic_post_init__', None),
                        config=core_config,
                        ref=model_ref,
                    )

                # Model serializers wrap the model schema; 'outer' model validators wrap everything:
                schema = self._apply_model_serializers(model_schema, decorators.model_serializers.values())
                schema = apply_model_validators(schema, model_validators, 'outer')
                return self.defs.create_definition_reference_schema(schema)

863 

864 def _resolve_self_type(self, obj: Any) -> Any: 

865 obj = self.model_type_stack.get() 

866 if obj is None: 

867 raise PydanticUserError('`typing.Self` is invalid in this context', code='invalid-self-type') 

868 return obj 

869 

    def _generate_schema_from_get_schema_method(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
        """Build a schema from `obj.__get_pydantic_core_schema__` (or the deprecated `__get_validators__`).

        Args:
            obj: The object that may define the hook (e.g. the origin of a generic alias).
            source: The source type passed through to the hook.

        Returns:
            The schema produced by the hook (registered as a definition when it carries a ref),
            or `None` when neither hook is usable on `obj`.
        """
        BaseModel_ = import_cached_base_model()

        get_schema = getattr(obj, '__get_pydantic_core_schema__', None)
        is_base_model_get_schema = (
            getattr(get_schema, '__func__', None) is BaseModel_.__get_pydantic_core_schema__.__func__  # pyright: ignore[reportFunctionMemberAccess]
        )

        if (
            get_schema is not None
            # BaseModel.__get_pydantic_core_schema__ is defined for backwards compatibility,
            # to allow existing code to call `super().__get_pydantic_core_schema__` in Pydantic
            # model that overrides `__get_pydantic_core_schema__`. However, it raises a deprecation
            # warning stating that the method will be removed, and during the core schema gen we actually
            # don't call the method:
            and not is_base_model_get_schema
        ):
            # Some referenceable types might have a `__get_pydantic_core_schema__` method
            # defined on it by users (e.g. on a dataclass). This generally doesn't play well
            # as these types are already recognized by the `GenerateSchema` class and isn't ideal
            # as we might end up calling `get_schema_or_ref` (expensive) on types that are actually
            # not referenceable:
            with self.defs.get_schema_or_ref(obj) as (_, maybe_schema):
                if maybe_schema is not None:
                    return maybe_schema

            # When the hook is invoked on the exact source type, refs are unpacked in place;
            # otherwise the produced schema is stored as a definition:
            if obj is source:
                ref_mode = 'unpack'
            else:
                ref_mode = 'to-def'
            schema = get_schema(
                source, CallbackGetCoreSchemaHandler(self._generate_schema_inner, self, ref_mode=ref_mode)
            )
            if schema['type'] == 'definitions':
                schema = self.defs.unpack_definitions(schema)

            ref = get_ref(schema)
            if ref:
                return self.defs.create_definition_reference_schema(schema)

            # Note: if schema is of type `'definition-ref'`, we might want to copy it as a
            # safety measure (because these are inlined in place -- i.e. mutated directly)
            return schema

        if get_schema is None and (validators := getattr(obj, '__get_validators__', None)) is not None:
            from pydantic.v1 import BaseModel as BaseModelV1

            if issubclass(obj, BaseModelV1):
                warn(
                    f'Mixing V1 models and V2 models (or constructs, like `TypeAdapter`) is not supported. Please upgrade `{obj.__name__}` to V2.',
                    UserWarning,
                )
            else:
                warn(
                    '`__get_validators__` is deprecated and will be removed, use `__get_pydantic_core_schema__` instead.',
                    PydanticDeprecatedSince20,
                )
            return core_schema.chain_schema([core_schema.with_info_plain_validator_function(v) for v in validators()])
        # Implicitly returns None: no usable hook (or only the inherited `BaseModel` one).

928 

929 def _resolve_forward_ref(self, obj: Any) -> Any: 

930 # we assume that types_namespace has the target of forward references in its scope, 

931 # but this could fail, for example, if calling Validator on an imported type which contains 

932 # forward references to other types only defined in the module from which it was imported 

933 # `Validator(SomeImportedTypeAliasWithAForwardReference)` 

934 # or the equivalent for BaseModel 

935 # class Model(BaseModel): 

936 # x: SomeImportedTypeAliasWithAForwardReference 

937 try: 

938 obj = _typing_extra.eval_type_backport(obj, *self._types_namespace) 

939 except NameError as e: 

940 raise PydanticUndefinedAnnotation.from_name_error(e) from e 

941 

942 # if obj is still a ForwardRef, it means we can't evaluate it, raise PydanticUndefinedAnnotation 

943 if isinstance(obj, ForwardRef): 

944 raise PydanticUndefinedAnnotation(obj.__forward_arg__, f'Unable to evaluate forward reference {obj}') 

945 

946 if self._typevars_map: 

947 obj = replace_types(obj, self._typevars_map) 

948 

949 return obj 

950 

951 @overload 

952 def _get_args_resolving_forward_refs(self, obj: Any, required: Literal[True]) -> tuple[Any, ...]: ... 

953 

954 @overload 

955 def _get_args_resolving_forward_refs(self, obj: Any) -> tuple[Any, ...] | None: ... 

956 

957 def _get_args_resolving_forward_refs(self, obj: Any, required: bool = False) -> tuple[Any, ...] | None: 

958 args = get_args(obj) 

959 if args: 

960 if isinstance(obj, GenericAlias): 

961 # PEP 585 generic aliases don't convert args to ForwardRefs, unlike `typing.List/Dict` etc. 

962 args = (_typing_extra._make_forward_ref(a) if isinstance(a, str) else a for a in args) 

963 args = tuple(self._resolve_forward_ref(a) if isinstance(a, ForwardRef) else a for a in args) 

964 elif required: # pragma: no cover 

965 raise TypeError(f'Expected {obj} to have generic parameters but it had none') 

966 return args 

967 

968 def _get_first_arg_or_any(self, obj: Any) -> Any: 

969 args = self._get_args_resolving_forward_refs(obj) 

970 if not args: 

971 return Any 

972 return args[0] 

973 

974 def _get_first_two_args_or_any(self, obj: Any) -> tuple[Any, Any]: 

975 args = self._get_args_resolving_forward_refs(obj) 

976 if not args: 

977 return (Any, Any) 

978 if len(args) < 2: 

979 origin = get_origin(obj) 

980 raise TypeError(f'Expected two type arguments for {origin}, got 1') 

981 return args[0], args[1] 

982 

    def _generate_schema_inner(self, obj: Any) -> core_schema.CoreSchema:
        """Dispatch `obj` to the appropriate schema builder.

        The order of the checks below matters: `Self` and `Annotated` are unwrapped first,
        raw dicts are passed through as already-valid schemas, strings/forward refs are
        resolved and re-dispatched, and models are handled before the general `match_type`.
        """
        if typing_objects.is_self(obj):
            obj = self._resolve_self_type(obj)

        if typing_objects.is_annotated(get_origin(obj)):
            return self._annotated_schema(obj)

        if isinstance(obj, dict):
            # we assume this is already a valid schema
            return obj  # type: ignore[return-value]

        if isinstance(obj, str):
            # A bare string annotation is treated as a forward reference:
            obj = ForwardRef(obj)

        if isinstance(obj, ForwardRef):
            return self.generate_schema(self._resolve_forward_ref(obj))

        BaseModel = import_cached_base_model()

        if lenient_issubclass(obj, BaseModel):
            # Track the model on the stack so nested `typing.Self` annotations resolve to it:
            with self.model_type_stack.push(obj):
                return self._model_schema(obj)

        if isinstance(obj, PydanticRecursiveRef):
            return core_schema.definition_reference_schema(schema_ref=obj.type_ref)

        return self.match_type(obj)

1010 

    def match_type(self, obj: Any) -> core_schema.CoreSchema:  # noqa: C901
        """Main mapping of types to schemas.

        The general structure is a series of if statements starting with the simple cases
        (non-generic primitive types) and then handling generics and other more complex cases.

        Each case either generates a schema directly, calls into a public user-overridable method
        (like `GenerateSchema.tuple_variable_schema`) or calls into a private method that handles some
        boilerplate before calling into the user-facing method (e.g. `GenerateSchema._tuple_schema`).

        The idea is that we'll evolve this into adding more and more user facing methods over time
        as they get requested and we figure out what the right API for them is.
        """
        # Exact-identity matches for primitives and common stdlib scalar types:
        if obj is str:
            return core_schema.str_schema()
        elif obj is bytes:
            return core_schema.bytes_schema()
        elif obj is int:
            return core_schema.int_schema()
        elif obj is float:
            return core_schema.float_schema()
        elif obj is bool:
            return core_schema.bool_schema()
        elif obj is complex:
            return core_schema.complex_schema()
        elif typing_objects.is_any(obj) or obj is object:
            return core_schema.any_schema()
        elif obj is datetime.date:
            return core_schema.date_schema()
        elif obj is datetime.datetime:
            return core_schema.datetime_schema()
        elif obj is datetime.time:
            return core_schema.time_schema()
        elif obj is datetime.timedelta:
            return core_schema.timedelta_schema()
        elif obj is Decimal:
            return core_schema.decimal_schema()
        elif obj is UUID:
            return core_schema.uuid_schema()
        elif obj is Url:
            return core_schema.url_schema()
        elif obj is Fraction:
            return self._fraction_schema()
        elif obj is MultiHostUrl:
            return core_schema.multi_host_url_schema()
        elif obj is None or obj is _typing_extra.NoneType:
            return core_schema.none_schema()
        # Bare (unparametrized) container types: parameters default to `Any`
        # (parametrized forms are handled by `_match_generic_type` below):
        elif obj in IP_TYPES:
            return self._ip_schema(obj)
        elif obj in TUPLE_TYPES:
            return self._tuple_schema(obj)
        elif obj in LIST_TYPES:
            return self._list_schema(Any)
        elif obj in SET_TYPES:
            return self._set_schema(Any)
        elif obj in FROZEN_SET_TYPES:
            return self._frozenset_schema(Any)
        elif obj in SEQUENCE_TYPES:
            return self._sequence_schema(Any)
        elif obj in ITERABLE_TYPES:
            return self._iterable_schema(obj)
        elif obj in DICT_TYPES:
            return self._dict_schema(Any, Any)
        elif obj in PATH_TYPES:
            return self._path_schema(obj, Any)
        elif obj in DEQUE_TYPES:
            return self._deque_schema(Any)
        elif obj in MAPPING_TYPES:
            return self._mapping_schema(obj, Any, Any)
        elif obj in COUNTER_TYPES:
            return self._mapping_schema(obj, Any, int)
        # Typing constructs and special forms:
        elif typing_objects.is_typealiastype(obj):
            return self._type_alias_type_schema(obj)
        elif obj is type:
            return self._type_schema()
        elif _typing_extra.is_callable(obj):
            return core_schema.callable_schema()
        elif typing_objects.is_literal(get_origin(obj)):
            return self._literal_schema(obj)
        elif is_typeddict(obj):
            return self._typed_dict_schema(obj, None)
        elif _typing_extra.is_namedtuple(obj):
            return self._namedtuple_schema(obj, None)
        elif typing_objects.is_newtype(obj):
            # NewType, can't use isinstance because it fails <3.10
            return self.generate_schema(obj.__supertype__)
        elif obj in PATTERN_TYPES:
            return self._pattern_schema(obj)
        elif _typing_extra.is_hashable(obj):
            return self._hashable_schema()
        elif isinstance(obj, typing.TypeVar):
            return self._unsubstituted_typevar_schema(obj)
        elif _typing_extra.is_finalvar(obj):
            # `Final` without an argument accepts anything; `Final[X]` delegates to `X`:
            if obj is Final:
                return core_schema.any_schema()
            return self.generate_schema(
                self._get_first_arg_or_any(obj),
            )
        elif isinstance(obj, VALIDATE_CALL_SUPPORTED_TYPES):
            return self._call_schema(obj)
        elif inspect.isclass(obj) and issubclass(obj, Enum):
            return self._enum_schema(obj)
        elif obj is ZoneInfo:
            return self._zoneinfo_schema()

        # dataclasses.is_dataclass coerces dc instances to types, but we only handle
        # the case of a dc type here
        if dataclasses.is_dataclass(obj):
            return self._dataclass_schema(obj, None)  # pyright: ignore[reportArgumentType]

        origin = get_origin(obj)
        if origin is not None:
            return self._match_generic_type(obj, origin)

        # Nothing matched: fall back to arbitrary-type handling (config-dependent) or error.
        if self._arbitrary_types:
            return self._arbitrary_type_schema(obj)
        return self._unknown_type_schema(obj)

1128 

    def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema:  # noqa: C901
        """Map a parametrized generic type `obj` with origin `origin` to a schema.

        Mirrors the container branches of `match_type`, but dispatches on the origin
        and forwards the type arguments to the corresponding builder.
        """
        # Need to handle generic dataclasses before looking for the schema properties because attribute accesses
        # on _GenericAlias delegate to the origin type, so lose the information about the concrete parametrization
        # As a result, currently, there is no way to cache the schema for generic dataclasses. This may be possible
        # to resolve by modifying the value returned by `Generic.__class_getitem__`, but that is a dangerous game.
        if dataclasses.is_dataclass(origin):
            return self._dataclass_schema(obj, origin)  # pyright: ignore[reportArgumentType]
        if _typing_extra.is_namedtuple(origin):
            return self._namedtuple_schema(obj, origin)

        # A user-defined `__get_pydantic_core_schema__` on the origin takes precedence:
        schema = self._generate_schema_from_get_schema_method(origin, obj)
        if schema is not None:
            return schema

        if typing_objects.is_typealiastype(origin):
            return self._type_alias_type_schema(obj)
        elif is_union_origin(origin):
            return self._union_schema(obj)
        elif origin in TUPLE_TYPES:
            return self._tuple_schema(obj)
        elif origin in LIST_TYPES:
            return self._list_schema(self._get_first_arg_or_any(obj))
        elif origin in SET_TYPES:
            return self._set_schema(self._get_first_arg_or_any(obj))
        elif origin in FROZEN_SET_TYPES:
            return self._frozenset_schema(self._get_first_arg_or_any(obj))
        elif origin in DICT_TYPES:
            return self._dict_schema(*self._get_first_two_args_or_any(obj))
        elif origin in PATH_TYPES:
            return self._path_schema(origin, self._get_first_arg_or_any(obj))
        elif origin in DEQUE_TYPES:
            return self._deque_schema(self._get_first_arg_or_any(obj))
        elif origin in MAPPING_TYPES:
            return self._mapping_schema(origin, *self._get_first_two_args_or_any(obj))
        elif origin in COUNTER_TYPES:
            # `Counter[K]` behaves like a mapping from `K` to `int`:
            return self._mapping_schema(origin, self._get_first_arg_or_any(obj), int)
        elif is_typeddict(origin):
            return self._typed_dict_schema(obj, origin)
        elif origin in TYPE_TYPES:
            return self._subclass_schema(obj)
        elif origin in SEQUENCE_TYPES:
            return self._sequence_schema(self._get_first_arg_or_any(obj))
        elif origin in ITERABLE_TYPES:
            return self._iterable_schema(obj)
        elif origin in PATTERN_TYPES:
            return self._pattern_schema(obj)

        # Unrecognized origin: fall back to arbitrary-type handling (config-dependent) or error.
        if self._arbitrary_types:
            return self._arbitrary_type_schema(origin)
        return self._unknown_type_schema(obj)

1179 

1180 def _generate_td_field_schema( 

1181 self, 

1182 name: str, 

1183 field_info: FieldInfo, 

1184 decorators: DecoratorInfos, 

1185 *, 

1186 required: bool = True, 

1187 ) -> core_schema.TypedDictField: 

1188 """Prepare a TypedDictField to represent a model or typeddict field.""" 

1189 common_field = self._common_field_schema(name, field_info, decorators) 

1190 return core_schema.typed_dict_field( 

1191 common_field['schema'], 

1192 required=False if not field_info.is_required() else required, 

1193 serialization_exclude=common_field['serialization_exclude'], 

1194 validation_alias=common_field['validation_alias'], 

1195 serialization_alias=common_field['serialization_alias'], 

1196 metadata=common_field['metadata'], 

1197 ) 

1198 

1199 def _generate_md_field_schema( 

1200 self, 

1201 name: str, 

1202 field_info: FieldInfo, 

1203 decorators: DecoratorInfos, 

1204 ) -> core_schema.ModelField: 

1205 """Prepare a ModelField to represent a model field.""" 

1206 common_field = self._common_field_schema(name, field_info, decorators) 

1207 return core_schema.model_field( 

1208 common_field['schema'], 

1209 serialization_exclude=common_field['serialization_exclude'], 

1210 validation_alias=common_field['validation_alias'], 

1211 serialization_alias=common_field['serialization_alias'], 

1212 frozen=common_field['frozen'], 

1213 metadata=common_field['metadata'], 

1214 ) 

1215 

1216 def _generate_dc_field_schema( 

1217 self, 

1218 name: str, 

1219 field_info: FieldInfo, 

1220 decorators: DecoratorInfos, 

1221 ) -> core_schema.DataclassField: 

1222 """Prepare a DataclassField to represent the parameter/field, of a dataclass.""" 

1223 common_field = self._common_field_schema(name, field_info, decorators) 

1224 return core_schema.dataclass_field( 

1225 name, 

1226 common_field['schema'], 

1227 init=field_info.init, 

1228 init_only=field_info.init_var or None, 

1229 kw_only=None if field_info.kw_only else False, 

1230 serialization_exclude=common_field['serialization_exclude'], 

1231 validation_alias=common_field['validation_alias'], 

1232 serialization_alias=common_field['serialization_alias'], 

1233 frozen=common_field['frozen'], 

1234 metadata=common_field['metadata'], 

1235 ) 

1236 

    @staticmethod
    def _apply_alias_generator_to_field_info(
        alias_generator: Callable[[str], str] | AliasGenerator, field_info: FieldInfo, field_name: str
    ) -> None:
        """Apply an alias_generator to aliases on a FieldInfo instance if appropriate.

        Mutates `field_info` in place; the order of the mutations below is significant.

        Args:
            alias_generator: A callable that takes a string and returns a string, or an AliasGenerator instance.
            field_info: The FieldInfo instance to which the alias_generator is (maybe) applied.
            field_name: The name of the field from which to generate the alias.
        """
        # Apply an alias_generator if
        # 1. An alias is not specified
        # 2. An alias is specified, but the priority is <= 1
        if (
            field_info.alias_priority is None
            or field_info.alias_priority <= 1
            or field_info.alias is None
            or field_info.validation_alias is None
            or field_info.serialization_alias is None
        ):
            alias, validation_alias, serialization_alias = None, None, None

            if isinstance(alias_generator, AliasGenerator):
                # An `AliasGenerator` can produce all three aliases at once:
                alias, validation_alias, serialization_alias = alias_generator.generate_aliases(field_name)
            elif isinstance(alias_generator, Callable):
                # A plain callable produces a single alias used for all three purposes:
                alias = alias_generator(field_name)
                if not isinstance(alias, str):
                    raise TypeError(f'alias_generator {alias_generator} must return str, not {alias.__class__}')

            # if priority is not set, we set to 1
            # which supports the case where the alias_generator from a child class is used
            # to generate an alias for a field in a parent class
            if field_info.alias_priority is None or field_info.alias_priority <= 1:
                field_info.alias_priority = 1

            # if the priority is 1, then we set the aliases to the generated alias
            if field_info.alias_priority == 1:
                field_info.serialization_alias = _get_first_non_null(serialization_alias, alias)
                field_info.validation_alias = _get_first_non_null(validation_alias, alias)
                field_info.alias = alias

            # if any of the aliases are not set, then we set them to the corresponding generated alias
            if field_info.alias is None:
                field_info.alias = alias
            if field_info.serialization_alias is None:
                field_info.serialization_alias = _get_first_non_null(serialization_alias, alias)
            if field_info.validation_alias is None:
                field_info.validation_alias = _get_first_non_null(validation_alias, alias)

1286 

1287 @staticmethod 

1288 def _apply_alias_generator_to_computed_field_info( 

1289 alias_generator: Callable[[str], str] | AliasGenerator, 

1290 computed_field_info: ComputedFieldInfo, 

1291 computed_field_name: str, 

1292 ): 

1293 """Apply an alias_generator to alias on a ComputedFieldInfo instance if appropriate. 

1294 

1295 Args: 

1296 alias_generator: A callable that takes a string and returns a string, or an AliasGenerator instance. 

1297 computed_field_info: The ComputedFieldInfo instance to which the alias_generator is (maybe) applied. 

1298 computed_field_name: The name of the computed field from which to generate the alias. 

1299 """ 

1300 # Apply an alias_generator if 

1301 # 1. An alias is not specified 

1302 # 2. An alias is specified, but the priority is <= 1 

1303 

1304 if ( 

1305 computed_field_info.alias_priority is None 

1306 or computed_field_info.alias_priority <= 1 

1307 or computed_field_info.alias is None 

1308 ): 

1309 alias, validation_alias, serialization_alias = None, None, None 

1310 

1311 if isinstance(alias_generator, AliasGenerator): 

1312 alias, validation_alias, serialization_alias = alias_generator.generate_aliases(computed_field_name) 

1313 elif isinstance(alias_generator, Callable): 

1314 alias = alias_generator(computed_field_name) 

1315 if not isinstance(alias, str): 

1316 raise TypeError(f'alias_generator {alias_generator} must return str, not {alias.__class__}') 

1317 

1318 # if priority is not set, we set to 1 

1319 # which supports the case where the alias_generator from a child class is used 

1320 # to generate an alias for a field in a parent class 

1321 if computed_field_info.alias_priority is None or computed_field_info.alias_priority <= 1: 

1322 computed_field_info.alias_priority = 1 

1323 

1324 # if the priority is 1, then we set the aliases to the generated alias 

1325 # note that we use the serialization_alias with priority over alias, as computed_field 

1326 # aliases are used for serialization only (not validation) 

1327 if computed_field_info.alias_priority == 1: 

1328 computed_field_info.alias = _get_first_non_null(serialization_alias, alias) 

1329 

1330 @staticmethod 

1331 def _apply_field_title_generator_to_field_info( 

1332 config_wrapper: ConfigWrapper, field_info: FieldInfo | ComputedFieldInfo, field_name: str 

1333 ) -> None: 

1334 """Apply a field_title_generator on a FieldInfo or ComputedFieldInfo instance if appropriate 

1335 Args: 

1336 config_wrapper: The config of the model 

1337 field_info: The FieldInfo or ComputedField instance to which the title_generator is (maybe) applied. 

1338 field_name: The name of the field from which to generate the title. 

1339 """ 

1340 field_title_generator = field_info.field_title_generator or config_wrapper.field_title_generator 

1341 

1342 if field_title_generator is None: 

1343 return 

1344 

1345 if field_info.title is None: 

1346 title = field_title_generator(field_name, field_info) # type: ignore 

1347 if not isinstance(title, str): 

1348 raise TypeError(f'field_title_generator {field_title_generator} must return str, not {title.__class__}') 

1349 

1350 field_info.title = title 

1351 

    def _common_field_schema(  # C901
        self, name: str, field_info: FieldInfo, decorators: DecoratorInfos
    ) -> _CommonField:
        """Build the schema and shared metadata for a single field.

        Applies, in order: annotated metadata plus `@field_validator`s, V1-style
        `each_item` validators, remaining V1 validators, the default-value wrapper,
        field serializers, the title generator, JSON-schema metadata, and the config's
        alias generator. Used by model, dataclass and typeddict field builders.
        """
        source_type, annotations = field_info.annotation, field_info.metadata

        def set_discriminator(schema: CoreSchema) -> CoreSchema:
            # Applied to the inner schema so the discriminator targets the union itself:
            schema = self._apply_discriminator_to_union(schema, field_info.discriminator)
            return schema

        # Convert `@field_validator` decorators to `Before/After/Plain/WrapValidator` instances:
        validators_from_decorators = []
        for decorator in filter_field_decorator_info_by_field(decorators.field_validators.values(), name):
            validators_from_decorators.append(_mode_to_validator[decorator.info.mode]._from_decorator(decorator))

        with self.field_name_stack.push(name):
            if field_info.discriminator is not None:
                schema = self._apply_annotations(
                    source_type, annotations + validators_from_decorators, transform_inner_schema=set_discriminator
                )
            else:
                schema = self._apply_annotations(
                    source_type,
                    annotations + validators_from_decorators,
                )

        # This V1 compatibility shim should eventually be removed
        # push down any `each_item=True` validators
        # note that this won't work for any Annotated types that get wrapped by a function validator
        # but that's okay because that didn't exist in V1
        this_field_validators = filter_field_decorator_info_by_field(decorators.validators.values(), name)
        if _validators_require_validate_default(this_field_validators):
            field_info.validate_default = True
        each_item_validators = [v for v in this_field_validators if v.info.each_item is True]
        this_field_validators = [v for v in this_field_validators if v not in each_item_validators]
        schema = apply_each_item_validators(schema, each_item_validators, name)

        schema = apply_validators(schema, this_field_validators, name)

        # the default validator needs to go outside of any other validators
        # so that it is the topmost validator for the field validator
        # which uses it to check if the field has a default value or not
        if not field_info.is_required():
            schema = wrap_default(field_info, schema)

        schema = self._apply_field_serializers(
            schema, filter_field_decorator_info_by_field(decorators.field_serializers.values(), name)
        )
        self._apply_field_title_generator_to_field_info(self._config_wrapper, field_info, name)

        pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(field_info)
        core_metadata: dict[str, Any] = {}
        update_core_metadata(
            core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
        )

        alias_generator = self._config_wrapper.alias_generator
        if alias_generator is not None:
            self._apply_alias_generator_to_field_info(alias_generator, field_info, name)

        # `AliasChoices`/`AliasPath` need converting to the plain list form pydantic-core expects:
        if isinstance(field_info.validation_alias, (AliasChoices, AliasPath)):
            validation_alias = field_info.validation_alias.convert_to_aliases()
        else:
            validation_alias = field_info.validation_alias

        return _common_field(
            schema,
            serialization_exclude=True if field_info.exclude else None,
            validation_alias=validation_alias,
            serialization_alias=field_info.serialization_alias,
            frozen=field_info.frozen,
            metadata=core_metadata,
        )

1424 

1425 def _union_schema(self, union_type: Any) -> core_schema.CoreSchema: 

1426 """Generate schema for a Union.""" 

1427 args = self._get_args_resolving_forward_refs(union_type, required=True) 

1428 choices: list[CoreSchema] = [] 

1429 nullable = False 

1430 for arg in args: 

1431 if arg is None or arg is _typing_extra.NoneType: 

1432 nullable = True 

1433 else: 

1434 choices.append(self.generate_schema(arg)) 

1435 

1436 if len(choices) == 1: 

1437 s = choices[0] 

1438 else: 

1439 choices_with_tags: list[CoreSchema | tuple[CoreSchema, str]] = [] 

1440 for choice in choices: 

1441 tag = cast(CoreMetadata, choice.get('metadata', {})).get('pydantic_internal_union_tag_key') 

1442 if tag is not None: 

1443 choices_with_tags.append((choice, tag)) 

1444 else: 

1445 choices_with_tags.append(choice) 

1446 s = core_schema.union_schema(choices_with_tags) 

1447 

1448 if nullable: 

1449 s = core_schema.nullable_schema(s) 

1450 return s 

1451 

    def _type_alias_type_schema(self, obj: TypeAliasType) -> CoreSchema:
        """Generate a schema for a PEP 695 `type` alias (or `typing_extensions.TypeAliasType`).

        The alias value is evaluated in the alias' own namespace, its type variables are
        substituted when the alias is parametrized, and the result is registered as a named
        definition so recursive aliases resolve.
        """
        with self.defs.get_schema_or_ref(obj) as (ref, maybe_schema):
            if maybe_schema is not None:
                return maybe_schema

            # For a parametrized alias (`MyAlias[int]`), `get_origin` yields the alias itself:
            origin: TypeAliasType = get_origin(obj) or obj
            typevars_map = get_standard_typevars_map(obj)

            with self._ns_resolver.push(origin):
                try:
                    annotation = _typing_extra.eval_type(origin.__value__, *self._types_namespace)
                except NameError as e:
                    raise PydanticUndefinedAnnotation.from_name_error(e) from e
                annotation = replace_types(annotation, typevars_map)
                schema = self.generate_schema(annotation)
                assert schema['type'] != 'definitions'
                schema['ref'] = ref  # type: ignore
            return self.defs.create_definition_reference_schema(schema)

1470 

1471 def _literal_schema(self, literal_type: Any) -> CoreSchema: 

1472 """Generate schema for a Literal.""" 

1473 expected = list(get_literal_values(literal_type, type_check=False, unpack_type_aliases='eager')) 

1474 assert expected, f'literal "expected" cannot be empty, obj={literal_type}' 

1475 schema = core_schema.literal_schema(expected) 

1476 

1477 if self._config_wrapper.use_enum_values and any(isinstance(v, Enum) for v in expected): 

1478 schema = core_schema.no_info_after_validator_function( 

1479 lambda v: v.value if isinstance(v, Enum) else v, schema 

1480 ) 

1481 

1482 return schema 

1483 

    def _typed_dict_schema(self, typed_dict_cls: Any, origin: Any) -> core_schema.CoreSchema:
        """Generate a core schema for a `TypedDict` class.

        To be able to build a `DecoratorInfos` instance for the `TypedDict` class (which will include
        validators, serializers, etc.), we need to have access to the original bases of the class
        (see https://docs.python.org/3/library/types.html#types.get_original_bases).
        However, the `__orig_bases__` attribute was only added in 3.12 (https://github.com/python/cpython/pull/103698).

        For this reason, we require Python 3.12 (or using the `typing_extensions` backport).
        """
        FieldInfo = import_cached_field_info()

        with (
            self.model_type_stack.push(typed_dict_cls),
            self.defs.get_schema_or_ref(typed_dict_cls) as (
                typed_dict_ref,
                maybe_schema,
            ),
        ):
            # Already built (or being built, for recursive definitions):
            if maybe_schema is not None:
                return maybe_schema

            typevars_map = get_standard_typevars_map(typed_dict_cls)
            # For a parametrized TypedDict (`TD[int]`), build from the generic origin class
            # and substitute type variables via `typevars_map` below:
            if origin is not None:
                typed_dict_cls = origin

            if not _SUPPORTS_TYPEDDICT and type(typed_dict_cls).__module__ == 'typing':
                raise PydanticUserError(
                    'Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.12.',
                    code='typed-dict-version',
                )

            try:
                # if a typed dictionary class doesn't have config, we use the parent's config, hence a default of `None`
                # see https://github.com/pydantic/pydantic/issues/10917
                config: ConfigDict | None = get_attribute_from_bases(typed_dict_cls, '__pydantic_config__')
            except AttributeError:
                config = None

            with self._config_wrapper_stack.push(config):
                core_config = self._config_wrapper.core_config(title=typed_dict_cls.__name__)

                required_keys: frozenset[str] = typed_dict_cls.__required_keys__

                fields: dict[str, core_schema.TypedDictField] = {}

                decorators = DecoratorInfos.build(typed_dict_cls)

                if self._config_wrapper.use_attribute_docstrings:
                    field_docstrings = extract_docstrings_from_cls(typed_dict_cls, use_inspect=True)
                else:
                    field_docstrings = None

                try:
                    annotations = _typing_extra.get_cls_type_hints(typed_dict_cls, ns_resolver=self._ns_resolver)
                except NameError as e:
                    raise PydanticUndefinedAnnotation.from_name_error(e) from e

                readonly_fields: list[str] = []

                for field_name, annotation in annotations.items():
                    field_info = FieldInfo.from_annotation(annotation, _source=AnnotationSource.TYPED_DICT)
                    field_info.annotation = replace_types(field_info.annotation, typevars_map)

                    # Explicit `Required`/`NotRequired` qualifiers override `__required_keys__`
                    # (which is derived from the `total=` class argument):
                    required = (
                        field_name in required_keys or 'required' in field_info._qualifiers
                    ) and 'not_required' not in field_info._qualifiers
                    if 'read_only' in field_info._qualifiers:
                        readonly_fields.append(field_name)

                    # Attribute docstrings only apply when no explicit description was provided:
                    if (
                        field_docstrings is not None
                        and field_info.description is None
                        and field_name in field_docstrings
                    ):
                        field_info.description = field_docstrings[field_name]
                    self._apply_field_title_generator_to_field_info(self._config_wrapper, field_info, field_name)
                    fields[field_name] = self._generate_td_field_schema(
                        field_name, field_info, decorators, required=required
                    )

                if readonly_fields:
                    # `ReadOnly` is accepted but not enforced: plain dict instances cannot be protected.
                    fields_repr = ', '.join(repr(f) for f in readonly_fields)
                    plural = len(readonly_fields) >= 2
                    warnings.warn(
                        f'Item{"s" if plural else ""} {fields_repr} on TypedDict class {typed_dict_cls.__name__!r} '
                        f'{"are" if plural else "is"} using the `ReadOnly` qualifier. Pydantic will not protect items '
                        'from any mutation on dictionary instances.',
                        UserWarning,
                    )

                td_schema = core_schema.typed_dict_schema(
                    fields,
                    cls=typed_dict_cls,
                    computed_fields=[
                        self._computed_field_schema(d, decorators.field_serializers)
                        for d in decorators.computed_fields.values()
                    ],
                    ref=typed_dict_ref,
                    config=core_config,
                )

                schema = self._apply_model_serializers(td_schema, decorators.model_serializers.values())
                schema = apply_model_validators(schema, decorators.model_validators.values(), 'all')
                return self.defs.create_definition_reference_schema(schema)

1589 

    def _namedtuple_schema(self, namedtuple_cls: Any, origin: Any) -> core_schema.CoreSchema:
        """Generate schema for a NamedTuple.

        The named tuple is modeled as a call to its class: arguments are validated
        against the (possibly typevar-substituted) field annotations and then passed
        to the class constructor.
        """
        with (
            self.model_type_stack.push(namedtuple_cls),
            self.defs.get_schema_or_ref(namedtuple_cls) as (
                namedtuple_ref,
                maybe_schema,
            ),
        ):
            if maybe_schema is not None:
                return maybe_schema
            typevars_map = get_standard_typevars_map(namedtuple_cls)
            # For a parametrized named tuple, build from the generic origin class:
            if origin is not None:
                namedtuple_cls = origin

            try:
                annotations = _typing_extra.get_cls_type_hints(namedtuple_cls, ns_resolver=self._ns_resolver)
            except NameError as e:
                raise PydanticUndefinedAnnotation.from_name_error(e) from e
            if not annotations:
                # annotations is empty, happens if namedtuple_cls defined via collections.namedtuple(...)
                annotations: dict[str, Any] = {k: Any for k in namedtuple_cls._fields}

            if typevars_map:
                annotations = {
                    field_name: replace_types(annotation, typevars_map)
                    for field_name, annotation in annotations.items()
                }

            arguments_schema = core_schema.arguments_schema(
                [
                    self._generate_parameter_schema(
                        field_name,
                        annotation,
                        source=AnnotationSource.NAMED_TUPLE,
                        # `Parameter.empty` marks fields with no default as required:
                        default=namedtuple_cls._field_defaults.get(field_name, Parameter.empty),
                    )
                    for field_name, annotation in annotations.items()
                ],
                metadata={'pydantic_js_prefer_positional_arguments': True},
            )
            schema = core_schema.call_schema(arguments_schema, namedtuple_cls, ref=namedtuple_ref)
            return self.defs.create_definition_reference_schema(schema)

1633 

    def _generate_parameter_schema(
        self,
        name: str,
        annotation: type[Any],
        source: AnnotationSource,
        default: Any = Parameter.empty,
        mode: Literal['positional_only', 'positional_or_keyword', 'keyword_only'] | None = None,
    ) -> core_schema.ArgumentsParameter:
        """Generate the definition of a field in a namedtuple or a parameter in a function signature.

        This definition is meant to be used for the `'arguments'` core schema, which will be replaced
        in V3 by the `'arguments-v3`'.

        Args:
            name: The name of the parameter/field.
            annotation: Its type annotation.
            source: Where the annotation comes from (function signature, named tuple, ...).
            default: The default value, if any; `Parameter.empty` means required.
            mode: How the parameter binds arguments; `None` leaves the core-schema default.
        """
        FieldInfo = import_cached_field_info()

        if default is Parameter.empty:
            field = FieldInfo.from_annotation(annotation, _source=source)
        else:
            field = FieldInfo.from_annotated_attribute(annotation, default, _source=source)
        assert field.annotation is not None, 'field.annotation should not be None when generating a schema'
        with self.field_name_stack.push(name):
            schema = self._apply_annotations(field.annotation, [field])

        if not field.is_required():
            schema = wrap_default(field, schema)

        parameter_schema = core_schema.arguments_parameter(name, schema)
        if mode is not None:
            parameter_schema['mode'] = mode
        if field.alias is not None:
            parameter_schema['alias'] = field.alias
        else:
            # No explicit alias: fall back to the configured alias generator, if any.
            alias_generator = self._config_wrapper.alias_generator
            if isinstance(alias_generator, AliasGenerator) and alias_generator.alias is not None:
                parameter_schema['alias'] = alias_generator.alias(name)
            elif callable(alias_generator):
                parameter_schema['alias'] = alias_generator(name)
        return parameter_schema

1672 

1673 def _generate_parameter_v3_schema( 

1674 self, 

1675 name: str, 

1676 annotation: Any, 

1677 source: AnnotationSource, 

1678 mode: Literal[ 

1679 'positional_only', 

1680 'positional_or_keyword', 

1681 'keyword_only', 

1682 'var_args', 

1683 'var_kwargs_uniform', 

1684 'var_kwargs_unpacked_typed_dict', 

1685 ], 

1686 default: Any = Parameter.empty, 

1687 ) -> core_schema.ArgumentsV3Parameter: 

1688 """Generate the definition of a parameter in a function signature. 

1689 

1690 This definition is meant to be used for the `'arguments-v3'` core schema, which will replace 

1691 the `'arguments`' schema in V3. 

1692 """ 

1693 FieldInfo = import_cached_field_info() 

1694 

1695 if default is Parameter.empty: 

1696 field = FieldInfo.from_annotation(annotation, _source=source) 

1697 else: 

1698 field = FieldInfo.from_annotated_attribute(annotation, default, _source=source) 

1699 

1700 with self.field_name_stack.push(name): 

1701 schema = self._apply_annotations(field.annotation, [field]) 

1702 

1703 if not field.is_required(): 

1704 schema = wrap_default(field, schema) 

1705 

1706 parameter_schema = core_schema.arguments_v3_parameter( 

1707 name=name, 

1708 schema=schema, 

1709 mode=mode, 

1710 ) 

1711 if field.alias is not None: 

1712 parameter_schema['alias'] = field.alias 

1713 else: 

1714 alias_generator = self._config_wrapper.alias_generator 

1715 if isinstance(alias_generator, AliasGenerator) and alias_generator.alias is not None: 

1716 parameter_schema['alias'] = alias_generator.alias(name) 

1717 elif callable(alias_generator): 

1718 parameter_schema['alias'] = alias_generator(name) 

1719 

1720 return parameter_schema 

1721 

    def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema:
        """Generate schema for a Tuple, e.g. `tuple[int, str]` or `tuple[int, ...]`.

        Handles three shapes: the bare/variadic form (`tuple`, `tuple[X, ...]`),
        the empty tuple (`tuple[()]`), and positional forms (`tuple[X, Y]`).
        """
        # TODO: do we really need to resolve type vars here?
        typevars_map = get_standard_typevars_map(tuple_type)
        params = self._get_args_resolving_forward_refs(tuple_type)

        if typevars_map and params:
            params = tuple(replace_types(param, typevars_map) for param in params)

        # NOTE: subtle difference: `tuple[()]` gives `params=()`, whereas `typing.Tuple[()]` gives `params=((),)`
        # This is only true for <3.11, on Python 3.11+ `typing.Tuple[()]` gives `params=()`
        if not params:
            if tuple_type in TUPLE_TYPES:
                # bare `tuple`: any length, any item type
                return core_schema.tuple_schema([core_schema.any_schema()], variadic_item_index=0)
            else:
                # special case for `tuple[()]` which means `tuple[]` - an empty tuple
                return core_schema.tuple_schema([])
        elif params[-1] is Ellipsis:
            if len(params) == 2:
                # homogeneous variadic tuple: `tuple[X, ...]`
                return core_schema.tuple_schema([self.generate_schema(params[0])], variadic_item_index=0)
            else:
                # TODO: something like https://github.com/pydantic/pydantic/issues/5952
                raise ValueError('Variable tuples can only have one type')
        elif len(params) == 1 and params[0] == ():
            # special case for `tuple[()]` which means `tuple[]` - an empty tuple
            # NOTE: This conditional can be removed when we drop support for Python 3.10.
            return core_schema.tuple_schema([])
        else:
            # heterogeneous positional tuple: one schema per position
            return core_schema.tuple_schema([self.generate_schema(param) for param in params])

1751 

1752 def _type_schema(self) -> core_schema.CoreSchema: 

1753 return core_schema.custom_error_schema( 

1754 core_schema.is_instance_schema(type), 

1755 custom_error_type='is_type', 

1756 custom_error_message='Input should be a type', 

1757 ) 

1758 

1759 def _zoneinfo_schema(self) -> core_schema.CoreSchema: 

1760 """Generate schema for a zone_info.ZoneInfo object""" 

1761 from ._validators import validate_str_is_valid_iana_tz 

1762 

1763 metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'zoneinfo'}]} 

1764 return core_schema.no_info_plain_validator_function( 

1765 validate_str_is_valid_iana_tz, 

1766 serialization=core_schema.to_string_ser_schema(), 

1767 metadata=metadata, 

1768 ) 

1769 

1770 def _union_is_subclass_schema(self, union_type: Any) -> core_schema.CoreSchema: 

1771 """Generate schema for `type[Union[X, ...]]`.""" 

1772 args = self._get_args_resolving_forward_refs(union_type, required=True) 

1773 return core_schema.union_schema([self.generate_schema(type[args]) for args in args]) 

1774 

    def _subclass_schema(self, type_: Any) -> core_schema.CoreSchema:
        """Generate schema for a type, e.g. `type[int]`.

        The type parameter may be `Any`, a type alias, a type variable, a union,
        `Self`, `None`, or a plain class — each gets a dedicated treatment below.
        """
        type_param = self._get_first_arg_or_any(type_)

        # Assume `type[Annotated[<typ>, ...]]` is equivalent to `type[<typ>]`:
        type_param = _typing_extra.annotated_type(type_param) or type_param

        if typing_objects.is_any(type_param):
            # `type[Any]`: any type at all
            return self._type_schema()
        elif typing_objects.is_typealiastype(type_param):
            # `type[Alias]`: recurse with the alias' value
            return self.generate_schema(type[type_param.__value__])
        elif typing_objects.is_typevar(type_param):
            if type_param.__bound__:
                # A union bound needs one subclass check per member:
                if is_union_origin(get_origin(type_param.__bound__)):
                    return self._union_is_subclass_schema(type_param.__bound__)
                return core_schema.is_subclass_schema(type_param.__bound__)
            elif type_param.__constraints__:
                return core_schema.union_schema([self.generate_schema(type[c]) for c in type_param.__constraints__])
            else:
                # Unbound, unconstrained type variable: accept any type
                return self._type_schema()
        elif is_union_origin(get_origin(type_param)):
            return self._union_is_subclass_schema(type_param)
        else:
            if typing_objects.is_self(type_param):
                type_param = self._resolve_self_type(type_param)
            if _typing_extra.is_generic_alias(type_param):
                raise PydanticUserError(
                    'Subscripting `type[]` with an already parametrized type is not supported. '
                    f'Instead of using type[{type_param!r}], use type[{_repr.display_as_type(get_origin(type_param))}].',
                    code=None,
                )
            if not inspect.isclass(type_param):
                # when using type[None], this doesn't type convert to type[NoneType], and None isn't a class
                # so we handle it manually here
                if type_param is None:
                    return core_schema.is_subclass_schema(_typing_extra.NoneType)
                raise TypeError(f'Expected a class, got {type_param!r}')
            return core_schema.is_subclass_schema(type_param)

1813 

    def _sequence_schema(self, items_type: Any) -> core_schema.CoreSchema:
        """Generate schema for a Sequence, e.g. `Sequence[int]`.

        JSON input is validated as a list. Python input must be a `Sequence` instance;
        unless the item type is `Any`, it is additionally run through `sequence_validator`
        wrapped around the list schema. Serialization goes via a list of the items.
        """
        from ._serializers import serialize_sequence_via_list

        item_type_schema = self.generate_schema(items_type)
        list_schema = core_schema.list_schema(item_type_schema)

        json_schema = smart_deepcopy(list_schema)
        python_schema = core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence')
        if not typing_objects.is_any(items_type):
            from ._validators import sequence_validator

            # For `Sequence[Any]` the instance check alone suffices; otherwise items
            # must also be validated:
            python_schema = core_schema.chain_schema(
                [python_schema, core_schema.no_info_wrap_validator_function(sequence_validator, list_schema)],
            )

        serialization = core_schema.wrap_serializer_function_ser_schema(
            serialize_sequence_via_list, schema=item_type_schema, info_arg=True
        )
        return core_schema.json_or_python_schema(
            json_schema=json_schema, python_schema=python_schema, serialization=serialization
        )

1836 

1837 def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema: 

1838 """Generate a schema for an `Iterable`.""" 

1839 item_type = self._get_first_arg_or_any(type_) 

1840 

1841 return core_schema.generator_schema(self.generate_schema(item_type)) 

1842 

    def _pattern_schema(self, pattern_type: Any) -> core_schema.CoreSchema:
        """Generate schema for `re.Pattern`, bare or parametrized with `str` or `bytes`.

        In JSON mode, a compiled pattern is serialized via its `.pattern` attribute.
        Raises `PydanticSchemaGenerationError` for any other type parameter.
        """
        from . import _validators

        # JSON schema: a string tagged with the 'regex' format.
        metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'regex'}]}
        ser = core_schema.plain_serializer_function_ser_schema(
            attrgetter('pattern'), when_used='json', return_schema=core_schema.str_schema()
        )
        if pattern_type is typing.Pattern or pattern_type is re.Pattern:
            # bare type
            return core_schema.no_info_plain_validator_function(
                _validators.pattern_either_validator, serialization=ser, metadata=metadata
            )

        param = self._get_args_resolving_forward_refs(
            pattern_type,
            required=True,
        )[0]
        if param is str:
            return core_schema.no_info_plain_validator_function(
                _validators.pattern_str_validator, serialization=ser, metadata=metadata
            )
        elif param is bytes:
            return core_schema.no_info_plain_validator_function(
                _validators.pattern_bytes_validator, serialization=ser, metadata=metadata
            )
        else:
            raise PydanticSchemaGenerationError(f'Unable to generate pydantic-core schema for {pattern_type!r}.')

1870 

1871 def _hashable_schema(self) -> core_schema.CoreSchema: 

1872 return core_schema.custom_error_schema( 

1873 schema=core_schema.json_or_python_schema( 

1874 json_schema=core_schema.chain_schema( 

1875 [core_schema.any_schema(), core_schema.is_instance_schema(collections.abc.Hashable)] 

1876 ), 

1877 python_schema=core_schema.is_instance_schema(collections.abc.Hashable), 

1878 ), 

1879 custom_error_type='is_hashable', 

1880 custom_error_message='Input should be hashable', 

1881 ) 

1882 

    def _dataclass_schema(
        self, dataclass: type[StandardDataclass], origin: type[StandardDataclass] | None
    ) -> core_schema.CoreSchema:
        """Generate schema for a dataclass.

        Handles Pydantic dataclasses (with pre-collected `__pydantic_fields__`),
        plain stdlib dataclasses (fields collected on the fly), and generic
        parametrizations (`origin` is the generic origin class in that case).
        """
        with (
            self.model_type_stack.push(dataclass),
            self.defs.get_schema_or_ref(dataclass) as (
                dataclass_ref,
                maybe_schema,
            ),
        ):
            if maybe_schema is not None:
                return maybe_schema

            # Reuse an explicitly set `__pydantic_core_schema__` (on the class itself,
            # not inherited) unless it is a mock placeholder:
            schema = dataclass.__dict__.get('__pydantic_core_schema__')
            if schema is not None and not isinstance(schema, MockCoreSchema):
                if schema['type'] == 'definitions':
                    schema = self.defs.unpack_definitions(schema)
                ref = get_ref(schema)
                if ref:
                    return self.defs.create_definition_reference_schema(schema)
                else:
                    return schema

            typevars_map = get_standard_typevars_map(dataclass)
            if origin is not None:
                dataclass = origin

            # if (plain) dataclass doesn't have config, we use the parent's config, hence a default of `None`
            # (Pydantic dataclasses have an empty dict config by default).
            # see https://github.com/pydantic/pydantic/issues/10917
            config = getattr(dataclass, '__pydantic_config__', None)

            from ..dataclasses import is_pydantic_dataclass

            with self._ns_resolver.push(dataclass), self._config_wrapper_stack.push(config):
                if is_pydantic_dataclass(dataclass):
                    if dataclass.__pydantic_fields_complete__():
                        # Copy the field info instances to avoid mutating the `FieldInfo` instances
                        # of the generic dataclass generic origin (e.g. `apply_typevars_map` below).
                        # Note that we don't apply `deepcopy` on `__pydantic_fields__` because we
                        # don't want to copy the `FieldInfo` attributes:
                        fields = {
                            f_name: copy(field_info) for f_name, field_info in dataclass.__pydantic_fields__.items()
                        }
                        if typevars_map:
                            for field in fields.values():
                                field.apply_typevars_map(typevars_map, *self._types_namespace)
                    else:
                        # Fields were not fully resolved earlier (e.g. forward references);
                        # try rebuilding them now with the current namespaces:
                        try:
                            fields = rebuild_dataclass_fields(
                                dataclass,
                                config_wrapper=self._config_wrapper,
                                ns_resolver=self._ns_resolver,
                                typevars_map=typevars_map or {},
                            )
                        except NameError as e:
                            raise PydanticUndefinedAnnotation.from_name_error(e) from e
                else:
                    # Plain stdlib dataclass: collect fields from scratch.
                    fields = collect_dataclass_fields(
                        dataclass,
                        typevars_map=typevars_map,
                        config_wrapper=self._config_wrapper,
                    )

                if self._config_wrapper.extra == 'allow':
                    # disallow combination of init=False on a dataclass field and extra='allow' on a dataclass
                    for field_name, field in fields.items():
                        if field.init is False:
                            raise PydanticUserError(
                                f'Field {field_name} has `init=False` and dataclass has config setting `extra="allow"`. '
                                f'This combination is not allowed.',
                                code='dataclass-init-false-extra-allow',
                            )

                decorators = dataclass.__dict__.get('__pydantic_decorators__') or DecoratorInfos.build(dataclass)
                # Move kw_only=False args to the start of the list, as this is how vanilla dataclasses work.
                # Note that when kw_only is missing or None, it is treated as equivalent to kw_only=True
                args = sorted(
                    (self._generate_dc_field_schema(k, v, decorators) for k, v in fields.items()),
                    key=lambda a: a.get('kw_only') is not False,
                )
                has_post_init = hasattr(dataclass, '__post_init__')
                has_slots = hasattr(dataclass, '__slots__')

                args_schema = core_schema.dataclass_args_schema(
                    dataclass.__name__,
                    args,
                    computed_fields=[
                        self._computed_field_schema(d, decorators.field_serializers)
                        for d in decorators.computed_fields.values()
                    ],
                    collect_init_only=has_post_init,
                )

                inner_schema = apply_validators(args_schema, decorators.root_validators.values(), None)

                model_validators = decorators.model_validators.values()
                inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')

                core_config = self._config_wrapper.core_config(title=dataclass.__name__)

                dc_schema = core_schema.dataclass_schema(
                    dataclass,
                    inner_schema,
                    generic_origin=origin,
                    post_init=has_post_init,
                    ref=dataclass_ref,
                    fields=[field.name for field in dataclasses.fields(dataclass)],
                    slots=has_slots,
                    config=core_config,
                    # we don't use a custom __setattr__ for dataclasses, so we must
                    # pass along the frozen config setting to the pydantic-core schema
                    frozen=self._config_wrapper_stack.tail.frozen,
                )
                schema = self._apply_model_serializers(dc_schema, decorators.model_serializers.values())
                schema = apply_model_validators(schema, model_validators, 'outer')
                return self.defs.create_definition_reference_schema(schema)

2001 

    def _call_schema(self, function: ValidateCallSupportedTypes) -> core_schema.CallSchema:
        """Generate schema for a Callable.

        TODO support functional validators once we support them in Config
        """
        arguments_schema = self._arguments_schema(function)

        # The return value is only validated when `validate_return` is enabled
        # and the function has an explicit return annotation:
        return_schema: core_schema.CoreSchema | None = None
        config_wrapper = self._config_wrapper
        if config_wrapper.validate_return:
            sig = signature(function)
            return_hint = sig.return_annotation
            if return_hint is not sig.empty:
                globalns, localns = self._types_namespace
                # Resolve the return annotation (it may be a string / forward reference):
                type_hints = _typing_extra.get_function_type_hints(
                    function, globalns=globalns, localns=localns, include_keys={'return'}
                )
                return_schema = self.generate_schema(type_hints['return'])

        return core_schema.call_schema(
            arguments_schema,
            function,
            return_schema=return_schema,
        )

2026 

    def _arguments_schema(
        self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
    ) -> core_schema.ArgumentsSchema:
        """Generate schema for a Signature.

        Args:
            function: The callable whose signature is inspected.
            parameters_callback: Optional hook invoked per parameter with
                `(index, name, annotation)`; returning `'skip'` excludes the parameter.
        """
        mode_lookup: dict[_ParameterKind, Literal['positional_only', 'positional_or_keyword', 'keyword_only']] = {
            Parameter.POSITIONAL_ONLY: 'positional_only',
            Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
            Parameter.KEYWORD_ONLY: 'keyword_only',
        }

        sig = signature(function)
        globalns, localns = self._types_namespace
        type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)

        arguments_list: list[core_schema.ArgumentsParameter] = []
        var_args_schema: core_schema.CoreSchema | None = None
        var_kwargs_schema: core_schema.CoreSchema | None = None
        var_kwargs_mode: core_schema.VarKwargsMode | None = None

        for i, (name, p) in enumerate(sig.parameters.items()):
            # Unannotated parameters accept anything:
            if p.annotation is sig.empty:
                annotation = typing.cast(Any, Any)
            else:
                annotation = type_hints[name]

            if parameters_callback is not None:
                result = parameters_callback(i, name, annotation)
                if result == 'skip':
                    continue

            parameter_mode = mode_lookup.get(p.kind)
            if parameter_mode is not None:
                # Regular (non-variadic) parameter:
                arg_schema = self._generate_parameter_schema(
                    name, annotation, AnnotationSource.FUNCTION, p.default, parameter_mode
                )
                arguments_list.append(arg_schema)
            elif p.kind == Parameter.VAR_POSITIONAL:
                # `*args`:
                var_args_schema = self.generate_schema(annotation)
            else:
                # `**kwargs`, either uniform (`**kwargs: T`) or `Unpack[SomeTypedDict]`:
                assert p.kind == Parameter.VAR_KEYWORD, p.kind

                unpack_type = _typing_extra.unpack_type(annotation)
                if unpack_type is not None:
                    origin = get_origin(unpack_type) or unpack_type
                    if not is_typeddict(origin):
                        raise PydanticUserError(
                            f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
                            code='unpack-typed-dict',
                        )
                    # A TypedDict key must not collide with a parameter that could be
                    # passed by keyword:
                    non_pos_only_param_names = {
                        name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
                    }
                    overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
                    if overlapping_params:
                        raise PydanticUserError(
                            f'Typed dictionary {origin.__name__!r} overlaps with parameter'
                            f'{"s" if len(overlapping_params) >= 2 else ""} '
                            f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
                            code='overlapping-unpack-typed-dict',
                        )

                    var_kwargs_mode = 'unpacked-typed-dict'
                    var_kwargs_schema = self._typed_dict_schema(unpack_type, get_origin(unpack_type))
                else:
                    var_kwargs_mode = 'uniform'
                    var_kwargs_schema = self.generate_schema(annotation)

        return core_schema.arguments_schema(
            arguments_list,
            var_args_schema=var_args_schema,
            var_kwargs_mode=var_kwargs_mode,
            var_kwargs_schema=var_kwargs_schema,
            validate_by_name=self._config_wrapper.validate_by_name,
        )

2101 

    def _arguments_v3_schema(
        self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
    ) -> core_schema.ArgumentsV3Schema:
        """Generate the `'arguments-v3'` schema for a function signature.

        Unlike `_arguments_schema`, variadic parameters are expressed as parameter
        modes (`'var_args'`, `'var_kwargs_uniform'`, `'var_kwargs_unpacked_typed_dict'`)
        instead of separate schema slots.

        Args:
            function: The callable whose signature is inspected.
            parameters_callback: Optional hook invoked per parameter with
                `(index, name, annotation)`; returning `'skip'` excludes the parameter.
                Note that here it receives the *raw* `Parameter.annotation`.
        """
        mode_lookup: dict[
            _ParameterKind, Literal['positional_only', 'positional_or_keyword', 'var_args', 'keyword_only']
        ] = {
            Parameter.POSITIONAL_ONLY: 'positional_only',
            Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
            Parameter.VAR_POSITIONAL: 'var_args',
            Parameter.KEYWORD_ONLY: 'keyword_only',
        }

        sig = signature(function)
        globalns, localns = self._types_namespace
        type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)

        parameters_list: list[core_schema.ArgumentsV3Parameter] = []

        for i, (name, p) in enumerate(sig.parameters.items()):
            if parameters_callback is not None:
                result = parameters_callback(i, name, p.annotation)
                if result == 'skip':
                    continue

            # Unannotated parameters accept anything:
            if p.annotation is Parameter.empty:
                annotation = typing.cast(Any, Any)
            else:
                annotation = type_hints[name]

            parameter_mode = mode_lookup.get(p.kind)
            if parameter_mode is None:
                # `**kwargs`, either uniform (`**kwargs: T`) or `Unpack[SomeTypedDict]`:
                assert p.kind == Parameter.VAR_KEYWORD, p.kind

                unpack_type = _typing_extra.unpack_type(annotation)
                if unpack_type is not None:
                    origin = get_origin(unpack_type) or unpack_type
                    if not is_typeddict(origin):
                        raise PydanticUserError(
                            f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
                            code='unpack-typed-dict',
                        )
                    # A TypedDict key must not collide with a parameter that could be
                    # passed by keyword:
                    non_pos_only_param_names = {
                        name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
                    }
                    overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
                    if overlapping_params:
                        raise PydanticUserError(
                            f'Typed dictionary {origin.__name__!r} overlaps with parameter'
                            f'{"s" if len(overlapping_params) >= 2 else ""} '
                            f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
                            code='overlapping-unpack-typed-dict',
                        )
                    parameter_mode = 'var_kwargs_unpacked_typed_dict'
                    annotation = unpack_type
                else:
                    parameter_mode = 'var_kwargs_uniform'

            parameters_list.append(
                self._generate_parameter_v3_schema(
                    name, annotation, AnnotationSource.FUNCTION, parameter_mode, default=p.default
                )
            )

        return core_schema.arguments_v3_schema(
            parameters_list,
            validate_by_name=self._config_wrapper.validate_by_name,
        )

2169 

2170 def _unsubstituted_typevar_schema(self, typevar: typing.TypeVar) -> core_schema.CoreSchema: 

2171 try: 

2172 has_default = typevar.has_default() 

2173 except AttributeError: 

2174 # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13 

2175 pass 

2176 else: 

2177 if has_default: 

2178 return self.generate_schema(typevar.__default__) 

2179 

2180 if constraints := typevar.__constraints__: 

2181 return self._union_schema(typing.Union[constraints]) 

2182 

2183 if bound := typevar.__bound__: 

2184 schema = self.generate_schema(bound) 

2185 schema['serialization'] = core_schema.wrap_serializer_function_ser_schema( 

2186 lambda x, h: h(x), 

2187 schema=core_schema.any_schema(), 

2188 ) 

2189 return schema 

2190 

2191 return core_schema.any_schema() 

2192 

    def _computed_field_schema(
        self,
        d: Decorator[ComputedFieldInfo],
        field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]],
    ) -> core_schema.ComputedField:
        """Generate the core-schema entry for a `@computed_field` decorator.

        Args:
            d: The computed-field decorator info.
            field_serializers: The class' field serializers, filtered below for the
                ones targeting this computed field.

        Raises:
            PydanticUndefinedAnnotation: If the return annotation cannot be resolved.
            PydanticUserError: If neither a return annotation nor `return_type` is given.
        """
        if d.info.return_type is not PydanticUndefined:
            # An explicit `return_type=` was passed to `@computed_field`:
            return_type = d.info.return_type
        else:
            try:
                # Do not pass in globals as the function could be defined in a different module.
                # Instead, let `get_callable_return_type` infer the globals to use, but still pass
                # in locals that may contain a parent/rebuild namespace:
                return_type = _decorators.get_callable_return_type(d.func, localns=self._types_namespace.locals)
            except NameError as e:
                raise PydanticUndefinedAnnotation.from_name_error(e) from e
        if return_type is PydanticUndefined:
            raise PydanticUserError(
                'Computed field is missing return type annotation or specifying `return_type`'
                ' to the `@computed_field` decorator (e.g. `@computed_field(return_type=int | str)`)',
                code='model-field-missing-annotation',
            )

        return_type = replace_types(return_type, self._typevars_map)
        # Create a new ComputedFieldInfo so that different type parametrizations of the same
        # generic model's computed field can have different return types.
        d.info = dataclasses.replace(d.info, return_type=return_type)
        return_type_schema = self.generate_schema(return_type)
        # Apply serializers to computed field if there exist
        return_type_schema = self._apply_field_serializers(
            return_type_schema,
            filter_field_decorator_info_by_field(field_serializers.values(), d.cls_var_name),
        )

        alias_generator = self._config_wrapper.alias_generator
        if alias_generator is not None:
            self._apply_alias_generator_to_computed_field_info(
                alias_generator=alias_generator, computed_field_info=d.info, computed_field_name=d.cls_var_name
            )
        self._apply_field_title_generator_to_field_info(self._config_wrapper, d.info, d.cls_var_name)

        # Computed fields are always marked read-only in the JSON schema:
        pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(d.info)
        core_metadata: dict[str, Any] = {}
        update_core_metadata(
            core_metadata,
            pydantic_js_updates={'readOnly': True, **(pydantic_js_updates if pydantic_js_updates else {})},
            pydantic_js_extra=pydantic_js_extra,
        )
        return core_schema.computed_field(
            d.cls_var_name, return_schema=return_type_schema, alias=d.info.alias, metadata=core_metadata
        )

2243 

2244 def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema: 

2245 """Generate schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`.""" 

2246 FieldInfo = import_cached_field_info() 

2247 source_type, *annotations = self._get_args_resolving_forward_refs( 

2248 annotated_type, 

2249 required=True, 

2250 ) 

2251 schema = self._apply_annotations(source_type, annotations) 

2252 # put the default validator last so that TypeAdapter.get_default_value() works 

2253 # even if there are function validators involved 

2254 for annotation in annotations: 

2255 if isinstance(annotation, FieldInfo): 

2256 schema = wrap_default(annotation, schema) 

2257 return schema 

2258 

    def _apply_annotations(
        self,
        source_type: Any,
        annotations: list[Any],
        transform_inner_schema: Callable[[CoreSchema], CoreSchema] = lambda x: x,
    ) -> CoreSchema:
        """Apply arguments from `Annotated` or from `FieldInfo` to a schema.

        This gets called by `GenerateSchema._annotated_schema` but differs from it in that it does
        not expect `source_type` to be an `Annotated` object, it expects it to be the first argument of that
        (in other words, `GenerateSchema._annotated_schema` just unpacks `Annotated`, this processes it).
        """
        # Expand grouped metadata objects into their individual constituent annotations:
        annotations = list(_known_annotated_metadata.expand_grouped_metadata(annotations))

        # JSON schema functions collected from the annotations; attached to the final schema's metadata below:
        pydantic_js_annotation_functions: list[GetJsonSchemaFunction] = []

        def inner_handler(obj: Any) -> CoreSchema:
            # Innermost handler: generate the schema for the source type itself, preferring
            # a `__get_pydantic_core_schema__`-style method if `obj` provides one.
            schema = self._generate_schema_from_get_schema_method(obj, source_type)

            if schema is None:
                schema = self._generate_schema_inner(obj)

            metadata_js_function = _extract_get_pydantic_json_schema(obj)
            if metadata_js_function is not None:
                # Attach the JSON schema function to the *resolved* schema, so it isn't lost
                # when `schema` is only a `'definition-ref'` pointing at the actual definition:
                metadata_schema = resolve_original_schema(schema, self.defs)
                if metadata_schema is not None:
                    self._add_js_function(metadata_schema, metadata_js_function)
            return transform_inner_schema(schema)

        get_inner_schema = CallbackGetCoreSchemaHandler(inner_handler, self)

        # Wrap the handler once per annotation; invoking the resulting chain below applies
        # the annotations around the inner schema:
        for annotation in annotations:
            if annotation is None:
                continue
            get_inner_schema = self._get_wrapped_inner_schema(
                get_inner_schema, annotation, pydantic_js_annotation_functions
            )

        schema = get_inner_schema(source_type)
        if pydantic_js_annotation_functions:
            core_metadata = schema.setdefault('metadata', {})
            update_core_metadata(core_metadata, pydantic_js_annotation_functions=pydantic_js_annotation_functions)
        return _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, source_type, schema)

2302 

    def _apply_single_annotation(self, schema: core_schema.CoreSchema, metadata: Any) -> core_schema.CoreSchema:
        """Apply a single annotation (e.g. a known constraint or a `FieldInfo`) to the core schema."""
        FieldInfo = import_cached_field_info()

        if isinstance(metadata, FieldInfo):
            # A `FieldInfo` bundles several metadata entries; apply each of them in turn:
            for field_metadata in metadata.metadata:
                schema = self._apply_single_annotation(schema, field_metadata)

            if metadata.discriminator is not None:
                schema = self._apply_discriminator_to_union(schema, metadata.discriminator)
            return schema

        if schema['type'] == 'nullable':
            # for nullable schemas, metadata is automatically applied to the inner schema
            inner = schema.get('schema', core_schema.any_schema())
            inner = self._apply_single_annotation(inner, metadata)
            if inner:
                schema['schema'] = inner
            return schema

        original_schema = schema
        ref = schema.get('ref')
        if ref is not None:
            # The schema carries a reference: work on a copy under a new metadata-derived ref,
            # so the constrained variant doesn't clash with the unconstrained definition.
            schema = schema.copy()
            new_ref = ref + f'_{repr(metadata)}'
            if (existing := self.defs.get_schema_from_ref(new_ref)) is not None:
                return existing
            schema['ref'] = new_ref  # pyright: ignore[reportGeneralTypeIssues]
        elif schema['type'] == 'definition-ref':
            # Resolve the reference and apply the metadata to a copy of the referenced definition:
            ref = schema['schema_ref']
            if (referenced_schema := self.defs.get_schema_from_ref(ref)) is not None:
                schema = referenced_schema.copy()
                new_ref = ref + f'_{repr(metadata)}'
                if (existing := self.defs.get_schema_from_ref(new_ref)) is not None:
                    return existing
                schema['ref'] = new_ref  # pyright: ignore[reportGeneralTypeIssues]

        maybe_updated_schema = _known_annotated_metadata.apply_known_metadata(metadata, schema)

        if maybe_updated_schema is not None:
            return maybe_updated_schema
        # The metadata wasn't a known constraint; fall back to the unmodified original schema.
        return original_schema

2344 

2345 def _apply_single_annotation_json_schema( 

2346 self, schema: core_schema.CoreSchema, metadata: Any 

2347 ) -> core_schema.CoreSchema: 

2348 FieldInfo = import_cached_field_info() 

2349 

2350 if isinstance(metadata, FieldInfo): 

2351 for field_metadata in metadata.metadata: 

2352 schema = self._apply_single_annotation_json_schema(schema, field_metadata) 

2353 

2354 pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(metadata) 

2355 core_metadata = schema.setdefault('metadata', {}) 

2356 update_core_metadata( 

2357 core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra 

2358 ) 

2359 return schema 

2360 

    def _get_wrapped_inner_schema(
        self,
        get_inner_schema: GetCoreSchemaHandler,
        annotation: Any,
        pydantic_js_annotation_functions: list[GetJsonSchemaFunction],
    ) -> CallbackGetCoreSchemaHandler:
        """Wrap `get_inner_schema` so that `annotation` is applied on top of the inner schema.

        JSON schema functions found on the annotation are appended to
        `pydantic_js_annotation_functions` (mutated in place) when the wrapper runs.
        """
        annotation_get_schema: GetCoreSchemaFunction | None = getattr(annotation, '__get_pydantic_core_schema__', None)

        def new_handler(source: Any) -> core_schema.CoreSchema:
            if annotation_get_schema is not None:
                # The annotation customizes core schema generation itself:
                schema = annotation_get_schema(source, get_inner_schema)
            else:
                schema = get_inner_schema(source)
                schema = self._apply_single_annotation(schema, annotation)
                schema = self._apply_single_annotation_json_schema(schema, annotation)

            metadata_js_function = _extract_get_pydantic_json_schema(annotation)
            if metadata_js_function is not None:
                pydantic_js_annotation_functions.append(metadata_js_function)
            return schema

        return CallbackGetCoreSchemaHandler(new_handler, self)

2383 

    def _apply_field_serializers(
        self,
        schema: core_schema.CoreSchema,
        serializers: list[Decorator[FieldSerializerDecoratorInfo]],
    ) -> core_schema.CoreSchema:
        """Apply field serializers to a schema.

        Only the last serializer in `serializers` is applied. The schema is copied, so the
        caller's schema object is left untouched.
        """
        if serializers:
            schema = copy(schema)
            if schema['type'] == 'definitions':
                # Recurse into the inner schema of a `'definitions'` wrapper so the serializer
                # lands on the actual schema:
                inner_schema = schema['schema']
                schema['schema'] = self._apply_field_serializers(inner_schema, serializers)
                return schema
            elif 'ref' in schema:
                # Referenceable schema: store it as a definition and attach the serializer
                # to the resulting `'definition-ref'` schema instead:
                schema = self.defs.create_definition_reference_schema(schema)

            # use the last serializer to make it easy to override a serializer set on a parent model
            serializer = serializers[-1]
            is_field_serializer, info_arg = inspect_field_serializer(serializer.func, serializer.info.mode)

            if serializer.info.return_type is not PydanticUndefined:
                return_type = serializer.info.return_type
            else:
                try:
                    # Do not pass in globals as the function could be defined in a different module.
                    # Instead, let `get_callable_return_type` infer the globals to use, but still pass
                    # in locals that may contain a parent/rebuild namespace:
                    return_type = _decorators.get_callable_return_type(
                        serializer.func, localns=self._types_namespace.locals
                    )
                except NameError as e:
                    raise PydanticUndefinedAnnotation.from_name_error(e) from e

            if return_type is PydanticUndefined:
                # No return type could be determined; attach no return schema.
                return_schema = None
            else:
                return_schema = self.generate_schema(return_type)

            if serializer.info.mode == 'wrap':
                schema['serialization'] = core_schema.wrap_serializer_function_ser_schema(
                    serializer.func,
                    is_field_serializer=is_field_serializer,
                    info_arg=info_arg,
                    return_schema=return_schema,
                    when_used=serializer.info.when_used,
                )
            else:
                assert serializer.info.mode == 'plain'
                schema['serialization'] = core_schema.plain_serializer_function_ser_schema(
                    serializer.func,
                    is_field_serializer=is_field_serializer,
                    info_arg=info_arg,
                    return_schema=return_schema,
                    when_used=serializer.info.when_used,
                )
        return schema

2439 

    def _apply_model_serializers(
        self, schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]]
    ) -> core_schema.CoreSchema:
        """Apply model serializers to a schema.

        Only the last serializer is applied (making it easy to override one defined on a parent).
        The schema's `'ref'`, if any, is removed while attaching the serializer and restored at the end.
        """
        ref: str | None = schema.pop('ref', None)  # type: ignore
        if serializers:
            serializer = list(serializers)[-1]
            info_arg = inspect_model_serializer(serializer.func, serializer.info.mode)

            if serializer.info.return_type is not PydanticUndefined:
                return_type = serializer.info.return_type
            else:
                try:
                    # Do not pass in globals as the function could be defined in a different module.
                    # Instead, let `get_callable_return_type` infer the globals to use, but still pass
                    # in locals that may contain a parent/rebuild namespace:
                    return_type = _decorators.get_callable_return_type(
                        serializer.func, localns=self._types_namespace.locals
                    )
                except NameError as e:
                    raise PydanticUndefinedAnnotation.from_name_error(e) from e

            if return_type is PydanticUndefined:
                # No return type could be determined; attach no return schema.
                return_schema = None
            else:
                return_schema = self.generate_schema(return_type)

            if serializer.info.mode == 'wrap':
                ser_schema: core_schema.SerSchema = core_schema.wrap_serializer_function_ser_schema(
                    serializer.func,
                    info_arg=info_arg,
                    return_schema=return_schema,
                    when_used=serializer.info.when_used,
                )
            else:
                # plain
                ser_schema = core_schema.plain_serializer_function_ser_schema(
                    serializer.func,
                    info_arg=info_arg,
                    return_schema=return_schema,
                    when_used=serializer.info.when_used,
                )
            schema['serialization'] = ser_schema
        if ref:
            schema['ref'] = ref  # type: ignore
        return schema

2486 

2487 

# Dispatch table mapping a (validator mode, info-signature) pair to the `core_schema`
# constructor that wraps a validator function around a schema. The third argument
# (`field_name`) is only passed through for the 'with-info' variants.
_VALIDATOR_F_MATCH: Mapping[
    tuple[FieldValidatorModes, Literal['no-info', 'with-info']],
    Callable[[Callable[..., Any], core_schema.CoreSchema, str | None], core_schema.CoreSchema],
] = {
    ('before', 'no-info'): lambda f, schema, _: core_schema.no_info_before_validator_function(f, schema),
    ('after', 'no-info'): lambda f, schema, _: core_schema.no_info_after_validator_function(f, schema),
    ('plain', 'no-info'): lambda f, _1, _2: core_schema.no_info_plain_validator_function(f),
    ('wrap', 'no-info'): lambda f, schema, _: core_schema.no_info_wrap_validator_function(f, schema),
    ('before', 'with-info'): lambda f, schema, field_name: core_schema.with_info_before_validator_function(
        f, schema, field_name=field_name
    ),
    ('after', 'with-info'): lambda f, schema, field_name: core_schema.with_info_after_validator_function(
        f, schema, field_name=field_name
    ),
    ('plain', 'with-info'): lambda f, _, field_name: core_schema.with_info_plain_validator_function(
        f, field_name=field_name
    ),
    ('wrap', 'with-info'): lambda f, schema, field_name: core_schema.with_info_wrap_validator_function(
        f, schema, field_name=field_name
    ),
}

2509 

2510 

# TODO V3: this function is only used for deprecated decorators. It should
# be removed once we drop support for those.
def apply_validators(
    schema: core_schema.CoreSchema,
    validators: Iterable[Decorator[RootValidatorDecoratorInfo]]
    | Iterable[Decorator[ValidatorDecoratorInfo]]
    | Iterable[Decorator[FieldValidatorDecoratorInfo]],
    field_name: str | None,
) -> core_schema.CoreSchema:
    """Apply validators to a schema.

    Args:
        schema: The schema to apply validators on.
        validators: An iterable of validators.
        field_name: The name of the field if validators are being applied to a model field.

    Returns:
        The updated schema.
    """
    for dec in validators:
        takes_info = inspect_validator(dec.func, dec.info.mode)
        key = (dec.info.mode, 'with-info' if takes_info else 'no-info')
        schema = _VALIDATOR_F_MATCH[key](dec.func, schema, field_name)
    return schema

2536 

2537 

2538def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool: 

2539 """In v1, if any of the validators for a field had `always=True`, the default value would be validated. 

2540 

2541 This serves as an auxiliary function for re-implementing that logic, by looping over a provided 

2542 collection of (v1-style) ValidatorDecoratorInfo's and checking if any of them have `always=True`. 

2543 

2544 We should be able to drop this function and the associated logic calling it once we drop support 

2545 for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent 

2546 to the v1-validator `always` kwarg to `field_validator`.) 

2547 """ 

2548 for validator in validators: 

2549 if validator.info.always: 

2550 return True 

2551 return False 

2552 

2553 

def apply_model_validators(
    schema: core_schema.CoreSchema,
    validators: Iterable[Decorator[ModelValidatorDecoratorInfo]],
    mode: Literal['inner', 'outer', 'all'],
) -> core_schema.CoreSchema:
    """Apply model validators to a schema.

    If mode == 'inner', only "before" validators are applied
    If mode == 'outer', validators other than "before" are applied
    If mode == 'all', all validators are applied

    Args:
        schema: The schema to apply validators on.
        validators: An iterable of validators.
        mode: The validator mode.

    Returns:
        The updated schema.
    """
    # The reference moves to the outermost wrapper schema: pop it here, restore it at the end.
    ref: str | None = schema.pop('ref', None)  # type: ignore
    for dec in validators:
        dec_mode = dec.info.mode
        # Skip validators that don't match the requested application mode:
        if (mode == 'inner' and dec_mode != 'before') or (mode == 'outer' and dec_mode == 'before'):
            continue
        takes_info = inspect_validator(dec.func, dec_mode)
        if dec_mode == 'wrap':
            wrap = (
                core_schema.with_info_wrap_validator_function
                if takes_info
                else core_schema.no_info_wrap_validator_function
            )
        elif dec_mode == 'before':
            wrap = (
                core_schema.with_info_before_validator_function
                if takes_info
                else core_schema.no_info_before_validator_function
            )
        else:
            assert dec_mode == 'after'
            wrap = (
                core_schema.with_info_after_validator_function
                if takes_info
                else core_schema.no_info_after_validator_function
            )
        schema = wrap(function=dec.func, schema=schema)
    if ref:
        schema['ref'] = ref  # type: ignore
    return schema

2599 

2600 

def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema:
    """Wrap schema with default schema if default value or `default_factory` are available.

    Args:
        field_info: The field info object.
        schema: The schema to apply default on.

    Returns:
        Updated schema by default value or `default_factory`.
    """
    factory = field_info.default_factory
    if factory:
        return core_schema.with_default_schema(
            schema,
            default_factory=factory,
            default_factory_takes_data=takes_validated_data_argument(factory),
            validate_default=field_info.validate_default,
        )
    if field_info.default is not PydanticUndefined:
        return core_schema.with_default_schema(
            schema, default=field_info.default, validate_default=field_info.validate_default
        )
    # Neither a default nor a factory: leave the schema untouched.
    return schema

2624 

2625 

def _extract_get_pydantic_json_schema(tp: Any) -> GetJsonSchemaFunction | None:
    """Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`.

    Returns `None` if no JSON schema function is present. Raises `PydanticUserError` if the type
    only defines the Pydantic V1 `__modify_schema__` hook.
    """
    js_modify_function = getattr(tp, '__get_pydantic_json_schema__', None)

    if hasattr(tp, '__modify_schema__'):
        BaseModel = import_cached_base_model()

        # The V1 hook is only an error when the type does *not* also provide its own
        # `__get_pydantic_json_schema__` (i.e. one that isn't just inherited from `BaseModel`):
        has_custom_v2_modify_js_func = (
            js_modify_function is not None
            and BaseModel.__get_pydantic_json_schema__.__func__  # type: ignore
            not in (js_modify_function, getattr(js_modify_function, '__func__', None))
        )

        if not has_custom_v2_modify_js_func:
            cls_name = getattr(tp, '__name__', None)
            raise PydanticUserError(
                f'The `__modify_schema__` method is not supported in Pydantic v2. '
                f'Use `__get_pydantic_json_schema__` instead{f" in class `{cls_name}`" if cls_name else ""}.',
                code='custom-json-schema',
            )

    if (origin := get_origin(tp)) is not None:
        # Generic aliases proxy attribute access to the origin, *except* dunder attributes,
        # such as `__get_pydantic_json_schema__`, hence the explicit check.
        return _extract_get_pydantic_json_schema(origin)

    if js_modify_function is None:
        return None

    return js_modify_function

2656 

2657 

class _CommonField(TypedDict):
    """Field properties shared by the different kinds of field core schemas (built via `_common_field`)."""

    schema: core_schema.CoreSchema
    # A validation alias may be a plain name, one lookup path, or several alternative paths.
    validation_alias: str | list[str | int] | list[list[str | int]] | None
    serialization_alias: str | None
    serialization_exclude: bool | None
    frozen: bool | None
    metadata: dict[str, Any]

2665 

2666 

2667def _common_field( 

2668 schema: core_schema.CoreSchema, 

2669 *, 

2670 validation_alias: str | list[str | int] | list[list[str | int]] | None = None, 

2671 serialization_alias: str | None = None, 

2672 serialization_exclude: bool | None = None, 

2673 frozen: bool | None = None, 

2674 metadata: Any = None, 

2675) -> _CommonField: 

2676 return { 

2677 'schema': schema, 

2678 'validation_alias': validation_alias, 

2679 'serialization_alias': serialization_alias, 

2680 'serialization_exclude': serialization_exclude, 

2681 'frozen': frozen, 

2682 'metadata': metadata, 

2683 } 

2684 

2685 

def resolve_original_schema(schema: CoreSchema, definitions: _Definitions) -> CoreSchema | None:
    """Follow a `'definition-ref'`/`'definitions'` schema to the underlying schema, if available."""
    schema_type = schema['type']
    if schema_type == 'definition-ref':
        return definitions.get_schema_from_ref(schema['schema_ref'])
    if schema_type == 'definitions':
        return schema['schema']
    # Any other schema is already "original":
    return schema

2693 

2694 

2695def _inlining_behavior( 

2696 def_ref: core_schema.DefinitionReferenceSchema, 

2697) -> Literal['inline', 'keep', 'preserve_metadata']: 

2698 """Determine the inlining behavior of the `'definition-ref'` schema. 

2699 

2700 - If no `'serialization'` schema and no metadata is attached, the schema can safely be inlined. 

2701 - If it has metadata but only related to the deferred discriminator application, it can be inlined 

2702 provided that such metadata is kept. 

2703 - Otherwise, the schema should not be inlined. Doing so would remove the `'serialization'` schema or metadata. 

2704 """ 

2705 if 'serialization' in def_ref: 

2706 return 'keep' 

2707 metadata = def_ref.get('metadata') 

2708 if not metadata: 

2709 return 'inline' 

2710 if len(metadata) == 1 and 'pydantic_internal_union_discriminator' in metadata: 

2711 return 'preserve_metadata' 

2712 return 'keep' 

2713 

2714 

class _Definitions:
    """Keeps track of references and definitions."""

    _recursively_seen: set[str]
    """A set of recursively seen references.

    When a referenceable type is encountered, the `get_schema_or_ref` context manager is
    entered to compute the reference. If the type references itself by some way (e.g. for
    a dataclass or a Pydantic model, the class can be referenced as a field annotation),
    entering the context manager again will yield a `'definition-ref'` schema that should
    short-circuit the normal generation process, as the reference was already in this set.
    """

    _definitions: dict[str, core_schema.CoreSchema]
    """A mapping of references to their corresponding schema.

    When a schema for a referenceable type is generated, it is stored in this mapping. If the
    same type is encountered again, the reference is yielded by the `get_schema_or_ref` context
    manager.
    """

    def __init__(self) -> None:
        self._recursively_seen = set()
        self._definitions = {}

    @contextmanager
    def get_schema_or_ref(self, tp: Any, /) -> Generator[tuple[str, core_schema.DefinitionReferenceSchema | None]]:
        """Get a definition for `tp` if one exists.

        If a definition exists, a tuple of `(ref_string, CoreSchema)` is returned.
        If no definition exists yet, a tuple of `(ref_string, None)` is returned.

        Note that the returned `CoreSchema` will always be a `DefinitionReferenceSchema`,
        not the actual definition itself.

        This should be called for any type that can be identified by reference.
        This includes any recursive types.

        At present the following types can be named/recursive:

        - Pydantic model
        - Pydantic and stdlib dataclasses
        - Typed dictionaries
        - Named tuples
        - `TypeAliasType` instances
        - Enums
        """
        ref = get_type_ref(tp)
        # return the reference if we're either (1) in a cycle or (2) the reference was already encountered:
        if ref in self._recursively_seen or ref in self._definitions:
            yield (ref, core_schema.definition_reference_schema(ref))
        else:
            self._recursively_seen.add(ref)
            try:
                yield (ref, None)
            finally:
                # Always unwind, even if schema generation for `tp` raised:
                self._recursively_seen.discard(ref)

    def get_schema_from_ref(self, ref: str) -> CoreSchema | None:
        """Resolve the schema from the given reference."""
        return self._definitions.get(ref)

    def create_definition_reference_schema(self, schema: CoreSchema) -> core_schema.DefinitionReferenceSchema:
        """Store the schema as a definition and return a `'definition-reference'` schema pointing to it.

        The schema must have a reference attached to it.
        """
        ref = schema['ref']  # pyright: ignore
        self._definitions[ref] = schema
        return core_schema.definition_reference_schema(ref)

    def unpack_definitions(self, schema: core_schema.DefinitionsSchema) -> CoreSchema:
        """Store the definitions of the `'definitions'` core schema and return the inner core schema."""
        for def_schema in schema['definitions']:
            self._definitions[def_schema['ref']] = def_schema  # pyright: ignore
        return schema['schema']

    def finalize_schema(self, schema: CoreSchema) -> CoreSchema:
        """Finalize the core schema.

        This traverses the core schema and referenced definitions, replaces `'definition-ref'` schemas
        by the referenced definition if possible, and applies deferred discriminators.
        """
        definitions = self._definitions
        try:
            gather_result = gather_schemas_for_cleaning(
                schema,
                definitions=definitions,
            )
        except MissingDefinitionError as e:
            raise InvalidSchemaError from e

        remaining_defs: dict[str, CoreSchema] = {}

        # Note: this logic doesn't play well when core schemas with deferred discriminator metadata
        # and references are encountered. See the `test_deferred_discriminated_union_and_references()` test.
        for ref, inlinable_def_ref in gather_result['collected_references'].items():
            if inlinable_def_ref is not None and (inlining_behavior := _inlining_behavior(inlinable_def_ref)) != 'keep':
                if inlining_behavior == 'inline':
                    # `ref` was encountered, and only once:
                    # - `inlinable_def_ref` is a `'definition-ref'` schema and is guaranteed to be
                    #   the only one. Transform it into the definition it points to.
                    # - Do not store the definition in the `remaining_defs`.
                    inlinable_def_ref.clear()  # pyright: ignore[reportAttributeAccessIssue]
                    inlinable_def_ref.update(self._resolve_definition(ref, definitions))  # pyright: ignore
                elif inlining_behavior == 'preserve_metadata':
                    # `ref` was encountered, and only once, but contains discriminator metadata.
                    # We will do the same thing as if `inlining_behavior` was `'inline'`, but make
                    # sure to keep the metadata for the deferred discriminator application logic below.
                    meta = inlinable_def_ref.pop('metadata')
                    inlinable_def_ref.clear()  # pyright: ignore[reportAttributeAccessIssue]
                    inlinable_def_ref.update(self._resolve_definition(ref, definitions))  # pyright: ignore
                    inlinable_def_ref['metadata'] = meta
            else:
                # `ref` was encountered, at least two times (or only once, but with metadata or a serialization schema):
                # - Do not inline the `'definition-ref'` schemas (they are not provided in the gather result anyway).
                # - Store the definition in the `remaining_defs`
                remaining_defs[ref] = self._resolve_definition(ref, definitions)

        for cs in gather_result['deferred_discriminator_schemas']:
            discriminator: str | None = cs['metadata'].pop('pydantic_internal_union_discriminator', None)  # pyright: ignore[reportTypedDictNotRequiredAccess]
            if discriminator is None:
                # This can happen in rare scenarios, when a deferred schema is present multiple times in the
                # gather result (e.g. when using the `Sequence` type -- see `test_sequence_discriminated_union()`).
                # In this case, a previous loop iteration applied the discriminator and so we can just skip it here.
                continue
            applied = _discriminated_union.apply_discriminator(cs.copy(), discriminator, remaining_defs)
            # Mutate the schema directly to have the discriminator applied
            cs.clear()  # pyright: ignore[reportAttributeAccessIssue]
            cs.update(applied)  # pyright: ignore

        if remaining_defs:
            schema = core_schema.definitions_schema(schema=schema, definitions=[*remaining_defs.values()])
        return schema

    def _resolve_definition(self, ref: str, definitions: dict[str, CoreSchema]) -> CoreSchema:
        """Resolve `ref` to its definition, following chains of inlinable `'definition-ref'` schemas.

        Raises `PydanticUserError` when the chain of references is circular.
        """
        definition = definitions[ref]
        if definition['type'] != 'definition-ref':
            return definition

        # Some `'definition-ref'` schemas might act as "intermediate" references (e.g. when using
        # a PEP 695 type alias (which is referenceable) that references another PEP 695 type alias):
        visited: set[str] = set()
        while definition['type'] == 'definition-ref' and _inlining_behavior(definition) == 'inline':
            schema_ref = definition['schema_ref']
            if schema_ref in visited:
                raise PydanticUserError(
                    f'{ref} contains a circular reference to itself.', code='circular-reference-schema'
                )
            visited.add(schema_ref)
            definition = definitions[schema_ref]
        return {**definition, 'ref': ref}  # pyright: ignore[reportReturnType]

2868 

2869class _FieldNameStack: 

2870 __slots__ = ('_stack',) 

2871 

2872 def __init__(self) -> None: 

2873 self._stack: list[str] = [] 

2874 

2875 @contextmanager 

2876 def push(self, field_name: str) -> Iterator[None]: 

2877 self._stack.append(field_name) 

2878 yield 

2879 self._stack.pop() 

2880 

2881 def get(self) -> str | None: 

2882 if self._stack: 

2883 return self._stack[-1] 

2884 else: 

2885 return None 

2886 

2887 

2888class _ModelTypeStack: 

2889 __slots__ = ('_stack',) 

2890 

2891 def __init__(self) -> None: 

2892 self._stack: list[type] = [] 

2893 

2894 @contextmanager 

2895 def push(self, type_obj: type) -> Iterator[None]: 

2896 self._stack.append(type_obj) 

2897 yield 

2898 self._stack.pop() 

2899 

2900 def get(self) -> type | None: 

2901 if self._stack: 

2902 return self._stack[-1] 

2903 else: 

2904 return None