Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pydantic/_internal/_generate_schema.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1320 statements  

1"""Convert python types to pydantic-core schema.""" 

2 

3from __future__ import annotations as _annotations 

4 

5import collections.abc 

6import dataclasses 

7import datetime 

8import inspect 

9import os 

10import pathlib 

11import re 

12import sys 

13import typing 

14import warnings 

15from collections.abc import Generator, Iterable, Iterator, Mapping 

16from contextlib import contextmanager 

17from copy import copy 

18from decimal import Decimal 

19from enum import Enum 

20from fractions import Fraction 

21from functools import partial 

22from inspect import Parameter, _ParameterKind, signature 

23from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network 

24from itertools import chain 

25from operator import attrgetter 

26from types import FunctionType, GenericAlias, LambdaType, MethodType 

27from typing import ( 

28 TYPE_CHECKING, 

29 Any, 

30 Callable, 

31 Final, 

32 ForwardRef, 

33 Literal, 

34 TypeVar, 

35 Union, 

36 cast, 

37 overload, 

38) 

39from uuid import UUID 

40from zoneinfo import ZoneInfo 

41 

42import typing_extensions 

43from pydantic_core import ( 

44 MISSING, 

45 CoreSchema, 

46 MultiHostUrl, 

47 PydanticCustomError, 

48 PydanticSerializationUnexpectedValue, 

49 PydanticUndefined, 

50 Url, 

51 core_schema, 

52 to_jsonable_python, 

53) 

54from typing_extensions import TypeAlias, TypeAliasType, get_args, get_origin, is_typeddict 

55from typing_inspection import typing_objects 

56from typing_inspection.introspection import AnnotationSource, get_literal_values, is_union_origin 

57 

58from ..aliases import AliasChoices, AliasPath 

59from ..annotated_handlers import GetCoreSchemaHandler, GetJsonSchemaHandler 

60from ..config import ConfigDict, JsonDict, JsonEncoder, JsonSchemaExtraCallable 

61from ..errors import PydanticSchemaGenerationError, PydanticUndefinedAnnotation, PydanticUserError 

62from ..functional_validators import AfterValidator, BeforeValidator, FieldValidatorModes, PlainValidator, WrapValidator 

63from ..json_schema import JsonSchemaValue 

64from ..version import version_short 

65from ..warnings import ( 

66 ArbitraryTypeWarning, 

67 PydanticDeprecatedSince20, 

68 TypedDictExtraConfigWarning, 

69 UnsupportedFieldAttributeWarning, 

70) 

71from . import _decorators, _discriminated_union, _known_annotated_metadata, _repr, _typing_extra 

72from ._config import ConfigWrapper, ConfigWrapperStack 

73from ._core_metadata import CoreMetadata, update_core_metadata 

74from ._core_utils import ( 

75 get_ref, 

76 get_type_ref, 

77 is_list_like_schema_with_items_schema, 

78) 

79from ._decorators import ( 

80 Decorator, 

81 DecoratorInfos, 

82 FieldSerializerDecoratorInfo, 

83 FieldValidatorDecoratorInfo, 

84 ModelSerializerDecoratorInfo, 

85 ModelValidatorDecoratorInfo, 

86 RootValidatorDecoratorInfo, 

87 ValidatorDecoratorInfo, 

88 get_attribute_from_bases, 

89 inspect_field_serializer, 

90 inspect_model_serializer, 

91 inspect_validator, 

92) 

93from ._docs_extraction import extract_docstrings_from_cls 

94from ._fields import ( 

95 collect_dataclass_fields, 

96 rebuild_dataclass_fields, 

97 rebuild_model_fields, 

98 takes_validated_data_argument, 

99 update_field_from_config, 

100) 

101from ._forward_ref import PydanticRecursiveRef 

102from ._generics import get_standard_typevars_map, replace_types 

103from ._import_utils import import_cached_base_model, import_cached_field_info 

104from ._mock_val_ser import MockCoreSchema 

105from ._namespace_utils import NamespacesTuple, NsResolver 

106from ._schema_gather import MissingDefinitionError, gather_schemas_for_cleaning 

107from ._schema_generation_shared import CallbackGetCoreSchemaHandler 

108from ._utils import lenient_issubclass, smart_deepcopy 

109 

110if TYPE_CHECKING: 

111 from ..fields import ComputedFieldInfo, FieldInfo 

112 from ..main import BaseModel 

113 from ..types import Discriminator 

114 from ._dataclasses import StandardDataclass 

115 from ._schema_generation_shared import GetJsonSchemaFunction 

116 

# `True` when the running interpreter is Python 3.12 or newer.
# NOTE(review): presumably gates acceptance of `typing.TypedDict` (the `generate_schema`
# docstring mentions a Python < 3.12 restriction) -- confirm at the use site.
_SUPPORTS_TYPEDDICT = sys.version_info >= (3, 12)

# The decorator info types that target individual fields by name.
FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo]
# Type variable bound to `FieldDecoratorInfo`, used to keep the concrete info type through generic helpers.
FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo)
# A `Decorator` parametrized with any one of the field decorator info types.
AnyFieldDecorator = Union[
    Decorator[ValidatorDecoratorInfo],
    Decorator[FieldValidatorDecoratorInfo],
    Decorator[FieldSerializerDecoratorInfo],
]

# Alias of `GetCoreSchemaHandler`, used as the handler argument type in `GetCoreSchemaFunction`.
ModifyCoreSchemaWrapHandler: TypeAlias = GetCoreSchemaHandler
# A callable taking a source object and a handler, and returning a core schema.
GetCoreSchemaFunction: TypeAlias = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema]
# A callable taking `(int, str, Any)` and returning `'skip'` or `None`. Written as a string,
# so the expression is not evaluated when the module is imported.
# NOTE(review): the meaning of the three arguments is not visible here -- confirm against callers.
ParametersCallback: TypeAlias = "Callable[[int, str, Any], Literal['skip'] | None]"

130 

# Groups of type objects (both `typing` aliases and their runtime counterparts), one group per
# kind of supported type.
# NOTE(review): presumably these are matched against an annotation or its origin to pick the
# schema-building method -- the lookup code is not visible in this part of the file.
TUPLE_TYPES: list[type] = [typing.Tuple, tuple]  # noqa: UP006
LIST_TYPES: list[type] = [typing.List, list, collections.abc.MutableSequence]  # noqa: UP006
SET_TYPES: list[type] = [typing.Set, set, collections.abc.MutableSet]  # noqa: UP006
FROZEN_SET_TYPES: list[type] = [typing.FrozenSet, frozenset, collections.abc.Set]  # noqa: UP006
DICT_TYPES: list[type] = [typing.Dict, dict]  # noqa: UP006
IP_TYPES: list[type] = [IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network]
SEQUENCE_TYPES: list[type] = [typing.Sequence, collections.abc.Sequence]
ITERABLE_TYPES: list[type] = [typing.Iterable, collections.abc.Iterable, typing.Generator, collections.abc.Generator]
TYPE_TYPES: list[type] = [typing.Type, type]  # noqa: UP006
PATTERN_TYPES: list[type] = [typing.Pattern, re.Pattern]
PATH_TYPES: list[type] = [
    os.PathLike,
    pathlib.Path,
    pathlib.PurePath,
    pathlib.PosixPath,
    pathlib.PurePosixPath,
    pathlib.PureWindowsPath,
]
# Mapping types other than plain `dict` (which has its own `DICT_TYPES` group above).
MAPPING_TYPES = [
    typing.Mapping,
    typing.MutableMapping,
    collections.abc.Mapping,
    collections.abc.MutableMapping,
    collections.OrderedDict,
    typing_extensions.OrderedDict,
    typing.DefaultDict,  # noqa: UP006
    collections.defaultdict,
]
COUNTER_TYPES = [collections.Counter, typing.Counter]
DEQUE_TYPES: list[type] = [collections.deque, typing.Deque]  # noqa: UP006

161 

# Note: This does not play very well with type checkers. For example,
# `a: LambdaType = lambda x: x` is reported as a type error by Pyright.
ValidateCallSupportedTypes = Union[
    LambdaType,
    FunctionType,
    MethodType,
    partial,
]

# Runtime tuple of the members of the union above, as returned by `get_args`.
VALIDATE_CALL_SUPPORTED_TYPES = get_args(ValidateCallSupportedTypes)
# Each entry pairs a `FieldInfo` attribute name with the value that attribute has when left unset.
UNSUPPORTED_STANDALONE_FIELDINFO_ATTRIBUTES: list[tuple[str, Any]] = [
    ('alias', None),
    ('validation_alias', None),
    ('serialization_alias', None),
    # will be set if any alias is set, so disable it to avoid double warnings:
    # 'alias_priority',
    ('default', PydanticUndefined),
    ('default_factory', None),
    ('exclude', None),
    ('deprecated', None),
    ('repr', True),
    ('validate_default', None),
    ('frozen', None),
    ('init', None),
    ('init_var', None),
    ('kw_only', None),
]
"""`FieldInfo` attributes (and their default value) that can't be used outside of a model (e.g. in a type adapter or a PEP 695 type alias)."""

# Maps each field validator mode name to the functional validator class of the same kind.
_mode_to_validator: dict[
    FieldValidatorModes, type[BeforeValidator | AfterValidator | PlainValidator | WrapValidator]
] = {'before': BeforeValidator, 'after': AfterValidator, 'plain': PlainValidator, 'wrap': WrapValidator}

194 

195 

def check_validator_fields_against_field_name(
    info: FieldDecoratorInfo,
    field: str,
) -> bool:
    """Tell whether a field decorator applies to the given field.

    Args:
        info: The decorator info whose `fields` collection is inspected.
        field: The field name to look for.

    Returns:
        `True` if the decorator targets every field (`'*'`) or lists `field` explicitly,
        `False` otherwise.
    """
    targeted = info.fields
    if '*' in targeted:
        # the wildcard matches any field name
        return True
    return field in targeted

211 

212 

def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None:
    """Verify that every field named by the given decorators is a known field.

    A decorator is exempt from the check when it targets `'*'` or was declared
    with `check_fields=False`.

    Args:
        decorators: The field decorators to verify.
        fields: The names of the fields that exist.

    Raises:
        PydanticUserError: If a decorator names a field that is not in `fields`.
    """
    known_fields = set(fields)
    for dec in decorators:
        dec_info = dec.info
        if '*' in dec_info.fields or dec_info.check_fields is False:
            continue
        # the error does not name the offending field, so finding any unknown name is enough
        if any(name not in known_fields for name in dec_info.fields):
            raise PydanticUserError(
                f'Decorators defined with incorrect fields: {dec.cls_ref}.{dec.cls_var_name}'
                " (use check_fields=False if you're inheriting from the model and intended this)",
                code='decorator-missing-field',
            )

238 

239 

def filter_field_decorator_info_by_field(
    validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str
) -> list[Decorator[FieldDecoratorInfoType]]:
    """Keep only the decorators that apply to `field`.

    Args:
        validator_functions: The candidate field decorators.
        field: The field name to filter by.

    Returns:
        The decorators, in their original order, for which
        `check_validator_fields_against_field_name` returns `True`.
    """
    matching: list[Decorator[FieldDecoratorInfoType]] = []
    for dec in validator_functions:
        if check_validator_fields_against_field_name(dec.info, field):
            matching.append(dec)
    return matching

244 

245 

def apply_each_item_validators(
    schema: core_schema.CoreSchema,
    each_item_validators: list[Decorator[ValidatorDecoratorInfo]],
) -> core_schema.CoreSchema:
    """Push `each_item=True` validators down onto the item schema of a container schema.

    This is a V1 compatibility shim that should eventually be removed. Note that it won't
    work for `Annotated` types that get wrapped by a function validator, which is fine
    because those did not exist in V1.

    Args:
        schema: The field schema; it is modified in place.
        each_item_validators: The validators to apply to each item.

    Returns:
        The same `schema` object.

    Raises:
        TypeError: If validators were given and the schema is not a nullable, tuple,
            list-like or dict schema.
    """
    # nothing to push down
    if not each_item_validators:
        return schema

    if schema['type'] == 'nullable':
        # look through the nullable wrapper and handle the wrapped schema instead
        schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators)
        return schema

    if schema['type'] == 'tuple':
        variadic_item_index = schema.get('variadic_item_index')
        if variadic_item_index is not None:
            # only the variadic item is wrapped; tuples without one are left untouched
            wrapped_item = apply_validators(
                schema['items_schema'][variadic_item_index],
                each_item_validators,
            )
            schema['items_schema'][variadic_item_index] = wrapped_item
        return schema

    if is_list_like_schema_with_items_schema(schema):
        items_schema = schema.get('items_schema', core_schema.any_schema())
        schema['items_schema'] = apply_validators(items_schema, each_item_validators)
        return schema

    if schema['type'] == 'dict':
        # for dicts the validators apply to the values, not the keys
        values_schema = schema.get('values_schema', core_schema.any_schema())
        schema['values_schema'] = apply_validators(values_schema, each_item_validators)
        return schema

    raise TypeError(
        f'`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema["type"]}'
    )

279 

280 

def _extract_json_schema_info_from_field_info(
    info: FieldInfo | ComputedFieldInfo,
) -> tuple[JsonDict | None, JsonDict | JsonSchemaExtraCallable | None]:
    """Collect the JSON schema related settings of a field.

    Args:
        info: The field info (or computed field info) to read from.

    Returns:
        A two-tuple: a dict holding the `title`, `description`, `deprecated` and `examples`
        entries that are not `None` (or `None` if that dict is empty), and the field's
        `json_schema_extra` value unchanged.
    """
    candidates = (
        ('title', info.title),
        ('description', info.description),
        # an empty deprecation message still marks the field as deprecated
        ('deprecated', bool(info.deprecated) or info.deprecated == '' or None),
        ('examples', to_jsonable_python(info.examples)),
    )
    json_schema_updates = {key: value for key, value in candidates if value is not None}
    if not json_schema_updates:
        return (None, info.json_schema_extra)
    return (json_schema_updates, info.json_schema_extra)

292 

293 

# Mapping from a type to the encoder callable to use for values of that type.
JsonEncoders = dict[type[Any], JsonEncoder]

295 

296 

297def _add_custom_serialization_from_json_encoders( 

298 json_encoders: JsonEncoders | None, tp: Any, schema: CoreSchema 

299) -> CoreSchema: 

300 """Iterate over the json_encoders and add the first matching encoder to the schema. 

301 

302 Args: 

303 json_encoders: A dictionary of types and their encoder functions. 

304 tp: The type to check for a matching encoder. 

305 schema: The schema to add the encoder to. 

306 """ 

307 if not json_encoders: 

308 return schema 

309 if 'serialization' in schema: 

310 return schema 

311 # Check the class type and its superclasses for a matching encoder 

312 # Decimal.__class__.__mro__ (and probably other cases) doesn't include Decimal itself 

313 # if the type is a GenericAlias (e.g. from list[int]) we need to use __class__ instead of .__mro__ 

314 for base in (tp, *getattr(tp, '__mro__', tp.__class__.__mro__)[:-1]): 

315 encoder = json_encoders.get(base) 

316 if encoder is None: 

317 continue 

318 

319 warnings.warn( 

320 f'`json_encoders` is deprecated. See https://docs.pydantic.dev/{version_short()}/concepts/serialization/#custom-serializers for alternatives', 

321 PydanticDeprecatedSince20, 

322 ) 

323 

324 # TODO: in theory we should check that the schema accepts a serialization key 

325 schema['serialization'] = core_schema.plain_serializer_function_ser_schema(encoder, when_used='json') 

326 return schema 

327 

328 return schema 

329 

330 

class InvalidSchemaError(Exception):
    """Exception signalling that a core schema is invalid."""

333 

334 

335class GenerateSchema: 

336 """Generate core schema for a Pydantic model, dataclass and types like `str`, `datetime`, ... .""" 

337 

338 __slots__ = ( 

339 '_config_wrapper_stack', 

340 '_ns_resolver', 

341 '_typevars_map', 

342 'field_name_stack', 

343 'model_type_stack', 

344 'defs', 

345 ) 

346 

347 def __init__( 

348 self, 

349 config_wrapper: ConfigWrapper, 

350 ns_resolver: NsResolver | None = None, 

351 typevars_map: Mapping[TypeVar, Any] | None = None, 

352 ) -> None: 

353 # we need a stack for recursing into nested models 

354 self._config_wrapper_stack = ConfigWrapperStack(config_wrapper) 

355 self._ns_resolver = ns_resolver or NsResolver() 

356 self._typevars_map = typevars_map 

357 self.field_name_stack = _FieldNameStack() 

358 self.model_type_stack = _ModelTypeStack() 

359 self.defs = _Definitions() 

360 

361 def __init_subclass__(cls) -> None: 

362 super().__init_subclass__() 

363 warnings.warn( 

364 'Subclassing `GenerateSchema` is not supported. The API is highly subject to change in minor versions.', 

365 UserWarning, 

366 stacklevel=2, 

367 ) 

368 

369 @property 

370 def _config_wrapper(self) -> ConfigWrapper: 

371 return self._config_wrapper_stack.tail 

372 

373 @property 

374 def _types_namespace(self) -> NamespacesTuple: 

375 return self._ns_resolver.types_namespace 

376 

377 @property 

378 def _arbitrary_types(self) -> bool: 

379 return self._config_wrapper.arbitrary_types_allowed 

380 

381 # the following methods can be overridden but should be considered 

382 # unstable / private APIs 

383 def _list_schema(self, items_type: Any) -> CoreSchema: 

384 return core_schema.list_schema(self.generate_schema(items_type)) 

385 

386 def _dict_schema(self, keys_type: Any, values_type: Any) -> CoreSchema: 

387 return core_schema.dict_schema(self.generate_schema(keys_type), self.generate_schema(values_type)) 

388 

389 def _set_schema(self, items_type: Any) -> CoreSchema: 

390 return core_schema.set_schema(self.generate_schema(items_type)) 

391 

392 def _frozenset_schema(self, items_type: Any) -> CoreSchema: 

393 return core_schema.frozenset_schema(self.generate_schema(items_type)) 

394 

    def _enum_schema(self, enum_type: type[Enum]) -> CoreSchema:
        """Build the core schema for an `Enum` subclass.

        Args:
            enum_type: The enum class to generate a schema for.

        Returns:
            For an enum with members: an enum schema, wrapped in an after validator that
            extracts `.value` when the `use_enum_values` config setting is enabled.
            For an enum without members: an is-instance schema for `enum_type`.
        """
        cases: list[Any] = list(enum_type.__members__.values())

        enum_ref = get_type_ref(enum_type)
        description = None if not enum_type.__doc__ else inspect.cleandoc(enum_type.__doc__)
        if (
            description == 'An enumeration.'
        ):  # This is the default value provided by enum.EnumMeta.__new__; don't use it
            description = None
        # JSON schema additions (title and, when available, description); `None` entries are dropped
        js_updates = {'title': enum_type.__name__, 'description': description}
        js_updates = {k: v for k, v in js_updates.items() if v is not None}

        # `sub_type` stays `None` unless the enum also derives from `int`, `str` or `float`;
        # `value_ser_type` is only used below when `use_enum_values` is enabled
        sub_type: Literal['str', 'int', 'float'] | None = None
        if issubclass(enum_type, int):
            sub_type = 'int'
            value_ser_type: core_schema.SerSchema = core_schema.simple_ser_schema('int')
        elif issubclass(enum_type, str):
            # this handles `StrEnum` (Python 3.11+), and also `Foobar(str, Enum)`
            sub_type = 'str'
            value_ser_type = core_schema.simple_ser_schema('str')
        elif issubclass(enum_type, float):
            sub_type = 'float'
            value_ser_type = core_schema.simple_ser_schema('float')
        else:
            # TODO this is an ugly hack, how do we trigger an Any schema for serialization?
            value_ser_type = core_schema.plain_serializer_function_ser_schema(lambda x: x)

        if cases:

            def get_json_schema(schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
                # apply the title/description to the schema that the reference resolves to
                json_schema = handler(schema)
                original_schema = handler.resolve_ref_schema(json_schema)
                original_schema.update(js_updates)
                return json_schema

            # we don't want to add the missing to the schema if it's the default one
            default_missing = getattr(enum_type._missing_, '__func__', None) is Enum._missing_.__func__  # pyright: ignore[reportFunctionMemberAccess]
            enum_schema = core_schema.enum_schema(
                enum_type,
                cases,
                sub_type=sub_type,
                missing=None if default_missing else enum_type._missing_,
                ref=enum_ref,
                metadata={'pydantic_js_functions': [get_json_schema]},
            )

            if self._config_wrapper.use_enum_values:
                # after validation, replace the enum member by its `.value`
                enum_schema = core_schema.no_info_after_validator_function(
                    attrgetter('value'), enum_schema, serialization=value_ser_type
                )

            return enum_schema

        else:

            def get_json_schema_no_cases(_, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
                # the incoming schema is ignored; a case-less enum schema is built for JSON schema purposes
                json_schema = handler(core_schema.enum_schema(enum_type, cases, sub_type=sub_type, ref=enum_ref))
                original_schema = handler.resolve_ref_schema(json_schema)
                original_schema.update(js_updates)
                return json_schema

            # Use an isinstance check for enums with no cases.
            # The most important use case for this is creating TypeVar bounds for generics that should
            # be restricted to enums. This is more consistent than it might seem at first, since you can only
            # subclass enum.Enum (or subclasses of enum.Enum) if all parent classes have no cases.
            # We use the get_json_schema function when an Enum subclass has been declared with no cases
            # so that we can still generate a valid json schema.
            return core_schema.is_instance_schema(
                enum_type,
                metadata={'pydantic_js_functions': [get_json_schema_no_cases]},
            )

466 

467 def _ip_schema(self, tp: Any) -> CoreSchema: 

468 from ._validators import IP_VALIDATOR_LOOKUP, IpType 

469 

470 ip_type_json_schema_format: dict[type[IpType], str] = { 

471 IPv4Address: 'ipv4', 

472 IPv4Network: 'ipv4network', 

473 IPv4Interface: 'ipv4interface', 

474 IPv6Address: 'ipv6', 

475 IPv6Network: 'ipv6network', 

476 IPv6Interface: 'ipv6interface', 

477 } 

478 

479 def ser_ip(ip: Any, info: core_schema.SerializationInfo) -> str | IpType: 

480 if not isinstance(ip, (tp, str)): 

481 raise PydanticSerializationUnexpectedValue( 

482 f"Expected `{tp}` but got `{type(ip)}` with value `'{ip}'` - serialized value may not be as expected." 

483 ) 

484 if info.mode == 'python': 

485 return ip 

486 return str(ip) 

487 

488 return core_schema.lax_or_strict_schema( 

489 lax_schema=core_schema.no_info_plain_validator_function(IP_VALIDATOR_LOOKUP[tp]), 

490 strict_schema=core_schema.json_or_python_schema( 

491 json_schema=core_schema.no_info_after_validator_function(tp, core_schema.str_schema()), 

492 python_schema=core_schema.is_instance_schema(tp), 

493 ), 

494 serialization=core_schema.plain_serializer_function_ser_schema(ser_ip, info_arg=True, when_used='always'), 

495 metadata={ 

496 'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': ip_type_json_schema_format[tp]}] 

497 }, 

498 ) 

499 

    def _path_schema(self, tp: Any, path_type: Any) -> CoreSchema:
        """Build the core schema for a path type.

        Args:
            tp: The path type (`os.PathLike` or a `pathlib` class).
            path_type: The kind of raw value the path is built from; `bytes` selects
                bytes input, anything else selects string input.

        Returns:
            A lax-or-strict schema: strict mode accepts instances of `tp` (or, in JSON mode,
            a raw value converted by `path_validator`); lax mode additionally accepts a raw
            value of the matching strict type in any mode.

        Raises:
            PydanticUserError: If `tp` is `os.PathLike` and `path_type` is not `str`,
                `bytes` or `Any`.
        """
        if tp is os.PathLike and (path_type not in {str, bytes} and not typing_objects.is_any(path_type)):
            raise PydanticUserError(
                '`os.PathLike` can only be used with `str`, `bytes` or `Any`', code='schema-for-unknown-type'
            )

        # `os.PathLike` is an abstract base class, so a `PurePath` is constructed for it instead
        path_constructor = pathlib.PurePath if tp is os.PathLike else tp
        strict_inner_schema = (
            core_schema.bytes_schema(strict=True) if (path_type is bytes) else core_schema.str_schema(strict=True)
        )
        lax_inner_schema = core_schema.bytes_schema() if (path_type is bytes) else core_schema.str_schema()

        def path_validator(input_value: str | bytes) -> os.PathLike[Any]:  # type: ignore
            # Convert an already validated raw value into a path object.
            try:
                if path_type is bytes:
                    if isinstance(input_value, bytes):
                        try:
                            # bytes input is decoded first; the path is always built from a string
                            input_value = input_value.decode()
                        except UnicodeDecodeError as e:
                            raise PydanticCustomError('bytes_type', 'Input must be valid bytes') from e
                    else:
                        raise PydanticCustomError('bytes_type', 'Input must be bytes')
                elif not isinstance(input_value, str):
                    raise PydanticCustomError('path_type', 'Input is not a valid path')

                return path_constructor(input_value)  # type: ignore
            except TypeError as e:
                # a `TypeError` raised while building the path is reported as a validation error
                raise PydanticCustomError('path_type', 'Input is not a valid path') from e

        def ser_path(path: Any, info: core_schema.SerializationInfo) -> str | os.PathLike[Any]:
            # Python mode returns the value unchanged; any other mode returns its string form.
            if not isinstance(path, (tp, str)):
                raise PydanticSerializationUnexpectedValue(
                    f"Expected `{tp}` but got `{type(path)}` with value `'{path}'` - serialized value may not be as expected."
                )
            if info.mode == 'python':
                return path
            return str(path)

        # JSON input: validate the raw value leniently, then convert; Python input: must already be a `tp`
        instance_schema = core_schema.json_or_python_schema(
            json_schema=core_schema.no_info_after_validator_function(path_validator, lax_inner_schema),
            python_schema=core_schema.is_instance_schema(tp),
        )

        schema = core_schema.lax_or_strict_schema(
            lax_schema=core_schema.union_schema(
                [
                    instance_schema,
                    # fall back to converting a raw value of exactly the expected type
                    core_schema.no_info_after_validator_function(path_validator, strict_inner_schema),
                ],
                custom_error_type='path_type',
                custom_error_message=f'Input is not a valid path for {tp}',
            ),
            strict_schema=instance_schema,
            serialization=core_schema.plain_serializer_function_ser_schema(ser_path, info_arg=True, when_used='always'),
            # the JSON schema is the one of the inner schema, plus `format: path`
            metadata={'pydantic_js_functions': [lambda source, handler: {**handler(source), 'format': 'path'}]},
        )
        return schema

557 

558 def _deque_schema(self, items_type: Any) -> CoreSchema: 

559 from ._serializers import serialize_sequence_via_list 

560 from ._validators import deque_validator 

561 

562 item_type_schema = self.generate_schema(items_type) 

563 

564 # we have to use a lax list schema here, because we need to validate the deque's 

565 # items via a list schema, but it's ok if the deque itself is not a list 

566 list_schema = core_schema.list_schema(item_type_schema, strict=False) 

567 

568 check_instance = core_schema.json_or_python_schema( 

569 json_schema=list_schema, 

570 python_schema=core_schema.is_instance_schema(collections.deque, cls_repr='Deque'), 

571 ) 

572 

573 lax_schema = core_schema.no_info_wrap_validator_function(deque_validator, list_schema) 

574 

575 return core_schema.lax_or_strict_schema( 

576 lax_schema=lax_schema, 

577 strict_schema=core_schema.chain_schema([check_instance, lax_schema]), 

578 serialization=core_schema.wrap_serializer_function_ser_schema( 

579 serialize_sequence_via_list, schema=item_type_schema, info_arg=True 

580 ), 

581 ) 

582 

    def _mapping_schema(self, tp: Any, keys_type: Any, values_type: Any) -> CoreSchema:
        """Build the core schema for a mapping type.

        Args:
            tp: The mapping type; it must be a key of `MAPPING_ORIGIN_MAP`.
            keys_type: The type of the mapping keys.
            values_type: The type of the mapping values.

        Returns:
            A plain (non-strict) dict schema when `tp` maps to `dict`. Otherwise a
            lax-or-strict schema that validates the input as a dict and then converts it
            to the mapped origin type.
        """
        from ._validators import MAPPING_ORIGIN_MAP, defaultdict_validator, get_defaultdict_default_default_factory

        mapped_origin = MAPPING_ORIGIN_MAP[tp]
        keys_schema = self.generate_schema(keys_type)
        with warnings.catch_warnings():
            # We kind of abused `Field()` default factories to be able to specify
            # the `defaultdict`'s `default_factory`. As a consequence, we get warnings
            # as normally `FieldInfo.default_factory` is unsupported in the context where
            # `Field()` is used and our only solution is to ignore them (note that this might
            # wrongfully ignore valid warnings, e.g. if the `value_type` is a PEP 695 type alias
            # with unsupported metadata).
            warnings.simplefilter('ignore', category=UnsupportedFieldAttributeWarning)
            values_schema = self.generate_schema(values_type)
        dict_schema = core_schema.dict_schema(keys_schema, values_schema, strict=False)

        if mapped_origin is dict:
            schema = dict_schema
        else:
            # strict mode: Python input must already be an instance of the mapped origin
            check_instance = core_schema.json_or_python_schema(
                json_schema=dict_schema,
                python_schema=core_schema.is_instance_schema(mapped_origin),
            )

            # `coerce_instance_wrap` takes the dict schema and returns a schema producing `mapped_origin`
            if tp is collections.defaultdict:
                # a `defaultdict` goes through a wrap validator that receives a default factory
                default_default_factory = get_defaultdict_default_default_factory(values_type)
                coerce_instance_wrap = partial(
                    core_schema.no_info_wrap_validator_function,
                    partial(defaultdict_validator, default_default_factory=default_default_factory),
                )
            else:
                # other mappings are built by calling the origin type on the validated dict
                coerce_instance_wrap = partial(core_schema.no_info_after_validator_function, mapped_origin)

            lax_schema = coerce_instance_wrap(dict_schema)
            strict_schema = core_schema.chain_schema([check_instance, lax_schema])

            schema = core_schema.lax_or_strict_schema(
                lax_schema=lax_schema,
                strict_schema=strict_schema,
                # serialize using the dict schema; the wrapper just delegates to the handler
                serialization=core_schema.wrap_serializer_function_ser_schema(
                    lambda v, h: h(v), schema=dict_schema, info_arg=False
                ),
            )

        return schema

628 

629 def _fraction_schema(self) -> CoreSchema: 

630 """Support for [`fractions.Fraction`][fractions.Fraction].""" 

631 from ._validators import fraction_validator 

632 

633 # TODO: note, this is a fairly common pattern, re lax / strict for attempted type coercion, 

634 # can we use a helper function to reduce boilerplate? 

635 return core_schema.lax_or_strict_schema( 

636 lax_schema=core_schema.no_info_plain_validator_function(fraction_validator), 

637 strict_schema=core_schema.json_or_python_schema( 

638 json_schema=core_schema.no_info_plain_validator_function(fraction_validator), 

639 python_schema=core_schema.is_instance_schema(Fraction), 

640 ), 

641 # use str serialization to guarantee round trip behavior 

642 serialization=core_schema.to_string_ser_schema(when_used='always'), 

643 metadata={'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'fraction'}]}, 

644 ) 

645 

646 def _arbitrary_type_schema(self, tp: Any) -> CoreSchema: 

647 if not isinstance(tp, type): 

648 warnings.warn( 

649 f'{tp!r} is not a Python type (it may be an instance of an object),' 

650 ' Pydantic will allow any object with no validation since we cannot even' 

651 ' enforce that the input is an instance of the given type.' 

652 ' To get rid of this error wrap the type with `pydantic.SkipValidation`.', 

653 ArbitraryTypeWarning, 

654 ) 

655 return core_schema.any_schema() 

656 return core_schema.is_instance_schema(tp) 

657 

658 def _unknown_type_schema(self, obj: Any) -> CoreSchema: 

659 raise PydanticSchemaGenerationError( 

660 f'Unable to generate pydantic-core schema for {obj!r}. ' 

661 'Set `arbitrary_types_allowed=True` in the model_config to ignore this error' 

662 ' or implement `__get_pydantic_core_schema__` on your type to fully support it.' 

663 '\n\nIf you got this error by calling handler(<some type>) within' 

664 ' `__get_pydantic_core_schema__` then you likely need to call' 

665 ' `handler.generate_schema(<some type>)` since we do not call' 

666 ' `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.' 

667 ) 

668 

669 def _apply_discriminator_to_union( 

670 self, schema: CoreSchema, discriminator: str | Discriminator | None 

671 ) -> CoreSchema: 

672 if discriminator is None: 

673 return schema 

674 try: 

675 return _discriminated_union.apply_discriminator( 

676 schema, 

677 discriminator, 

678 self.defs._definitions, 

679 ) 

680 except _discriminated_union.MissingDefinitionForUnionRef: 

681 # defer until defs are resolved 

682 _discriminated_union.set_discriminator_in_metadata( 

683 schema, 

684 discriminator, 

685 ) 

686 return schema 

687 

688 def clean_schema(self, schema: CoreSchema) -> CoreSchema: 

689 return self.defs.finalize_schema(schema) 

690 

691 def _add_js_function(self, metadata_schema: CoreSchema, js_function: Callable[..., Any]) -> None: 

692 metadata = metadata_schema.get('metadata', {}) 

693 pydantic_js_functions = metadata.setdefault('pydantic_js_functions', []) 

694 # because of how we generate core schemas for nested generic models 

695 # we can end up adding `BaseModel.__get_pydantic_json_schema__` multiple times 

696 # this check may fail to catch duplicates if the function is a `functools.partial` 

697 # or something like that, but if it does it'll fail by inserting the duplicate 

698 if js_function not in pydantic_js_functions: 

699 pydantic_js_functions.append(js_function) 

700 metadata_schema['metadata'] = metadata 

701 

702 def generate_schema( 

703 self, 

704 obj: Any, 

705 ) -> core_schema.CoreSchema: 

706 """Generate core schema. 

707 

708 Args: 

709 obj: The object to generate core schema for. 

710 

711 Returns: 

712 The generated core schema. 

713 

714 Raises: 

715 PydanticUndefinedAnnotation: 

716 If it is not possible to evaluate forward reference. 

717 PydanticSchemaGenerationError: 

718 If it is not possible to generate pydantic-core schema. 

719 TypeError: 

720 - If `alias_generator` returns a disallowed type (must be str, AliasPath or AliasChoices). 

721 - If V1 style validator with `each_item=True` applied on a wrong field. 

722 PydanticUserError: 

723 - If `typing.TypedDict` is used instead of `typing_extensions.TypedDict` on Python < 3.12. 

724 - If `__modify_schema__` method is used instead of `__get_pydantic_json_schema__`. 

725 """ 

726 schema = self._generate_schema_from_get_schema_method(obj, obj) 

727 

728 if schema is None: 

729 schema = self._generate_schema_inner(obj) 

730 

731 metadata_js_function = _extract_get_pydantic_json_schema(obj) 

732 if metadata_js_function is not None: 

733 metadata_schema = resolve_original_schema(schema, self.defs) 

734 if metadata_schema: 

735 self._add_js_function(metadata_schema, metadata_js_function) 

736 

737 schema = _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, obj, schema) 

738 

739 return schema 

740 

def _model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema:
    """Generate schema for a Pydantic model.

    Args:
        cls: The model class to generate a core schema for.

    Returns:
        The schema yielded by `self.defs.get_schema_or_ref()` when it is not `None`,
        otherwise a schema already stored on `cls` itself, otherwise the result of
        registering the newly built model schema with `self.defs`.

    Raises:
        PydanticUndefinedAnnotation: If `cls` has no `__pydantic_fields__` attribute while its
            fields are not complete, or if rebuilding the fields raises a `NameError`.
        PydanticSchemaGenerationError: If the `__pydantic_extra__` annotation does not have
            an origin found in `DICT_TYPES`.
    """
    BaseModel_ = import_cached_base_model()

    with self.defs.get_schema_or_ref(cls) as (model_ref, maybe_schema):
        # A schema (or reference) handed back by the definitions store is returned untouched.
        if maybe_schema is not None:
            return maybe_schema

        # Look in `cls.__dict__` (not via `getattr`) so that only a schema set on this exact
        # class is considered; `MockCoreSchema` placeholders are ignored.
        schema = cls.__dict__.get('__pydantic_core_schema__')
        if schema is not None and not isinstance(schema, MockCoreSchema):
            if schema['type'] == 'definitions':
                schema = self.defs.unpack_definitions(schema)
            ref = get_ref(schema)
            if ref:
                return self.defs.create_definition_reference_schema(schema)
            else:
                return schema

        config_wrapper = ConfigWrapper(cls.model_config, check=False)

        # The model's own config and namespace stay pushed for the rest of the schema build.
        with self._config_wrapper_stack.push(config_wrapper), self._ns_resolver.push(cls):
            core_config = self._config_wrapper.core_config(title=cls.__name__)

            if cls.__pydantic_fields_complete__ or cls is BaseModel_:
                fields = getattr(cls, '__pydantic_fields__', {})
            else:
                if not hasattr(cls, '__pydantic_fields__'):
                    # This happens when we have a loop in the schema generation:
                    # class Base[T](BaseModel):
                    #     t: T
                    #
                    # class Other(BaseModel):
                    #     b: 'Base[Other]'
                    # When we build fields for `Other`, we evaluate the forward annotation.
                    # At this point, `Other` doesn't have the model fields set. We create
                    # `Base[Other]`; model fields are successfully built, and we try to generate
                    # a schema for `t: Other`. As `Other.__pydantic_fields__` aren't set, we abort.
                    raise PydanticUndefinedAnnotation(
                        name=cls.__name__,
                        message=f'Class {cls.__name__!r} is not defined',
                    )
                try:
                    fields = rebuild_model_fields(
                        cls,
                        config_wrapper=self._config_wrapper,
                        ns_resolver=self._ns_resolver,
                        typevars_map=self._typevars_map or {},
                    )
                except NameError as e:
                    raise PydanticUndefinedAnnotation.from_name_error(e) from e

            decorators = cls.__pydantic_decorators__
            computed_fields = decorators.computed_fields
            # Field-level decorators are checked against both regular and computed field names.
            check_decorator_fields_exist(
                chain(
                    decorators.field_validators.values(),
                    decorators.field_serializers.values(),
                    decorators.validators.values(),
                ),
                {*fields.keys(), *computed_fields.keys()},
            )

            model_validators = decorators.model_validators.values()

            extras_schema = None
            extras_keys_schema = None
            if core_config.get('extra_fields_behavior') == 'allow':
                assert cls.__mro__[0] is cls
                assert cls.__mro__[-1] is object
                # Walk the MRO (excluding `object`) looking for a `__pydantic_extra__` annotation.
                for candidate_cls in cls.__mro__[:-1]:
                    extras_annotation = getattr(candidate_cls, '__annotations__', {}).get(
                        '__pydantic_extra__', None
                    )
                    if extras_annotation is not None:
                        # String annotations are evaluated before being inspected.
                        if isinstance(extras_annotation, str):
                            extras_annotation = _typing_extra.eval_type_backport(
                                _typing_extra._make_forward_ref(
                                    extras_annotation, is_argument=False, is_class=True
                                ),
                                *self._types_namespace,
                            )
                        tp = get_origin(extras_annotation)
                        if tp not in DICT_TYPES:
                            raise PydanticSchemaGenerationError(
                                'The type annotation for `__pydantic_extra__` must be `dict[str, ...]`'
                            )
                        extra_keys_type, extra_items_type = self._get_args_resolving_forward_refs(
                            extras_annotation,
                            required=True,
                        )
                        # A schema is only generated for keys that are not plain `str`
                        # and for values that are not `Any`.
                        if extra_keys_type is not str:
                            extras_keys_schema = self.generate_schema(extra_keys_type)
                        if not typing_objects.is_any(extra_items_type):
                            extras_schema = self.generate_schema(extra_items_type)
                        # Stop at the first class whose annotation produced a schema.
                        if extras_keys_schema is not None or extras_schema is not None:
                            break

            generic_origin: type[BaseModel] | None = getattr(cls, '__pydantic_generic_metadata__', {}).get('origin')

            if cls.__pydantic_root_model__:
                # Root models wrap the schema of their single `root` field directly.
                # FIXME: should the common field metadata be used here?
                inner_schema, _ = self._common_field_schema('root', fields['root'], decorators)
                inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
                model_schema = core_schema.model_schema(
                    cls,
                    inner_schema,
                    generic_origin=generic_origin,
                    custom_init=getattr(cls, '__pydantic_custom_init__', None),
                    root_model=True,
                    post_init=getattr(cls, '__pydantic_post_init__', None),
                    config=core_config,
                    ref=model_ref,
                )
            else:
                fields_schema: core_schema.CoreSchema = core_schema.model_fields_schema(
                    {k: self._generate_md_field_schema(k, v, decorators) for k, v in fields.items()},
                    computed_fields=[
                        self._computed_field_schema(d, decorators.field_serializers)
                        for d in computed_fields.values()
                    ],
                    extras_schema=extras_schema,
                    extras_keys_schema=extras_keys_schema,
                    model_name=cls.__name__,
                )
                # Root validators are applied first, then the 'inner' model validators.
                inner_schema = apply_validators(fields_schema, decorators.root_validators.values())
                inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')

                model_schema = core_schema.model_schema(
                    cls,
                    inner_schema,
                    generic_origin=generic_origin,
                    custom_init=getattr(cls, '__pydantic_custom_init__', None),
                    root_model=False,
                    post_init=getattr(cls, '__pydantic_post_init__', None),
                    config=core_config,
                    ref=model_ref,
                )

            # Serializers and 'outer' validators wrap the finished model schema, in that order.
            schema = self._apply_model_serializers(model_schema, decorators.model_serializers.values())
            schema = apply_model_validators(schema, model_validators, 'outer')
            return self.defs.create_definition_reference_schema(schema)

882 

883 def _resolve_self_type(self, obj: Any) -> Any: 

884 obj = self.model_type_stack.get() 

885 if obj is None: 

886 raise PydanticUserError('`typing.Self` is invalid in this context', code='invalid-self-type') 

887 return obj 

888 

def _generate_schema_from_get_schema_method(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
    """Build a schema from a `__get_pydantic_core_schema__` or legacy `__get_validators__` hook on `obj`.

    Args:
        obj: The object whose hooks are looked up.
        source: The value passed to the hook as the source type.

    Returns:
        The schema produced through one of the hooks, or `None` when neither hook applies.
    """
    base_model_cls = import_cached_base_model()

    core_schema_hook = getattr(obj, '__get_pydantic_core_schema__', None)
    hook_is_base_model_default = (
        getattr(core_schema_hook, '__func__', None) is base_model_cls.__get_pydantic_core_schema__.__func__  # pyright: ignore[reportFunctionMemberAccess]
    )

    # `BaseModel.__get_pydantic_core_schema__` only exists for backwards compatibility (so that
    # overriding models can still call `super()`); it emits a deprecation warning and is never
    # invoked during core schema generation, hence the second condition.
    if core_schema_hook is not None and not hook_is_base_model_default:
        # Users may define the hook on referenceable types (e.g. a dataclass) that this class
        # already knows how to handle. Checking the definitions store first is not ideal, as
        # `get_schema_or_ref` is expensive and `obj` may not be referenceable at all.
        with self.defs.get_schema_or_ref(obj) as (_, known_schema):
            if known_schema is not None:
                return known_schema

        handler = CallbackGetCoreSchemaHandler(
            self._generate_schema_inner, self, ref_mode='unpack' if obj is source else 'to-def'
        )
        produced = core_schema_hook(source, handler)
        if produced['type'] == 'definitions':
            produced = self.defs.unpack_definitions(produced)

        if get_ref(produced):
            return self.defs.create_definition_reference_schema(produced)

        # Note: a `'definition-ref'` schema might deserve a defensive copy here, because
        # such schemas are inlined in place (i.e. mutated directly).
        return produced

    if core_schema_hook is not None:
        return None

    validators = getattr(obj, '__get_validators__', None)
    if validators is None:
        return None

    from pydantic.v1 import BaseModel as BaseModelV1

    if issubclass(obj, BaseModelV1):
        message = f'Mixing V1 models and V2 models (or constructs, like `TypeAdapter`) is not supported. Please upgrade `{obj.__name__}` to V2.'
        category = UserWarning
    else:
        message = '`__get_validators__` is deprecated and will be removed, use `__get_pydantic_core_schema__` instead.'
        category = PydanticDeprecatedSince20
    warnings.warn(message, category)

    steps = []
    for validator in validators():
        steps.append(core_schema.with_info_plain_validator_function(validator))
    return core_schema.chain_schema(steps)

947 

def _resolve_forward_ref(self, obj: Any) -> Any:
    """Evaluate a forward reference against the current types namespace.

    The namespace is assumed to contain the target of the reference. That can fail, for
    example when validating an imported type alias whose forward references point at names
    only defined in the module it was imported from, whether it is used directly or as the
    annotation of a model field.

    Raises:
        PydanticUndefinedAnnotation: If evaluation raises a `NameError`, or if the result is
            still a `ForwardRef`.
    """
    try:
        evaluated = _typing_extra.eval_type_backport(obj, *self._types_namespace)
    except NameError as exc:
        raise PydanticUndefinedAnnotation.from_name_error(exc) from exc

    # Still a `ForwardRef` after evaluation: it could not be evaluated.
    if isinstance(evaluated, ForwardRef):
        raise PydanticUndefinedAnnotation(
            evaluated.__forward_arg__, f'Unable to evaluate forward reference {evaluated}'
        )

    if not self._typevars_map:
        return evaluated
    return replace_types(evaluated, self._typevars_map)

969 

970 @overload 

971 def _get_args_resolving_forward_refs(self, obj: Any, required: Literal[True]) -> tuple[Any, ...]: ... 

972 

973 @overload 

974 def _get_args_resolving_forward_refs(self, obj: Any) -> tuple[Any, ...] | None: ... 

975 

976 def _get_args_resolving_forward_refs(self, obj: Any, required: bool = False) -> tuple[Any, ...] | None: 

977 args = get_args(obj) 

978 if args: 

979 if isinstance(obj, GenericAlias): 

980 # PEP 585 generic aliases don't convert args to ForwardRefs, unlike `typing.List/Dict` etc. 

981 args = (_typing_extra._make_forward_ref(a) if isinstance(a, str) else a for a in args) 

982 args = tuple(self._resolve_forward_ref(a) if isinstance(a, ForwardRef) else a for a in args) 

983 elif required: # pragma: no cover 

984 raise TypeError(f'Expected {obj} to have generic parameters but it had none') 

985 return args 

986 

987 def _get_first_arg_or_any(self, obj: Any) -> Any: 

988 args = self._get_args_resolving_forward_refs(obj) 

989 if not args: 

990 return Any 

991 return args[0] 

992 

993 def _get_first_two_args_or_any(self, obj: Any) -> tuple[Any, Any]: 

994 args = self._get_args_resolving_forward_refs(obj) 

995 if not args: 

996 return (Any, Any) 

997 if len(args) < 2: 

998 origin = get_origin(obj) 

999 raise TypeError(f'Expected two type arguments for {origin}, got 1') 

1000 return args[0], args[1] 

1001 

def _generate_schema_inner(self, obj: Any) -> core_schema.CoreSchema:
    """Dispatch `obj` to the schema builder matching its kind.

    `Self`, `Annotated`, ready-made schema dicts, forward references, Pydantic models and
    recursive references are handled here, in that order; everything else goes to
    `match_type()`.
    """
    if typing_objects.is_self(obj):
        obj = self._resolve_self_type(obj)

    if typing_objects.is_annotated(get_origin(obj)):
        return self._annotated_schema(obj)

    if isinstance(obj, dict):
        # A dict is assumed to already be a valid schema.
        return obj  # type: ignore[return-value]

    if isinstance(obj, (str, ForwardRef)):
        # Plain strings are treated as forward references.
        forward_ref = ForwardRef(obj) if isinstance(obj, str) else obj
        return self.generate_schema(self._resolve_forward_ref(forward_ref))

    base_model_cls = import_cached_base_model()
    if lenient_issubclass(obj, base_model_cls):
        with self.model_type_stack.push(obj):
            return self._model_schema(obj)

    if isinstance(obj, PydanticRecursiveRef):
        return core_schema.definition_reference_schema(schema_ref=obj.type_ref)

    return self.match_type(obj)

1029 

def match_type(self, obj: Any) -> core_schema.CoreSchema:  # noqa: C901
    """Map a type to its core schema.

    This is a flat sequence of guard clauses. Simple non-generic primitive types are tested
    first, then bare container types, then more complex cases.

    Each case either produces a schema directly, calls a public user-overridable method
    (like `GenerateSchema.tuple_variable_schema`), or calls a private method that handles
    some boilerplate before reaching the user-facing method (e.g. `GenerateSchema._tuple_schema`).

    The intent is to grow the set of user-facing methods over time, as they get requested
    and the right API for them becomes clear.
    """
    # Non-generic primitives and scalar-like types.
    if obj is str:
        return core_schema.str_schema()
    if obj is bytes:
        return core_schema.bytes_schema()
    if obj is int:
        return core_schema.int_schema()
    if obj is float:
        return core_schema.float_schema()
    if obj is bool:
        return core_schema.bool_schema()
    if obj is complex:
        return core_schema.complex_schema()
    if typing_objects.is_any(obj) or obj is object:
        return core_schema.any_schema()
    if obj is datetime.date:
        return core_schema.date_schema()
    if obj is datetime.datetime:
        return core_schema.datetime_schema()
    if obj is datetime.time:
        return core_schema.time_schema()
    if obj is datetime.timedelta:
        return core_schema.timedelta_schema()
    if obj is Decimal:
        return core_schema.decimal_schema()
    if obj is UUID:
        return core_schema.uuid_schema()
    if obj is Url:
        return core_schema.url_schema()
    if obj is Fraction:
        return self._fraction_schema()
    if obj is MultiHostUrl:
        return core_schema.multi_host_url_schema()
    if obj is None or obj is _typing_extra.NoneType:
        return core_schema.none_schema()
    if obj is MISSING:
        return core_schema.missing_sentinel_schema()

    # Bare (unparametrized) container and collection types.
    if obj in IP_TYPES:
        return self._ip_schema(obj)
    if obj in TUPLE_TYPES:
        return self._tuple_schema(obj)
    if obj in LIST_TYPES:
        return self._list_schema(Any)
    if obj in SET_TYPES:
        return self._set_schema(Any)
    if obj in FROZEN_SET_TYPES:
        return self._frozenset_schema(Any)
    if obj in SEQUENCE_TYPES:
        return self._sequence_schema(Any)
    if obj in ITERABLE_TYPES:
        return self._iterable_schema(obj)
    if obj in DICT_TYPES:
        return self._dict_schema(Any, Any)
    if obj in PATH_TYPES:
        return self._path_schema(obj, Any)
    if obj in DEQUE_TYPES:
        return self._deque_schema(Any)
    if obj in MAPPING_TYPES:
        return self._mapping_schema(obj, Any, Any)
    if obj in COUNTER_TYPES:
        return self._mapping_schema(obj, Any, int)

    # Typing constructs and other special forms.
    if typing_objects.is_typealiastype(obj):
        return self._type_alias_type_schema(obj)
    if obj is type:
        return self._type_schema()
    if _typing_extra.is_callable(obj):
        return core_schema.callable_schema()
    if typing_objects.is_literal(get_origin(obj)):
        return self._literal_schema(obj)
    if is_typeddict(obj):
        return self._typed_dict_schema(obj, None)
    if _typing_extra.is_namedtuple(obj):
        return self._namedtuple_schema(obj, None)
    if typing_objects.is_newtype(obj):
        # NewType, can't use isinstance because it fails <3.10
        return self.generate_schema(obj.__supertype__)
    if obj in PATTERN_TYPES:
        return self._pattern_schema(obj)
    if _typing_extra.is_hashable(obj):
        return self._hashable_schema()
    if isinstance(obj, typing.TypeVar):
        return self._unsubstituted_typevar_schema(obj)
    if _typing_extra.is_finalvar(obj):
        # Bare `Final` carries no inner type, so anything is accepted.
        if obj is Final:
            return core_schema.any_schema()
        inner_type = self._get_first_arg_or_any(obj)
        return self.generate_schema(inner_type)
    if isinstance(obj, VALIDATE_CALL_SUPPORTED_TYPES):
        return self._call_schema(obj)
    if inspect.isclass(obj) and issubclass(obj, Enum):
        return self._enum_schema(obj)
    if obj is ZoneInfo:
        return self._zoneinfo_schema()

    # dataclasses.is_dataclass coerces dc instances to types, but we only handle
    # the case of a dc type here
    if dataclasses.is_dataclass(obj):
        return self._dataclass_schema(obj, None)  # pyright: ignore[reportArgumentType]

    generic_origin = get_origin(obj)
    if generic_origin is not None:
        return self._match_generic_type(obj, generic_origin)

    fallback = self._arbitrary_type_schema if self._arbitrary_types else self._unknown_type_schema
    return fallback(obj)

1149 

def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema:  # noqa: C901
    """Map a parametrized generic type to its core schema.

    Args:
        obj: The parametrized type.
        origin: The origin of `obj`.
    """
    # Generic dataclasses must be handled before looking for schema properties: attribute
    # access on `_GenericAlias` delegates to the origin type, which loses the concrete
    # parametrization. As a result there is currently no way to cache the schema of a generic
    # dataclass. Changing what `Generic.__class_getitem__` returns might solve this, but that
    # is a dangerous game.
    if dataclasses.is_dataclass(origin):
        return self._dataclass_schema(obj, origin)  # pyright: ignore[reportArgumentType]
    if _typing_extra.is_namedtuple(origin):
        return self._namedtuple_schema(obj, origin)

    hook_schema = self._generate_schema_from_get_schema_method(origin, obj)
    if hook_schema is not None:
        return hook_schema

    if typing_objects.is_typealiastype(origin):
        return self._type_alias_type_schema(obj)
    if is_union_origin(origin):
        return self._union_schema(obj)
    if origin in TUPLE_TYPES:
        return self._tuple_schema(obj)
    if origin in LIST_TYPES:
        return self._list_schema(self._get_first_arg_or_any(obj))
    if origin in SET_TYPES:
        return self._set_schema(self._get_first_arg_or_any(obj))
    if origin in FROZEN_SET_TYPES:
        return self._frozenset_schema(self._get_first_arg_or_any(obj))
    if origin in DICT_TYPES:
        return self._dict_schema(*self._get_first_two_args_or_any(obj))
    if origin in PATH_TYPES:
        return self._path_schema(origin, self._get_first_arg_or_any(obj))
    if origin in DEQUE_TYPES:
        return self._deque_schema(self._get_first_arg_or_any(obj))
    if origin in MAPPING_TYPES:
        return self._mapping_schema(origin, *self._get_first_two_args_or_any(obj))
    if origin in COUNTER_TYPES:
        return self._mapping_schema(origin, self._get_first_arg_or_any(obj), int)
    if is_typeddict(origin):
        return self._typed_dict_schema(obj, origin)
    if origin in TYPE_TYPES:
        return self._subclass_schema(obj)
    if origin in SEQUENCE_TYPES:
        return self._sequence_schema(self._get_first_arg_or_any(obj))
    if origin in ITERABLE_TYPES:
        return self._iterable_schema(obj)
    if origin in PATTERN_TYPES:
        return self._pattern_schema(obj)

    # The arbitrary-type fallback receives the origin, the unknown-type one the full type.
    if self._arbitrary_types:
        return self._arbitrary_type_schema(origin)
    return self._unknown_type_schema(obj)

1200 

def _generate_td_field_schema(
    self,
    name: str,
    field_info: FieldInfo,
    decorators: DecoratorInfos,
    *,
    required: bool = True,
) -> core_schema.TypedDictField:
    """Build the `TypedDictField` representing a model or typeddict field.

    Args:
        name: The field name.
        field_info: The field definition.
        decorators: The decorators collected for the owning class.
        required: Requiredness requested by the caller; ignored when `field_info`
            reports the field as not required.
    """
    inner_schema, field_metadata = self._common_field_schema(name, field_info, decorators)
    is_required = required if field_info.is_required() else False
    return core_schema.typed_dict_field(
        inner_schema,
        required=is_required,
        serialization_exclude=field_info.exclude,
        validation_alias=_convert_to_aliases(field_info.validation_alias),
        serialization_alias=field_info.serialization_alias,
        serialization_exclude_if=field_info.exclude_if,
        metadata=field_metadata,
    )

1220 

def _generate_md_field_schema(
    self,
    name: str,
    field_info: FieldInfo,
    decorators: DecoratorInfos,
) -> core_schema.ModelField:
    """Build the `ModelField` representing a model field.

    Args:
        name: The field name.
        field_info: The field definition.
        decorators: The decorators collected for the owning model.
    """
    inner_schema, field_metadata = self._common_field_schema(name, field_info, decorators)
    field_options = {
        'serialization_exclude': field_info.exclude,
        'validation_alias': _convert_to_aliases(field_info.validation_alias),
        'serialization_alias': field_info.serialization_alias,
        'serialization_exclude_if': field_info.exclude_if,
        'frozen': field_info.frozen,
        'metadata': field_metadata,
    }
    return core_schema.model_field(inner_schema, **field_options)

1238 

def _generate_dc_field_schema(
    self,
    name: str,
    field_info: FieldInfo,
    decorators: DecoratorInfos,
) -> core_schema.DataclassField:
    """Build the `DataclassField` representing a parameter/field of a dataclass.

    Args:
        name: The field name.
        field_info: The field definition.
        decorators: The decorators collected for the owning dataclass.
    """
    inner_schema, field_metadata = self._common_field_schema(name, field_info, decorators)
    # A falsy `init_var` is passed on as `None`; a truthy `kw_only` is passed on as `None`
    # and a falsy one as `False`.
    init_only = field_info.init_var or None
    kw_only = None if field_info.kw_only else False
    return core_schema.dataclass_field(
        name,
        inner_schema,
        init=field_info.init,
        init_only=init_only,
        kw_only=kw_only,
        serialization_exclude=field_info.exclude,
        validation_alias=_convert_to_aliases(field_info.validation_alias),
        serialization_alias=field_info.serialization_alias,
        serialization_exclude_if=field_info.exclude_if,
        frozen=field_info.frozen,
        metadata=field_metadata,
    )

1260 

def _common_field_schema(  # C901
    self, name: str, field_info: FieldInfo, decorators: DecoratorInfos
) -> tuple[CoreSchema, dict[str, Any]]:
    """Build the schema and core metadata shared by model, dataclass and typeddict fields.

    Args:
        name: The field name.
        field_info: The field definition.
        decorators: The decorators collected for the owning class.

    Returns:
        A `(schema, core_metadata)` pair for the field.
    """
    source_type, annotations = field_info.annotation, field_info.metadata

    def set_discriminator(schema: CoreSchema) -> CoreSchema:
        return self._apply_discriminator_to_union(schema, field_info.discriminator)

    # Turn `@field_validator` decorators into `Before/After/Plain/WrapValidator` instances.
    validators_from_decorators = []
    for decorator in filter_field_decorator_info_by_field(decorators.field_validators.values(), name):
        validators_from_decorators.append(_mode_to_validator[decorator.info.mode]._from_decorator(decorator))

    with self.field_name_stack.push(name):
        # The discriminator hook is only passed along when a discriminator is set.
        annotation_kwargs: dict[str, Any] = {}
        if field_info.discriminator is not None:
            annotation_kwargs['transform_inner_schema'] = set_discriminator
        schema = self._apply_annotations(
            source_type, annotations + validators_from_decorators, **annotation_kwargs
        )

    # V1 compatibility shim, to be removed eventually: push down any `each_item=True`
    # validators. This does not work for Annotated types wrapped by a function validator,
    # which is acceptable because those did not exist in V1.
    v1_validators = filter_field_decorator_info_by_field(decorators.validators.values(), name)
    if _validators_require_validate_default(v1_validators):
        field_info.validate_default = True
    each_item_validators = [v for v in v1_validators if v.info.each_item is True]
    whole_value_validators = [v for v in v1_validators if v not in each_item_validators]
    schema = apply_each_item_validators(schema, each_item_validators)
    schema = apply_validators(schema, whole_value_validators)

    # The default wrapper has to sit outside every other validator: the field validator
    # inspects its topmost schema to tell whether the field has a default value.
    if not field_info.is_required():
        schema = wrap_default(field_info, schema)

    field_serializers = filter_field_decorator_info_by_field(decorators.field_serializers.values(), name)
    schema = self._apply_field_serializers(schema, field_serializers)

    js_updates, js_extra = _extract_json_schema_info_from_field_info(field_info)
    core_metadata: dict[str, Any] = {}
    update_core_metadata(core_metadata, pydantic_js_updates=js_updates, pydantic_js_extra=js_extra)

    return schema, core_metadata

1317 

def _union_schema(self, union_type: Any) -> core_schema.CoreSchema:
    """Generate schema for a Union.

    `None` members are not turned into choices; instead the resulting schema is wrapped in
    a nullable schema. A union with exactly one remaining member uses that member's schema.
    """
    members = self._get_args_resolving_forward_refs(union_type, required=True)
    member_schemas: list[CoreSchema] = []
    accepts_none = False
    for member in members:
        if member is None or member is _typing_extra.NoneType:
            accepts_none = True
            continue
        member_schemas.append(self.generate_schema(member))

    if len(member_schemas) == 1:
        result = member_schemas[0]
    else:
        tagged_choices: list[CoreSchema | tuple[CoreSchema, str]] = []
        for candidate in member_schemas:
            tag = cast(CoreMetadata, candidate.get('metadata', {})).get('pydantic_internal_union_tag_key')
            tagged_choices.append(candidate if tag is None else (candidate, tag))
        result = core_schema.union_schema(tagged_choices)

    return core_schema.nullable_schema(result) if accepts_none else result

1344 

def _type_alias_type_schema(self, obj: TypeAliasType) -> CoreSchema:
    """Generate schema for a `TypeAliasType`, parametrized or not.

    Raises:
        PydanticUndefinedAnnotation: If evaluating the alias value raises a `NameError`.
    """
    with self.defs.get_schema_or_ref(obj) as (alias_ref, known_schema):
        if known_schema is not None:
            return known_schema

        alias: TypeAliasType = get_origin(obj) or obj
        typevars_map = get_standard_typevars_map(obj)

        with self._ns_resolver.push(alias):
            try:
                aliased_type = _typing_extra.eval_type(alias.__value__, *self._types_namespace)
            except NameError as exc:
                raise PydanticUndefinedAnnotation.from_name_error(exc) from exc
            aliased_schema = self.generate_schema(replace_types(aliased_type, typevars_map))
            assert aliased_schema['type'] != 'definitions'
            # The generated schema is stored under the alias' own reference.
            aliased_schema['ref'] = alias_ref  # type: ignore
        return self.defs.create_definition_reference_schema(aliased_schema)

1363 

def _literal_schema(self, literal_type: Any) -> CoreSchema:
    """Generate schema for a Literal.

    When `use_enum_values` is enabled and at least one literal value is an `Enum` member,
    an after-validator replaces validated enum members by their `.value`.
    """
    allowed_values = list(get_literal_values(literal_type, type_check=False, unpack_type_aliases='eager'))
    assert allowed_values, f'literal "expected" cannot be empty, obj={literal_type}'
    literal_core_schema = core_schema.literal_schema(allowed_values)

    if not self._config_wrapper.use_enum_values:
        return literal_core_schema
    if not any(isinstance(value, Enum) for value in allowed_values):
        return literal_core_schema

    return core_schema.no_info_after_validator_function(
        lambda v: v.value if isinstance(v, Enum) else v, literal_core_schema
    )

1376 

def _typed_dict_schema(self, typed_dict_cls: Any, origin: Any) -> core_schema.CoreSchema:
    """Generate a core schema for a `TypedDict` class.

    To be able to build a `DecoratorInfos` instance for the `TypedDict` class (which will include
    validators, serializers, etc.), we need to have access to the original bases of the class
    (see https://docs.python.org/3/library/types.html#types.get_original_bases).
    However, the `__orig_bases__` attribute was only added in 3.12 (https://github.com/python/cpython/pull/103698).

    For this reason, we require Python 3.12 (or using the `typing_extensions` backport).

    Args:
        typed_dict_cls: The `TypedDict` class, possibly parametrized.
        origin: When not `None`, the class actually inspected in place of `typed_dict_cls`
            once the typevars map has been computed.

    Raises:
        PydanticUserError: If `typing.TypedDict` is used where `_SUPPORTS_TYPEDDICT` is false.
        PydanticUndefinedAnnotation: If resolving the class type hints raises a `NameError`.
    """
    FieldInfo = import_cached_field_info()

    with (
        self.model_type_stack.push(typed_dict_cls),
        self.defs.get_schema_or_ref(typed_dict_cls) as (
            typed_dict_ref,
            maybe_schema,
        ),
    ):
        if maybe_schema is not None:
            return maybe_schema

        # The typevars map is computed from the (possibly parametrized) class before it is
        # swapped for its origin.
        typevars_map = get_standard_typevars_map(typed_dict_cls)
        if origin is not None:
            typed_dict_cls = origin

        if not _SUPPORTS_TYPEDDICT and type(typed_dict_cls).__module__ == 'typing':
            raise PydanticUserError(
                'Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.12.',
                code='typed-dict-version',
            )

        try:
            # if a typed dictionary class doesn't have config, we use the parent's config, hence a default of `None`
            # see https://github.com/pydantic/pydantic/issues/10917
            config: ConfigDict | None = get_attribute_from_bases(typed_dict_cls, '__pydantic_config__')
        except AttributeError:
            config = None

        with self._config_wrapper_stack.push(config):
            core_config = self._config_wrapper.core_config(title=typed_dict_cls.__name__)

            required_keys: frozenset[str] = typed_dict_cls.__required_keys__

            fields: dict[str, core_schema.TypedDictField] = {}

            decorators = DecoratorInfos.build(typed_dict_cls)
            decorators.update_from_config(self._config_wrapper)

            if self._config_wrapper.use_attribute_docstrings:
                field_docstrings = extract_docstrings_from_cls(typed_dict_cls, use_inspect=True)
            else:
                field_docstrings = None

            try:
                annotations = _typing_extra.get_cls_type_hints(typed_dict_cls, ns_resolver=self._ns_resolver)
            except NameError as e:
                raise PydanticUndefinedAnnotation.from_name_error(e) from e

            # Names of the items using the `ReadOnly` qualifier, reported in one warning below.
            readonly_fields: list[str] = []

            for field_name, annotation in annotations.items():
                field_info = FieldInfo.from_annotation(annotation, _source=AnnotationSource.TYPED_DICT)
                field_info.annotation = replace_types(field_info.annotation, typevars_map)

                # An explicit 'not_required' qualifier always wins; otherwise the item is required
                # when listed in `__required_keys__` or carrying the 'required' qualifier.
                required = (
                    field_name in required_keys or 'required' in field_info._qualifiers
                ) and 'not_required' not in field_info._qualifiers
                if 'read_only' in field_info._qualifiers:
                    readonly_fields.append(field_name)

                # An attribute docstring only fills in a description that is not already set.
                if (
                    field_docstrings is not None
                    and field_info.description is None
                    and field_name in field_docstrings
                ):
                    field_info.description = field_docstrings[field_name]
                update_field_from_config(self._config_wrapper, field_name, field_info)

                fields[field_name] = self._generate_td_field_schema(
                    field_name, field_info, decorators, required=required
                )

            if readonly_fields:
                fields_repr = ', '.join(repr(f) for f in readonly_fields)
                plural = len(readonly_fields) >= 2
                warnings.warn(
                    f'Item{"s" if plural else ""} {fields_repr} on TypedDict class {typed_dict_cls.__name__!r} '
                    f'{"are" if plural else "is"} using the `ReadOnly` qualifier. Pydantic will not protect items '
                    'from any mutation on dictionary instances.',
                    UserWarning,
                )

            extra_behavior: core_schema.ExtraBehavior = 'ignore'
            extras_schema: CoreSchema | None = None  # For 'allow', equivalent to `Any` - no validation performed.

            # `__closed__` is `None` when not specified (equivalent to `False`):
            is_closed = bool(getattr(typed_dict_cls, '__closed__', False))
            extra_items = getattr(typed_dict_cls, '__extra_items__', typing_extensions.NoExtraItems)
            if is_closed:
                extra_behavior = 'forbid'
                extras_schema = None
            elif not typing_objects.is_noextraitems(extra_items):
                extra_behavior = 'allow'
                extras_schema = self.generate_schema(replace_types(extra_items, typevars_map))

            # The 'extra' configuration only takes effect when it does not contradict what the
            # class itself declares; a contradiction produces a warning and the value is ignored.
            if (config_extra := self._config_wrapper.extra) in ('allow', 'forbid'):
                if is_closed and config_extra == 'allow':
                    warnings.warn(
                        f"TypedDict class {typed_dict_cls.__qualname__!r} is closed, but 'extra' configuration "
                        "is set to `'allow'`. The 'extra' configuration value will be ignored.",
                        category=TypedDictExtraConfigWarning,
                    )
                elif not typing_objects.is_noextraitems(extra_items) and config_extra == 'forbid':
                    warnings.warn(
                        f"TypedDict class {typed_dict_cls.__qualname__!r} allows extra items, but 'extra' configuration "
                        "is set to `'forbid'`. The 'extra' configuration value will be ignored.",
                        category=TypedDictExtraConfigWarning,
                    )
                else:
                    extra_behavior = config_extra

            td_schema = core_schema.typed_dict_schema(
                fields,
                cls=typed_dict_cls,
                computed_fields=[
                    self._computed_field_schema(d, decorators.field_serializers)
                    for d in decorators.computed_fields.values()
                ],
                extra_behavior=extra_behavior,
                extras_schema=extras_schema,
                ref=typed_dict_ref,
                config=core_config,
            )

            # Model serializers are applied first, then all model validators.
            schema = self._apply_model_serializers(td_schema, decorators.model_serializers.values())
            schema = apply_model_validators(schema, decorators.model_validators.values(), 'all')
            return self.defs.create_definition_reference_schema(schema)

1515 

def _namedtuple_schema(self, namedtuple_cls: Any, origin: Any) -> core_schema.CoreSchema:
    """Build the core schema validating a `NamedTuple` class.

    The result is a `'call'` schema wrapping an `'arguments'` schema: one
    parameter is generated per field, and the validated arguments are passed to
    the named tuple class.

    Args:
        namedtuple_cls: The named tuple class to generate a schema for.
        origin: When not `None`, the class used in place of `namedtuple_cls` to
            read the type hints, the fields and the defaults, and as the call target.

    Returns:
        The cached schema if one is already known for `namedtuple_cls`, otherwise
        the result of registering the new schema in the definitions.

    Raises:
        PydanticUndefinedAnnotation: If a type hint refers to an undefined name.
    """
    with (
        self.model_type_stack.push(namedtuple_cls),
        self.defs.get_schema_or_ref(namedtuple_cls) as (nt_ref, cached_schema),
    ):
        if cached_schema is not None:
            return cached_schema

        # The type variables map is computed from the class as it was provided,
        # i.e. before switching to `origin`:
        typevars_map = get_standard_typevars_map(namedtuple_cls)
        target_cls = namedtuple_cls if origin is None else origin

        try:
            hints = _typing_extra.get_cls_type_hints(target_cls, ns_resolver=self._ns_resolver)
        except NameError as exc:
            raise PydanticUndefinedAnnotation.from_name_error(exc) from exc

        if not hints:
            # No annotations at all: this happens when the class was created using
            # `collections.namedtuple(...)`. Every field is then treated as `Any`.
            hints = {field_name: Any for field_name in target_cls._fields}

        if typevars_map:
            hints = {field_name: replace_types(hint, typevars_map) for field_name, hint in hints.items()}

        parameters = []
        for field_name, hint in hints.items():
            parameters.append(
                self._generate_parameter_schema(
                    field_name,
                    hint,
                    source=AnnotationSource.NAMED_TUPLE,
                    default=target_cls._field_defaults.get(field_name, Parameter.empty),
                )
            )

        args_schema = core_schema.arguments_schema(
            parameters,
            metadata={'pydantic_js_prefer_positional_arguments': True},
        )
        call_schema = core_schema.call_schema(args_schema, target_cls, ref=nt_ref)
        return self.defs.create_definition_reference_schema(call_schema)

1559 

def _generate_parameter_schema(
    self,
    name: str,
    annotation: type[Any],
    source: AnnotationSource,
    default: Any = Parameter.empty,
    mode: Literal['positional_only', 'positional_or_keyword', 'keyword_only'] | None = None,
) -> core_schema.ArgumentsParameter:
    """Generate the definition of a named tuple field or of a function signature parameter.

    The definition is meant to be used in the `'arguments'` core schema, which will be
    replaced in V3 by `'arguments-v3'`.

    Args:
        name: The name of the field or parameter.
        annotation: The type annotation of the field or parameter.
        source: The annotation source, forwarded to the `FieldInfo` constructors.
        default: The default value, or `Parameter.empty` if there is none.
        mode: The parameter mode, forwarded as is to `core_schema.arguments_parameter()`.

    Returns:
        The parameter definition.
    """
    field_info_cls = import_cached_field_info()

    if default is not Parameter.empty:
        field = field_info_cls.from_annotated_attribute(annotation, default, _source=source)
    else:
        field = field_info_cls.from_annotation(annotation, _source=source)

    assert field.annotation is not None, 'field.annotation should not be None when generating a schema'
    update_field_from_config(self._config_wrapper, name, field)

    with self.field_name_stack.push(name):
        # `field` is passed as metadata because some of its attributes are relevant for
        # JSON Schema generation. As a consequence, the warnings about `FieldInfo`
        # attributes that would not be used have to be disabled:
        param_schema = self._apply_annotations(
            field.annotation,
            [field],
            check_unsupported_field_info_attributes=False,
        )

    param_schema = param_schema if field.is_required() else wrap_default(field, param_schema)

    return core_schema.arguments_parameter(
        name,
        param_schema,
        mode=mode,
        alias=_convert_to_aliases(field.validation_alias),
    )

1604 

def _generate_parameter_v3_schema(
    self,
    name: str,
    annotation: Any,
    source: AnnotationSource,
    mode: Literal[
        'positional_only',
        'positional_or_keyword',
        'keyword_only',
        'var_args',
        'var_kwargs_uniform',
        'var_kwargs_unpacked_typed_dict',
    ],
    default: Any = Parameter.empty,
) -> core_schema.ArgumentsV3Parameter:
    """Generate the definition of a function signature parameter.

    The definition is meant to be used in the `'arguments-v3'` core schema, which will
    replace the `'arguments'` schema in V3.

    Args:
        name: The name of the parameter.
        annotation: The type annotation of the parameter.
        source: The annotation source, forwarded to the `FieldInfo` constructors.
        mode: The parameter mode, forwarded as is to `core_schema.arguments_v3_parameter()`.
        default: The default value, or `Parameter.empty` if there is none.

    Returns:
        The parameter definition.
    """
    field_info_cls = import_cached_field_info()

    if default is not Parameter.empty:
        field = field_info_cls.from_annotated_attribute(annotation, default, _source=source)
    else:
        field = field_info_cls.from_annotation(annotation, _source=source)
    update_field_from_config(self._config_wrapper, name, field)

    with self.field_name_stack.push(name):
        # `field` is passed as metadata because some of its attributes are relevant for
        # JSON Schema generation. As a consequence, the warnings about `FieldInfo`
        # attributes that would not be used have to be disabled:
        param_schema = self._apply_annotations(
            field.annotation,
            [field],
            check_unsupported_field_info_attributes=False,
        )

    param_schema = param_schema if field.is_required() else wrap_default(field, param_schema)

    return core_schema.arguments_v3_parameter(
        name=name,
        schema=param_schema,
        mode=mode,
        alias=_convert_to_aliases(field.validation_alias),
    )

1654 

def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema:
    """Generate the schema for a tuple type, e.g. `tuple[int, str]` or `tuple[int, ...]`.

    Raises:
        ValueError: If the tuple type ends with `...` but does not have exactly
            one item type before it.
    """
    # TODO: do we really need to resolve type vars here?
    typevars_map = get_standard_typevars_map(tuple_type)
    params = self._get_args_resolving_forward_refs(tuple_type)

    if typevars_map and params:
        params = tuple(replace_types(item, typevars_map) for item in params)

    # NOTE: subtle difference: `tuple[()]` gives `params=()`, whereas `typing.Tuple[()]` gives `params=((),)`
    # This is only true for <3.11, on Python 3.11+ `typing.Tuple[()]` gives `params=()`
    if not params:
        if tuple_type in TUPLE_TYPES:
            # Bare tuple type: any number of items of any type.
            return core_schema.tuple_schema([core_schema.any_schema()], variadic_item_index=0)
        # `tuple[()]`: the empty tuple.
        return core_schema.tuple_schema([])

    if params[-1] is Ellipsis:
        if len(params) != 2:
            # TODO: something like https://github.com/pydantic/pydantic/issues/5952
            raise ValueError('Variable tuples can only have one type')
        return core_schema.tuple_schema([self.generate_schema(params[0])], variadic_item_index=0)

    if len(params) == 1 and params[0] == ():
        # `typing.Tuple[()]` on Python 3.10: the empty tuple.
        # NOTE: This conditional can be removed when we drop support for Python 3.10.
        return core_schema.tuple_schema([])

    items_schemas = [self.generate_schema(item) for item in params]
    return core_schema.tuple_schema(items_schemas)

1684 

def _type_schema(self) -> core_schema.CoreSchema:
    """Generate a schema accepting instances of `type`, with a custom `'is_type'` error."""
    is_type_schema = core_schema.is_instance_schema(type)
    return core_schema.custom_error_schema(
        is_type_schema,
        custom_error_type='is_type',
        custom_error_message='Input should be a type',
    )

1691 

def _zoneinfo_schema(self) -> core_schema.CoreSchema:
    """Generate the schema for a `zoneinfo.ZoneInfo` object.

    Validation is delegated to `validate_str_is_valid_iana_tz`, and values are
    serialized using the "to string" serialization schema.
    """
    from ._validators import validate_str_is_valid_iana_tz

    # The JSON Schema function ignores both of its arguments and always describes
    # the value as a string with the `zoneinfo` format:
    js_functions = [lambda _1, _2: {'type': 'string', 'format': 'zoneinfo'}]
    return core_schema.no_info_plain_validator_function(
        validate_str_is_valid_iana_tz,
        serialization=core_schema.to_string_ser_schema(),
        metadata={'pydantic_js_functions': js_functions},
    )

1702 

def _union_is_subclass_schema(self, union_type: Any) -> core_schema.CoreSchema:
    """Generate the schema for `type[Union[X, ...]]`, as a union of the `type[X]` schemas."""
    members = self._get_args_resolving_forward_refs(union_type, required=True)
    choices = [self.generate_schema(type[member]) for member in members]
    return core_schema.union_schema(choices)

1707 

def _subclass_schema(self, type_: Any) -> core_schema.CoreSchema:
    """Generate the schema for a class type, e.g. `type[int]`.

    Raises:
        PydanticUserError: If the type parameter is an already parametrized generic alias.
        TypeError: If the type parameter is neither a class nor `None`.
    """
    type_param = self._get_first_arg_or_any(type_)

    # Assume `type[Annotated[<typ>, ...]]` is equivalent to `type[<typ>]`:
    type_param = _typing_extra.annotated_type(type_param) or type_param

    if typing_objects.is_any(type_param):
        return self._type_schema()

    if typing_objects.is_typealiastype(type_param):
        return self.generate_schema(type[type_param.__value__])

    if typing_objects.is_typevar(type_param):
        bound = type_param.__bound__
        if bound:
            if is_union_origin(get_origin(bound)):
                return self._union_is_subclass_schema(bound)
            return core_schema.is_subclass_schema(bound)

        constraints = type_param.__constraints__
        if constraints:
            return core_schema.union_schema([self.generate_schema(type[constraint]) for constraint in constraints])

        # Neither bound nor constrained: any class is accepted.
        return self._type_schema()

    if is_union_origin(get_origin(type_param)):
        return self._union_is_subclass_schema(type_param)

    if typing_objects.is_self(type_param):
        type_param = self._resolve_self_type(type_param)

    if _typing_extra.is_generic_alias(type_param):
        raise PydanticUserError(
            'Subscripting `type[]` with an already parametrized type is not supported. '
            f'Instead of using type[{type_param!r}], use type[{_repr.display_as_type(get_origin(type_param))}].',
            code=None,
        )

    if inspect.isclass(type_param):
        return core_schema.is_subclass_schema(type_param)

    # when using type[None], this doesn't type convert to type[NoneType], and None isn't a class
    # so we handle it manually here
    if type_param is None:
        return core_schema.is_subclass_schema(_typing_extra.NoneType)
    raise TypeError(f'Expected a class, got {type_param!r}')

1746 

def _sequence_schema(self, items_type: Any) -> core_schema.CoreSchema:
    """Generate the schema for a Sequence, e.g. `Sequence[int]`.

    In JSON mode, a list schema is used. In Python mode, the input must be a
    `typing.Sequence` instance; unless `items_type` is `Any`, it is additionally
    passed through `sequence_validator`, wrapping the list schema.
    """
    from ._serializers import serialize_sequence_via_list

    items_schema = self.generate_schema(items_type)
    as_list_schema = core_schema.list_schema(items_schema)

    # The JSON branch gets its own copy of the list schema:
    json_branch = smart_deepcopy(as_list_schema)

    python_branch = core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence')
    if not typing_objects.is_any(items_type):
        from ._validators import sequence_validator

        items_validator = core_schema.no_info_wrap_validator_function(sequence_validator, as_list_schema)
        python_branch = core_schema.chain_schema([python_branch, items_validator])

    ser_schema = core_schema.wrap_serializer_function_ser_schema(
        serialize_sequence_via_list, schema=items_schema, info_arg=True
    )
    return core_schema.json_or_python_schema(
        json_schema=json_branch,
        python_schema=python_branch,
        serialization=ser_schema,
    )

1769 

def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema:
    """Generate a generator schema for an `Iterable`, using its first type argument (or `Any`) as the items type."""
    items_schema = self.generate_schema(self._get_first_arg_or_any(type_))
    return core_schema.generator_schema(items_schema)

1775 

def _pattern_schema(self, pattern_type: Any) -> core_schema.CoreSchema:
    """Generate the schema for a compiled regular expression type.

    The validator is selected depending on the type: bare `Pattern`, `Pattern[str]`
    or `Pattern[bytes]`. In JSON mode, values are serialized as their `pattern` attribute.

    Raises:
        PydanticSchemaGenerationError: If the pattern is parametrized with something
            other than `str` or `bytes`.
    """
    from . import _validators

    js_metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'regex'}]}
    pattern_ser = core_schema.plain_serializer_function_ser_schema(
        attrgetter('pattern'), when_used='json', return_schema=core_schema.str_schema()
    )

    is_bare = pattern_type is typing.Pattern or pattern_type is re.Pattern
    if is_bare:
        # Unparametrized type: the "either" validator is used.
        return core_schema.no_info_plain_validator_function(
            _validators.pattern_either_validator, serialization=pattern_ser, metadata=js_metadata
        )

    type_args = self._get_args_resolving_forward_refs(pattern_type, required=True)
    param = type_args[0]

    if param is str:
        validator = _validators.pattern_str_validator
    elif param is bytes:
        validator = _validators.pattern_bytes_validator
    else:
        raise PydanticSchemaGenerationError(f'Unable to generate pydantic-core schema for {pattern_type!r}.')

    return core_schema.no_info_plain_validator_function(validator, serialization=pattern_ser, metadata=js_metadata)

1803 

def _hashable_schema(self) -> core_schema.CoreSchema:
    """Generate a schema accepting `collections.abc.Hashable` instances, with a custom `'is_hashable'` error.

    In JSON mode, the instance check is chained after an "any" schema; in Python
    mode, the instance check is used on its own.
    """
    json_branch = core_schema.chain_schema(
        [core_schema.any_schema(), core_schema.is_instance_schema(collections.abc.Hashable)]
    )
    python_branch = core_schema.is_instance_schema(collections.abc.Hashable)

    return core_schema.custom_error_schema(
        schema=core_schema.json_or_python_schema(json_schema=json_branch, python_schema=python_branch),
        custom_error_type='is_hashable',
        custom_error_message='Input should be hashable',
    )

1815 

def _dataclass_schema(
    self, dataclass: type[StandardDataclass], origin: type[StandardDataclass] | None
) -> core_schema.CoreSchema:
    """Generate schema for a dataclass.

    Args:
        dataclass: The dataclass to generate a schema for.
        origin: When not `None`, the class used in place of `dataclass` to collect the
            fields, the config and the decorators. It is also forwarded as `generic_origin`
            to the dataclass core schema.

    Returns:
        The schema already known for `dataclass` if any (cached reference, or the
        `__pydantic_core_schema__` found in the class `__dict__`), otherwise the newly
        generated schema, registered in the definitions.

    Raises:
        PydanticUndefinedAnnotation: If rebuilding the fields hits an undefined name.
        PydanticUserError: If a field has `init=False` while the `extra` config is `'allow'`.
    """
    with (
        self.model_type_stack.push(dataclass),
        self.defs.get_schema_or_ref(dataclass) as (
            dataclass_ref,
            maybe_schema,
        ),
    ):
        if maybe_schema is not None:
            return maybe_schema

        # Only the class' own `__dict__` is looked up (no attribute access):
        schema = dataclass.__dict__.get('__pydantic_core_schema__')
        if schema is not None and not isinstance(schema, MockCoreSchema):
            if schema['type'] == 'definitions':
                schema = self.defs.unpack_definitions(schema)
            ref = get_ref(schema)
            if ref:
                return self.defs.create_definition_reference_schema(schema)
            else:
                return schema

        # The type variables map is computed before `dataclass` is replaced by `origin`:
        typevars_map = get_standard_typevars_map(dataclass)
        if origin is not None:
            dataclass = origin

        # if (plain) dataclass doesn't have config, we use the parent's config, hence a default of `None`
        # (Pydantic dataclasses have an empty dict config by default).
        # see https://github.com/pydantic/pydantic/issues/10917
        config = getattr(dataclass, '__pydantic_config__', None)

        from ..dataclasses import is_pydantic_dataclass

        with self._ns_resolver.push(dataclass), self._config_wrapper_stack.push(config):
            if is_pydantic_dataclass(dataclass):
                if dataclass.__pydantic_fields_complete__():
                    # Copy the field info instances to avoid mutating the `FieldInfo` instances
                    # of the generic dataclass generic origin (e.g. `apply_typevars_map` below).
                    # Note that we don't apply `deepcopy` on `__pydantic_fields__` because we
                    # don't want to copy the `FieldInfo` attributes:
                    fields = {
                        f_name: copy(field_info) for f_name, field_info in dataclass.__pydantic_fields__.items()
                    }
                    if typevars_map:
                        for field in fields.values():
                            field.apply_typevars_map(typevars_map, *self._types_namespace)
                else:
                    try:
                        fields = rebuild_dataclass_fields(
                            dataclass,
                            config_wrapper=self._config_wrapper,
                            ns_resolver=self._ns_resolver,
                            typevars_map=typevars_map or {},
                        )
                    except NameError as e:
                        raise PydanticUndefinedAnnotation.from_name_error(e) from e
            else:
                fields = collect_dataclass_fields(
                    dataclass,
                    typevars_map=typevars_map,
                    config_wrapper=self._config_wrapper,
                )

            if self._config_wrapper.extra == 'allow':
                # disallow combination of init=False on a dataclass field and extra='allow' on a dataclass
                for field_name, field in fields.items():
                    if field.init is False:
                        raise PydanticUserError(
                            f'Field {field_name} has `init=False` and dataclass has config setting `extra="allow"`. '
                            f'This combination is not allowed.',
                            code='dataclass-init-false-extra-allow',
                        )

            # Decorators are built on the fly when the class does not define them itself:
            decorators = dataclass.__dict__.get('__pydantic_decorators__')
            if decorators is None:
                decorators = DecoratorInfos.build(dataclass)
                decorators.update_from_config(self._config_wrapper)
            # Move kw_only=False args to the start of the list, as this is how vanilla dataclasses work.
            # Note that when kw_only is missing or None, it is treated as equivalent to kw_only=True
            args = sorted(
                (self._generate_dc_field_schema(k, v, decorators) for k, v in fields.items()),
                key=lambda a: a.get('kw_only') is not False,
            )
            has_post_init = hasattr(dataclass, '__post_init__')
            has_slots = hasattr(dataclass, '__slots__')

            args_schema = core_schema.dataclass_args_schema(
                dataclass.__name__,
                args,
                computed_fields=[
                    self._computed_field_schema(d, decorators.field_serializers)
                    for d in decorators.computed_fields.values()
                ],
                collect_init_only=has_post_init,
            )

            inner_schema = apply_validators(args_schema, decorators.root_validators.values())

            # The same model validators are applied twice: in `'inner'` mode around the
            # arguments schema here, and in `'outer'` mode around the dataclass schema below.
            model_validators = decorators.model_validators.values()
            inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')

            core_config = self._config_wrapper.core_config(title=dataclass.__name__)

            dc_schema = core_schema.dataclass_schema(
                dataclass,
                inner_schema,
                generic_origin=origin,
                post_init=has_post_init,
                ref=dataclass_ref,
                fields=[field.name for field in dataclasses.fields(dataclass)],
                slots=has_slots,
                config=core_config,
                # we don't use a custom __setattr__ for dataclasses, so we must
                # pass along the frozen config setting to the pydantic-core schema
                frozen=self._config_wrapper_stack.tail.frozen,
            )
            schema = self._apply_model_serializers(dc_schema, decorators.model_serializers.values())
            schema = apply_model_validators(schema, model_validators, 'outer')
            return self.defs.create_definition_reference_schema(schema)

1937 

def _call_schema(self, function: ValidateCallSupportedTypes) -> core_schema.CallSchema:
    """Generate the schema for a callable.

    The return value is only given a schema when the `validate_return` configuration
    value is set and the function has a return annotation.

    TODO support functional validators once we support them in Config
    """
    args_schema = self._arguments_schema(function)

    ret_schema: core_schema.CoreSchema | None = None
    if self._config_wrapper.validate_return:
        sig = signature(function)
        if sig.return_annotation is not sig.empty:
            globalns, localns = self._types_namespace
            # Only the return annotation needs to be evaluated here:
            hints = _typing_extra.get_function_type_hints(
                function, globalns=globalns, localns=localns, include_keys={'return'}
            )
            ret_schema = self.generate_schema(hints['return'])

    return core_schema.call_schema(args_schema, function, return_schema=ret_schema)

1962 

def _arguments_schema(
    self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
) -> core_schema.ArgumentsSchema:
    """Generate schema for a Signature.

    Args:
        function: The callable whose signature is inspected.
        parameters_callback: Optional callable invoked for each parameter with its index,
            its name and its annotation. The parameter is skipped if it returns `'skip'`.

    Returns:
        The `'arguments'` core schema for the signature of `function`.

    Raises:
        PydanticUserError: If the `**kwargs` parameter is annotated with `Unpack[...]` of
            something that is not a `TypedDict` class, or if the keys of the typed dictionary
            overlap with the names of the parameters that are not positional-only.
    """
    # Parameter kinds resulting in a named argument. `*args` and `**kwargs` are
    # deliberately absent, as they are handled separately in the loop below:
    mode_lookup: dict[_ParameterKind, Literal['positional_only', 'positional_or_keyword', 'keyword_only']] = {
        Parameter.POSITIONAL_ONLY: 'positional_only',
        Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
        Parameter.KEYWORD_ONLY: 'keyword_only',
    }

    sig = signature(function)
    globalns, localns = self._types_namespace
    type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)

    arguments_list: list[core_schema.ArgumentsParameter] = []
    var_args_schema: core_schema.CoreSchema | None = None
    var_kwargs_schema: core_schema.CoreSchema | None = None
    var_kwargs_mode: core_schema.VarKwargsMode | None = None

    for i, (name, p) in enumerate(sig.parameters.items()):
        # Unannotated parameters are treated as `Any`:
        if p.annotation is sig.empty:
            annotation = typing.cast(Any, Any)
        else:
            annotation = type_hints[name]

        # NOTE(review): the callback receives the resolved annotation here (`Any` when
        # unannotated), whereas `_arguments_v3_schema` passes the raw `p.annotation` - confirm
        # which of the two is the intended contract.
        if parameters_callback is not None:
            result = parameters_callback(i, name, annotation)
            if result == 'skip':
                continue

        parameter_mode = mode_lookup.get(p.kind)
        if parameter_mode is not None:
            arg_schema = self._generate_parameter_schema(
                name, annotation, AnnotationSource.FUNCTION, p.default, parameter_mode
            )
            arguments_list.append(arg_schema)
        elif p.kind == Parameter.VAR_POSITIONAL:
            var_args_schema = self.generate_schema(annotation)
        else:
            assert p.kind == Parameter.VAR_KEYWORD, p.kind

            # `**kwargs: Unpack[SomeTypedDict]` is validated against the typed dictionary,
            # any other annotation is applied uniformly to every keyword argument:
            unpack_type = _typing_extra.unpack_type(annotation)
            if unpack_type is not None:
                origin = get_origin(unpack_type) or unpack_type
                if not is_typeddict(origin):
                    raise PydanticUserError(
                        f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
                        code='unpack-typed-dict',
                    )
                # Positional-only parameters can't be provided by keyword, so their names
                # are excluded from the overlap check:
                non_pos_only_param_names = {
                    name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
                }
                overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
                if overlapping_params:
                    raise PydanticUserError(
                        f'Typed dictionary {origin.__name__!r} overlaps with parameter'
                        f'{"s" if len(overlapping_params) >= 2 else ""} '
                        f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
                        code='overlapping-unpack-typed-dict',
                    )

                var_kwargs_mode = 'unpacked-typed-dict'
                var_kwargs_schema = self._typed_dict_schema(unpack_type, get_origin(unpack_type))
            else:
                var_kwargs_mode = 'uniform'
                var_kwargs_schema = self.generate_schema(annotation)

    return core_schema.arguments_schema(
        arguments_list,
        var_args_schema=var_args_schema,
        var_kwargs_mode=var_kwargs_mode,
        var_kwargs_schema=var_kwargs_schema,
        validate_by_name=self._config_wrapper.validate_by_name,
    )

2037 

def _arguments_v3_schema(
    self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
) -> core_schema.ArgumentsV3Schema:
    """Generate the `'arguments-v3'` core schema for the signature of a callable.

    Args:
        function: The callable whose signature is inspected.
        parameters_callback: Optional callable invoked for each parameter with its index,
            its name and its annotation. The parameter is skipped if it returns `'skip'`.

    Returns:
        The `'arguments-v3'` core schema for the signature of `function`.

    Raises:
        PydanticUserError: If the `**kwargs` parameter is annotated with `Unpack[...]` of
            something that is not a `TypedDict` class, or if the keys of the typed dictionary
            overlap with the names of the parameters that are not positional-only.
    """
    # `**kwargs` is deliberately absent from the lookup: its mode depends on the
    # annotation, and is determined in the loop below.
    mode_lookup: dict[
        _ParameterKind, Literal['positional_only', 'positional_or_keyword', 'var_args', 'keyword_only']
    ] = {
        Parameter.POSITIONAL_ONLY: 'positional_only',
        Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
        Parameter.VAR_POSITIONAL: 'var_args',
        Parameter.KEYWORD_ONLY: 'keyword_only',
    }

    sig = signature(function)
    globalns, localns = self._types_namespace
    type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)

    parameters_list: list[core_schema.ArgumentsV3Parameter] = []

    for i, (name, p) in enumerate(sig.parameters.items()):
        # NOTE(review): the callback receives the raw `p.annotation` here (`Parameter.empty`
        # when unannotated), whereas `_arguments_schema` passes the resolved annotation - confirm
        # which of the two is the intended contract.
        if parameters_callback is not None:
            result = parameters_callback(i, name, p.annotation)
            if result == 'skip':
                continue

        # Unannotated parameters are treated as `Any`:
        if p.annotation is Parameter.empty:
            annotation = typing.cast(Any, Any)
        else:
            annotation = type_hints[name]

        parameter_mode = mode_lookup.get(p.kind)
        if parameter_mode is None:
            assert p.kind == Parameter.VAR_KEYWORD, p.kind

            # `**kwargs: Unpack[SomeTypedDict]` is validated against the typed dictionary,
            # any other annotation is applied uniformly to every keyword argument:
            unpack_type = _typing_extra.unpack_type(annotation)
            if unpack_type is not None:
                origin = get_origin(unpack_type) or unpack_type
                if not is_typeddict(origin):
                    raise PydanticUserError(
                        f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
                        code='unpack-typed-dict',
                    )
                # Positional-only parameters can't be provided by keyword, so their names
                # are excluded from the overlap check:
                non_pos_only_param_names = {
                    name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
                }
                overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
                if overlapping_params:
                    raise PydanticUserError(
                        f'Typed dictionary {origin.__name__!r} overlaps with parameter'
                        f'{"s" if len(overlapping_params) >= 2 else ""} '
                        f'{", ".join(repr(p) for p in sorted(overlapping_params))}',
                        code='overlapping-unpack-typed-dict',
                    )
                parameter_mode = 'var_kwargs_unpacked_typed_dict'
                # The schema is generated for the typed dictionary itself, not for `Unpack[...]`:
                annotation = unpack_type
            else:
                parameter_mode = 'var_kwargs_uniform'

        parameters_list.append(
            self._generate_parameter_v3_schema(
                name, annotation, AnnotationSource.FUNCTION, parameter_mode, default=p.default
            )
        )

    return core_schema.arguments_v3_schema(
        parameters_list,
        validate_by_name=self._config_wrapper.validate_by_name,
    )

2105 

def _unsubstituted_typevar_schema(self, typevar: typing.TypeVar) -> core_schema.CoreSchema:
    """Generate the schema for a type variable that was not substituted.

    The following are tried in order: the default of the type variable, its
    constraints (as a union), its bound (with serialization set to `'any'`), and
    finally an "any" schema.
    """
    try:
        has_default = typevar.has_default()  # pyright: ignore[reportAttributeAccessIssue]
    except AttributeError:
        # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13
        has_default = False

    if has_default:
        return self.generate_schema(typevar.__default__)  # pyright: ignore[reportAttributeAccessIssue]

    constraints = typevar.__constraints__
    if constraints:
        return self._union_schema(typing.Union[constraints])

    bound = typevar.__bound__
    if not bound:
        return core_schema.any_schema()

    bound_schema = self.generate_schema(bound)
    bound_schema['serialization'] = core_schema.simple_ser_schema('any')
    return bound_schema

2125 

def _computed_field_schema(
    self,
    d: Decorator[ComputedFieldInfo],
    field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]],
) -> core_schema.ComputedField:
    """Generate the core schema of a computed field.

    Args:
        d: The computed field decorator. Its `info` attribute is replaced by a copy
            holding the resolved return type.
        field_serializers: The field serializers of the class. The ones matching the
            computed field are applied to the return type schema.

    Returns:
        The computed field definition, flagged as `readOnly` in the JSON Schema updates.

    Raises:
        PydanticUndefinedAnnotation: If the return annotation refers to an undefined name.
        PydanticUserError: If no return type could be determined.
    """
    return_type = d.info.return_type
    if return_type is PydanticUndefined:
        try:
            # Do not pass in globals as the function could be defined in a different module.
            # Instead, let `get_callable_return_type` infer the globals to use, but still pass
            # in locals that may contain a parent/rebuild namespace:
            return_type = _decorators.get_callable_return_type(d.func, localns=self._types_namespace.locals)
        except NameError as exc:
            raise PydanticUndefinedAnnotation.from_name_error(exc) from exc

    if return_type is PydanticUndefined:
        raise PydanticUserError(
            'Computed field is missing return type annotation or specifying `return_type`'
            ' to the `@computed_field` decorator (e.g. `@computed_field(return_type=int | str)`)',
            code='model-field-missing-annotation',
        )

    return_type = replace_types(return_type, self._typevars_map)
    # Create a new ComputedFieldInfo so that different type parametrizations of the same
    # generic model's computed field can have different return types.
    d.info = dataclasses.replace(d.info, return_type=return_type)

    # Apply serializers to computed field if there exist
    matching_serializers = filter_field_decorator_info_by_field(field_serializers.values(), d.cls_var_name)
    return_schema = self._apply_field_serializers(self.generate_schema(return_type), matching_serializers)

    js_updates, js_extra = _extract_json_schema_info_from_field_info(d.info)
    metadata: dict[str, Any] = {}
    update_core_metadata(
        metadata,
        pydantic_js_updates={'readOnly': True, **(js_updates or {})},
        pydantic_js_extra=js_extra,
    )
    return core_schema.computed_field(
        d.cls_var_name,
        return_schema=return_schema,
        alias=d.info.alias,
        metadata=metadata,
    )

2169 

def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema:
    """Generate the schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`."""
    field_info_cls = import_cached_field_info()
    source_type, *metadata = self._get_args_resolving_forward_refs(annotated_type, required=True)

    result = self._apply_annotations(source_type, metadata)

    # put the default validator last so that TypeAdapter.get_default_value() works
    # even if there are function validators involved
    for field_info in (item for item in metadata if isinstance(item, field_info_cls)):
        result = wrap_default(field_info, result)
    return result

2184 

def _apply_annotations(
    self,
    source_type: Any,
    annotations: list[Any],
    transform_inner_schema: Callable[[CoreSchema], CoreSchema] = lambda x: x,
    check_unsupported_field_info_attributes: bool = True,
) -> CoreSchema:
    """Apply arguments from `Annotated` or from `FieldInfo` to a schema.

    This is called by `GenerateSchema._annotated_schema`, but unlike it, `source_type` is
    not expected to be an `Annotated` object: it is expected to be its first argument
    (in other words, `GenerateSchema._annotated_schema` only unpacks `Annotated`, and this
    method processes it).

    Args:
        source_type: The type the annotations are applied to.
        annotations: The metadata to apply. `None` entries are ignored.
        transform_inner_schema: Function applied to the innermost generated schema,
            before any annotation is applied. Defaults to the identity function.
        check_unsupported_field_info_attributes: Forwarded to `_get_wrapped_inner_schema()`
            for every annotation.

    Returns:
        The resulting schema, after the custom serialization from the `json_encoders`
        configuration value has been applied.
    """
    expanded_annotations = list(_known_annotated_metadata.expand_grouped_metadata(annotations))

    # Filled in by the handlers created in the loop below:
    js_annotation_functions: list[GetJsonSchemaFunction] = []

    def inner_handler(obj: Any) -> CoreSchema:
        inner_schema = self._generate_schema_from_get_schema_method(obj, source_type)
        if inner_schema is None:
            inner_schema = self._generate_schema_inner(obj)

        js_function = _extract_get_pydantic_json_schema(obj)
        if js_function is not None:
            original_schema = resolve_original_schema(inner_schema, self.defs)
            if original_schema is not None:
                self._add_js_function(original_schema, js_function)
        return transform_inner_schema(inner_schema)

    # Each annotation wraps the handler built so far:
    handler = CallbackGetCoreSchemaHandler(inner_handler, self)
    for item in expanded_annotations:
        if item is not None:
            handler = self._get_wrapped_inner_schema(
                handler,
                item,
                js_annotation_functions,
                check_unsupported_field_info_attributes=check_unsupported_field_info_attributes,
            )

    result = handler(source_type)
    if js_annotation_functions:
        result_metadata = result.setdefault('metadata', {})
        update_core_metadata(result_metadata, pydantic_js_annotation_functions=js_annotation_functions)
    return _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, source_type, result)

2232 

def _apply_single_annotation(
    self,
    schema: core_schema.CoreSchema,
    metadata: Any,
    check_unsupported_field_info_attributes: bool = True,
) -> core_schema.CoreSchema:
    """Apply a single piece of annotation metadata to a core schema.

    Args:
        schema: The core schema to apply the metadata on.
        metadata: The metadata object. `FieldInfo` instances are unpacked and each
            element of their `metadata` list is applied recursively.
        check_unsupported_field_info_attributes: Whether to emit an
            `UnsupportedFieldAttributeWarning` for `FieldInfo` attributes reported by
            `_get_unsupported_field_info_attributes()`.

    Returns:
        The schema returned by `apply_known_metadata()` if it is not `None`, an already
        registered schema for the derived reference if one exists, or the original schema.
    """
    FieldInfo = import_cached_field_info()

    if isinstance(metadata, FieldInfo):
        if (
            check_unsupported_field_info_attributes
            # HACK: we don't want to emit the warning for `FieldInfo` subclasses, because FastAPI does weird manipulations
            # with its subclasses and their annotations:
            and type(metadata) is FieldInfo
        ):
            unsupported_attributes = self._get_unsupported_field_info_attributes(metadata)
            for attr, value in unsupported_attributes:
                warnings.warn(
                    f'The {attr!r} attribute with value {value!r} was provided to the `Field()` function, '
                    f'which has no effect in the context it was used. {attr!r} is field-specific metadata, '
                    'and can only be attached to a model field using `Annotated` metadata or by assignment. '
                    'This may have happened because an `Annotated` type alias using the `type` statement was '
                    'used, or if the `Field()` function was attached to a single member of a union type.',
                    category=UnsupportedFieldAttributeWarning,
                )

            if (
                metadata.default_factory_takes_validated_data
                and self.model_type_stack.get() is None
                # `unsupported_attributes` is a list of `(name, value)` tuples, so the names have to be
                # compared explicitly (a plain `in` test against a string would never match):
                and all(attr != 'default_factory' for attr, _ in unsupported_attributes)
            ):
                warnings.warn(
                    "A 'default_factory' taking validated data as an argument was provided to the `Field()` function, "
                    'but no validated data is available in the context it was used.',
                    category=UnsupportedFieldAttributeWarning,
                )

        for field_metadata in metadata.metadata:
            schema = self._apply_single_annotation(schema, field_metadata)

        if metadata.discriminator is not None:
            schema = self._apply_discriminator_to_union(schema, metadata.discriminator)
        return schema

    if schema['type'] == 'nullable':
        # for nullable schemas, metadata is automatically applied to the inner schema
        inner = schema.get('schema', core_schema.any_schema())
        inner = self._apply_single_annotation(inner, metadata)
        if inner:
            schema['schema'] = inner
        return schema

    original_schema = schema
    ref = schema.get('ref')
    if ref is not None:
        # Work on a copy registered under a metadata-specific reference, so that the
        # schema stored under the original reference is left untouched:
        schema = schema.copy()
        new_ref = ref + f'_{repr(metadata)}'
        if (existing := self.defs.get_schema_from_ref(new_ref)) is not None:
            return existing
        schema['ref'] = new_ref  # pyright: ignore[reportGeneralTypeIssues]
    elif schema['type'] == 'definition-ref':
        ref = schema['schema_ref']
        if (referenced_schema := self.defs.get_schema_from_ref(ref)) is not None:
            schema = referenced_schema.copy()
            new_ref = ref + f'_{repr(metadata)}'
            if (existing := self.defs.get_schema_from_ref(new_ref)) is not None:
                return existing
            schema['ref'] = new_ref  # pyright: ignore[reportGeneralTypeIssues]

    maybe_updated_schema = _known_annotated_metadata.apply_known_metadata(metadata, schema)

    if maybe_updated_schema is not None:
        return maybe_updated_schema
    return original_schema

2306 

def _apply_single_annotation_json_schema(
    self, schema: core_schema.CoreSchema, metadata: Any
) -> core_schema.CoreSchema:
    """Record the JSON schema information carried by a `FieldInfo` in the schema metadata.

    Anything that is not a `FieldInfo` instance leaves `schema` untouched. For a `FieldInfo`,
    each element of its `metadata` list is processed recursively first, then the JSON schema
    updates/extra extracted from the `FieldInfo` itself are merged into `schema['metadata']`.
    """
    field_info_cls = import_cached_field_info()

    if not isinstance(metadata, field_info_cls):
        return schema

    for nested_metadata in metadata.metadata:
        schema = self._apply_single_annotation_json_schema(schema, nested_metadata)

    js_updates, js_extra = _extract_json_schema_info_from_field_info(metadata)
    update_core_metadata(
        schema.setdefault('metadata', {}),
        pydantic_js_updates=js_updates,
        pydantic_js_extra=js_extra,
    )
    return schema

2322 

def _get_unsupported_field_info_attributes(self, field_info: FieldInfo) -> list[tuple[str, Any]]:
    """Collect the `FieldInfo` attributes that are set but unsupported outside of a field annotation.

    Returns:
        A list of `(attribute name, attribute value)` tuples.
    """
    collected: list[tuple[str, Any]] = []
    for name, unset_value in UNSUPPORTED_STANDALONE_FIELDINFO_ATTRIBUTES:
        value = getattr(field_info, name)
        if value is unset_value:
            # The attribute still holds its "unset" marker, nothing to report.
            continue
        # `default` and `default_factory` can still be used with a type adapter, so only include them
        # if used with a model-like class:
        if name in ('default', 'default_factory') and self.model_type_stack.get() is None:
            continue
        # Setting `alias` will set `validation/serialization_alias` as well, so we want to avoid duplicate warnings:
        if name in ('validation_alias', 'serialization_alias') and 'alias' in field_info._attributes_set:
            continue
        collected.append((name, value))

    return collected

2344 

def _get_wrapped_inner_schema(
    self,
    get_inner_schema: GetCoreSchemaHandler,
    annotation: Any,
    pydantic_js_annotation_functions: list[GetJsonSchemaFunction],
    check_unsupported_field_info_attributes: bool = False,
) -> CallbackGetCoreSchemaHandler:
    """Wrap `get_inner_schema` in a new handler that also applies `annotation`.

    Args:
        get_inner_schema: The handler producing the schema that `annotation` applies to.
        annotation: The annotation metadata to apply.
        pydantic_js_annotation_functions: A list that the JSON schema function of
            `annotation` (if any) is appended to when the returned handler is called.
        check_unsupported_field_info_attributes: Forwarded to `_apply_single_annotation()`.

    Returns:
        A `CallbackGetCoreSchemaHandler` wrapping the new handler.
    """
    custom_get_schema: GetCoreSchemaFunction | None = getattr(annotation, '__get_pydantic_core_schema__', None)

    def new_handler(source: Any) -> core_schema.CoreSchema:
        if custom_get_schema is None:
            # No custom hook on the annotation: build the inner schema and apply the
            # annotation to it (core schema first, then JSON schema metadata).
            result = self._apply_single_annotation(
                get_inner_schema(source),
                annotation,
                check_unsupported_field_info_attributes=check_unsupported_field_info_attributes,
            )
            result = self._apply_single_annotation_json_schema(result, annotation)
        else:
            # The annotation builds the schema itself and may call the inner handler.
            result = custom_get_schema(source, get_inner_schema)

        js_function = _extract_get_pydantic_json_schema(annotation)
        if js_function is not None:
            pydantic_js_annotation_functions.append(js_function)
        return result

    return CallbackGetCoreSchemaHandler(new_handler, self)

2372 

2373 def _apply_field_serializers( 

2374 self, 

2375 schema: core_schema.CoreSchema, 

2376 serializers: list[Decorator[FieldSerializerDecoratorInfo]], 

2377 ) -> core_schema.CoreSchema: 

2378 """Apply field serializers to a schema.""" 

2379 if serializers: 

2380 schema = copy(schema) 

2381 if schema['type'] == 'definitions': 

2382 inner_schema = schema['schema'] 

2383 schema['schema'] = self._apply_field_serializers(inner_schema, serializers) 

2384 return schema 

2385 elif 'ref' in schema: 

2386 schema = self.defs.create_definition_reference_schema(schema) 

2387 

2388 # use the last serializer to make it easy to override a serializer set on a parent model 

2389 serializer = serializers[-1] 

2390 is_field_serializer, info_arg = inspect_field_serializer(serializer.func, serializer.info.mode) 

2391 

2392 if serializer.info.return_type is not PydanticUndefined: 

2393 return_type = serializer.info.return_type 

2394 else: 

2395 try: 

2396 # Do not pass in globals as the function could be defined in a different module. 

2397 # Instead, let `get_callable_return_type` infer the globals to use, but still pass 

2398 # in locals that may contain a parent/rebuild namespace: 

2399 return_type = _decorators.get_callable_return_type( 

2400 serializer.func, localns=self._types_namespace.locals 

2401 ) 

2402 except NameError as e: 

2403 raise PydanticUndefinedAnnotation.from_name_error(e) from e 

2404 

2405 if return_type is PydanticUndefined: 

2406 return_schema = None 

2407 else: 

2408 return_schema = self.generate_schema(return_type) 

2409 

2410 if serializer.info.mode == 'wrap': 

2411 schema['serialization'] = core_schema.wrap_serializer_function_ser_schema( 

2412 serializer.func, 

2413 is_field_serializer=is_field_serializer, 

2414 info_arg=info_arg, 

2415 return_schema=return_schema, 

2416 when_used=serializer.info.when_used, 

2417 ) 

2418 else: 

2419 assert serializer.info.mode == 'plain' 

2420 schema['serialization'] = core_schema.plain_serializer_function_ser_schema( 

2421 serializer.func, 

2422 is_field_serializer=is_field_serializer, 

2423 info_arg=info_arg, 

2424 return_schema=return_schema, 

2425 when_used=serializer.info.when_used, 

2426 ) 

2427 return schema 

2428 

2429 def _apply_model_serializers( 

2430 self, schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]] 

2431 ) -> core_schema.CoreSchema: 

2432 """Apply model serializers to a schema.""" 

2433 ref: str | None = schema.pop('ref', None) # type: ignore 

2434 if serializers: 

2435 serializer = list(serializers)[-1] 

2436 info_arg = inspect_model_serializer(serializer.func, serializer.info.mode) 

2437 

2438 if serializer.info.return_type is not PydanticUndefined: 

2439 return_type = serializer.info.return_type 

2440 else: 

2441 try: 

2442 # Do not pass in globals as the function could be defined in a different module. 

2443 # Instead, let `get_callable_return_type` infer the globals to use, but still pass 

2444 # in locals that may contain a parent/rebuild namespace: 

2445 return_type = _decorators.get_callable_return_type( 

2446 serializer.func, localns=self._types_namespace.locals 

2447 ) 

2448 except NameError as e: 

2449 raise PydanticUndefinedAnnotation.from_name_error(e) from e 

2450 

2451 if return_type is PydanticUndefined: 

2452 return_schema = None 

2453 else: 

2454 return_schema = self.generate_schema(return_type) 

2455 

2456 if serializer.info.mode == 'wrap': 

2457 ser_schema: core_schema.SerSchema = core_schema.wrap_serializer_function_ser_schema( 

2458 serializer.func, 

2459 info_arg=info_arg, 

2460 return_schema=return_schema, 

2461 when_used=serializer.info.when_used, 

2462 ) 

2463 else: 

2464 # plain 

2465 ser_schema = core_schema.plain_serializer_function_ser_schema( 

2466 serializer.func, 

2467 info_arg=info_arg, 

2468 return_schema=return_schema, 

2469 when_used=serializer.info.when_used, 

2470 ) 

2471 schema['serialization'] = ser_schema 

2472 if ref: 

2473 schema['ref'] = ref # type: ignore 

2474 return schema 

2475 

2476 

# Dispatch table used by `apply_validators()`: maps a `(validator mode, info kind)` pair to a
# callable taking the validator function and the current schema, and returning the matching
# `core_schema` validator function schema. The `'plain'` entries ignore the incoming schema.
_VALIDATOR_F_MATCH: Mapping[
    tuple[FieldValidatorModes, Literal['no-info', 'with-info']],
    Callable[[Callable[..., Any], core_schema.CoreSchema], core_schema.CoreSchema],
] = {
    ('before', 'no-info'): lambda f, schema: core_schema.no_info_before_validator_function(f, schema),
    ('after', 'no-info'): lambda f, schema: core_schema.no_info_after_validator_function(f, schema),
    ('plain', 'no-info'): lambda f, _: core_schema.no_info_plain_validator_function(f),
    ('wrap', 'no-info'): lambda f, schema: core_schema.no_info_wrap_validator_function(f, schema),
    ('before', 'with-info'): lambda f, schema: core_schema.with_info_before_validator_function(f, schema),
    ('after', 'with-info'): lambda f, schema: core_schema.with_info_after_validator_function(f, schema),
    ('plain', 'with-info'): lambda f, _: core_schema.with_info_plain_validator_function(f),
    ('wrap', 'with-info'): lambda f, schema: core_schema.with_info_wrap_validator_function(f, schema),
}

2490 

2491 

# TODO V3: this function is only used for deprecated decorators. It should
# be removed once we drop support for those.
def apply_validators(
    schema: core_schema.CoreSchema,
    validators: Iterable[Decorator[RootValidatorDecoratorInfo]]
    | Iterable[Decorator[ValidatorDecoratorInfo]]
    | Iterable[Decorator[FieldValidatorDecoratorInfo]],
) -> core_schema.CoreSchema:
    """Wrap a schema with each of the provided validators, in iteration order.

    Args:
        schema: The schema to apply validators on.
        validators: An iterable of validators.

    Returns:
        The updated schema (the input schema itself if `validators` is empty).
    """
    for decorator in validators:
        validator_mode = decorator.info.mode
        takes_info = inspect_validator(decorator.func, validator_mode)
        info_kind = 'with-info' if takes_info else 'no-info'

        wrap_schema = _VALIDATOR_F_MATCH[(validator_mode, info_kind)]
        schema = wrap_schema(decorator.func, schema)
    return schema

2516 

2517 

2518def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool: 

2519 """In v1, if any of the validators for a field had `always=True`, the default value would be validated. 

2520 

2521 This serves as an auxiliary function for re-implementing that logic, by looping over a provided 

2522 collection of (v1-style) ValidatorDecoratorInfo's and checking if any of them have `always=True`. 

2523 

2524 We should be able to drop this function and the associated logic calling it once we drop support 

2525 for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent 

2526 to the v1-validator `always` kwarg to `field_validator`.) 

2527 """ 

2528 for validator in validators: 

2529 if validator.info.always: 

2530 return True 

2531 return False 

2532 

2533 

def _convert_to_aliases(
    alias: str | AliasChoices | AliasPath | None,
) -> str | list[str | int] | list[list[str | int]] | None:
    """Return `alias` as is, unless it is an `AliasChoices`/`AliasPath` instance.

    For those two types, the result of their `convert_to_aliases()` method is returned.
    """
    if not isinstance(alias, (AliasChoices, AliasPath)):
        return alias
    return alias.convert_to_aliases()

2541 

2542 

def apply_model_validators(
    schema: core_schema.CoreSchema,
    validators: Iterable[Decorator[ModelValidatorDecoratorInfo]],
    mode: Literal['inner', 'outer', 'all'],
) -> core_schema.CoreSchema:
    """Wrap a schema with the model validators selected by `mode`.

    - `mode == 'inner'`: only "before" validators are applied.
    - `mode == 'outer'`: validators other than "before" are applied.
    - `mode == 'all'`: all validators are applied.

    Args:
        schema: The schema to apply validators on.
        validators: An iterable of validators.
        mode: The validator mode.

    Returns:
        The updated schema.
    """
    # The reference is detached from the input schema and re-attached to the final one:
    ref: str | None = schema.pop('ref', None)  # type: ignore

    for decorator in validators:
        validator_mode = decorator.info.mode
        is_before = validator_mode == 'before'
        if (mode == 'inner' and not is_before) or (mode == 'outer' and is_before):
            continue

        takes_info = inspect_validator(decorator.func, validator_mode)
        if validator_mode == 'wrap':
            if takes_info:
                build = core_schema.with_info_wrap_validator_function
            else:
                build = core_schema.no_info_wrap_validator_function
        elif is_before:
            if takes_info:
                build = core_schema.with_info_before_validator_function
            else:
                build = core_schema.no_info_before_validator_function
        else:
            assert validator_mode == 'after'
            if takes_info:
                build = core_schema.with_info_after_validator_function
            else:
                build = core_schema.no_info_after_validator_function
        schema = build(function=decorator.func, schema=schema)

    if ref:
        schema['ref'] = ref  # type: ignore
    return schema

2588 

2589 

def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema:
    """Wrap a schema in a "with default" schema when the field declares a default.

    A truthy `default_factory` takes precedence over `default`.

    Args:
        field_info: The field info object.
        schema: The schema to apply default on.

    Returns:
        The wrapped schema, or `schema` itself if neither a `default_factory` nor a
        default value is available.
    """
    factory = field_info.default_factory
    if factory:
        return core_schema.with_default_schema(
            schema,
            default_factory=factory,
            default_factory_takes_data=takes_validated_data_argument(factory),
            validate_default=field_info.validate_default,
        )

    if field_info.default is PydanticUndefined:
        return schema

    return core_schema.with_default_schema(
        schema, default=field_info.default, validate_default=field_info.validate_default
    )

2613 

2614 

2615def _extract_get_pydantic_json_schema(tp: Any) -> GetJsonSchemaFunction | None: 

2616 """Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`.""" 

2617 js_modify_function = getattr(tp, '__get_pydantic_json_schema__', None) 

2618 

2619 if hasattr(tp, '__modify_schema__'): 

2620 BaseModel = import_cached_base_model() 

2621 

2622 has_custom_v2_modify_js_func = ( 

2623 js_modify_function is not None 

2624 and BaseModel.__get_pydantic_json_schema__.__func__ # type: ignore 

2625 not in (js_modify_function, getattr(js_modify_function, '__func__', None)) 

2626 ) 

2627 

2628 if not has_custom_v2_modify_js_func: 

2629 cls_name = getattr(tp, '__name__', None) 

2630 raise PydanticUserError( 

2631 f'The `__modify_schema__` method is not supported in Pydantic v2. ' 

2632 f'Use `__get_pydantic_json_schema__` instead{f" in class `{cls_name}`" if cls_name else ""}.', 

2633 code='custom-json-schema', 

2634 ) 

2635 

2636 if (origin := get_origin(tp)) is not None: 

2637 # Generic aliases proxy attribute access to the origin, *except* dunder attributes, 

2638 # such as `__get_pydantic_json_schema__`, hence the explicit check. 

2639 return _extract_get_pydantic_json_schema(origin) 

2640 

2641 if js_modify_function is None: 

2642 return None 

2643 

2644 return js_modify_function 

2645 

2646 

def resolve_original_schema(schema: CoreSchema, definitions: _Definitions) -> CoreSchema | None:
    """Return the schema that `schema` stands for.

    - `'definition-ref'`: the schema looked up in `definitions` (may be `None`).
    - `'definitions'`: the inner schema.
    - anything else: `schema` itself.
    """
    schema_type = schema['type']
    if schema_type == 'definition-ref':
        return definitions.get_schema_from_ref(schema['schema_ref'])
    if schema_type == 'definitions':
        return schema['schema']
    return schema

2654 

2655 

2656def _inlining_behavior( 

2657 def_ref: core_schema.DefinitionReferenceSchema, 

2658) -> Literal['inline', 'keep', 'preserve_metadata']: 

2659 """Determine the inlining behavior of the `'definition-ref'` schema. 

2660 

2661 - If no `'serialization'` schema and no metadata is attached, the schema can safely be inlined. 

2662 - If it has metadata but only related to the deferred discriminator application, it can be inlined 

2663 provided that such metadata is kept. 

2664 - Otherwise, the schema should not be inlined. Doing so would remove the `'serialization'` schema or metadata. 

2665 """ 

2666 if 'serialization' in def_ref: 

2667 return 'keep' 

2668 metadata = def_ref.get('metadata') 

2669 if not metadata: 

2670 return 'inline' 

2671 if len(metadata) == 1 and 'pydantic_internal_union_discriminator' in metadata: 

2672 return 'preserve_metadata' 

2673 return 'keep' 

2674 

2675 

class _Definitions:
    """Keeps track of references and definitions."""

    _recursively_seen: set[str]
    """A set of recursively seen references.

    When a referenceable type is encountered, the `get_schema_or_ref` context manager is
    entered to compute the reference. If the type references itself by some way (e.g. for
    a dataclass a Pydantic model, the class can be referenced as a field annotation),
    entering the context manager again will yield a `'definition-ref'` schema that should
    short-circuit the normal generation process, as the reference was already in this set.
    """

    _definitions: dict[str, core_schema.CoreSchema]
    """A mapping of references to their corresponding schema.

    When a schema for a referenceable type is generated, it is stored in this mapping. If the
    same type is encountered again, the reference is yielded by the `get_schema_or_ref` context
    manager.
    """

    def __init__(self) -> None:
        """Start with no reference being generated and no stored definition."""
        self._recursively_seen = set()
        self._definitions = {}

    @contextmanager
    def get_schema_or_ref(self, tp: Any, /) -> Generator[tuple[str, core_schema.DefinitionReferenceSchema | None]]:
        """Get a definition for `tp` if one exists.

        If a definition exists, a tuple of `(ref_string, CoreSchema)` is returned.
        If no definition exists yet, a tuple of `(ref_string, None)` is returned.

        Note that the returned `CoreSchema` will always be a `DefinitionReferenceSchema`,
        not the actual definition itself.

        This should be called for any type that can be identified by reference.
        This includes any recursive types.

        At present the following types can be named/recursive:

        - Pydantic model
        - Pydantic and stdlib dataclasses
        - Typed dictionaries
        - Named tuples
        - `TypeAliasType` instances
        - Enums
        """
        ref = get_type_ref(tp)
        # return the reference if we're either (1) in a cycle or (2) the reference was already encountered:
        if ref in self._recursively_seen or ref in self._definitions:
            yield (ref, core_schema.definition_reference_schema(ref))
        else:
            # Mark the reference as being generated for the duration of the `with` block,
            # and always unmark it afterwards (even if an exception is raised):
            self._recursively_seen.add(ref)
            try:
                yield (ref, None)
            finally:
                self._recursively_seen.discard(ref)

    def get_schema_from_ref(self, ref: str) -> CoreSchema | None:
        """Resolve the schema from the given reference.

        Returns `None` if no definition is stored under `ref`.
        """
        return self._definitions.get(ref)

    def create_definition_reference_schema(self, schema: CoreSchema) -> core_schema.DefinitionReferenceSchema:
        """Store the schema as a definition and return a `'definition-reference'` schema pointing to it.

        The schema must have a reference attached to it.
        """
        ref = schema['ref']  # pyright: ignore
        # Any definition previously stored under the same reference is overwritten:
        self._definitions[ref] = schema
        return core_schema.definition_reference_schema(ref)

    def unpack_definitions(self, schema: core_schema.DefinitionsSchema) -> CoreSchema:
        """Store the definitions of the `'definitions'` core schema and return the inner core schema."""
        for def_schema in schema['definitions']:
            self._definitions[def_schema['ref']] = def_schema  # pyright: ignore
        return schema['schema']

    def finalize_schema(self, schema: CoreSchema) -> CoreSchema:
        """Finalize the core schema.

        This traverses the core schema and referenced definitions, replaces `'definition-ref'` schemas
        by the referenced definition if possible, and applies deferred discriminators.

        Returns:
            `schema`, wrapped in a `'definitions'` schema if some definitions could not be inlined.

        Raises:
            InvalidSchemaError: If gathering the schemas raises a `MissingDefinitionError`.
        """
        definitions = self._definitions
        try:
            gather_result = gather_schemas_for_cleaning(
                schema,
                definitions=definitions,
            )
        except MissingDefinitionError as e:
            raise InvalidSchemaError from e

        # Definitions that are not inlined and must be kept alongside the final schema:
        remaining_defs: dict[str, CoreSchema] = {}

        # Note: this logic doesn't play well when core schemas with deferred discriminator metadata
        # and references are encountered. See the `test_deferred_discriminated_union_and_references()` test.
        for ref, inlinable_def_ref in gather_result['collected_references'].items():
            if inlinable_def_ref is not None and (inlining_behavior := _inlining_behavior(inlinable_def_ref)) != 'keep':
                if inlining_behavior == 'inline':
                    # `ref` was encountered, and only once:
                    #  - `inlinable_def_ref` is a `'definition-ref'` schema and is guaranteed to be
                    #    the only one. Transform it into the definition it points to.
                    #  - Do not store the definition in the `remaining_defs`.
                    # The dict is mutated in place (cleared, then refilled):
                    inlinable_def_ref.clear()  # pyright: ignore[reportAttributeAccessIssue]
                    inlinable_def_ref.update(self._resolve_definition(ref, definitions))  # pyright: ignore
                elif inlining_behavior == 'preserve_metadata':
                    # `ref` was encountered, and only once, but contains discriminator metadata.
                    # We will do the same thing as if `inlining_behavior` was `'inline'`, but make
                    # sure to keep the metadata for the deferred discriminator application logic below.
                    meta = inlinable_def_ref.pop('metadata')
                    inlinable_def_ref.clear()  # pyright: ignore[reportAttributeAccessIssue]
                    inlinable_def_ref.update(self._resolve_definition(ref, definitions))  # pyright: ignore
                    inlinable_def_ref['metadata'] = meta
            else:
                # `ref` was encountered, at least two times (or only once, but with metadata or a serialization schema):
                #  - Do not inline the `'definition-ref'` schemas (they are not provided in the gather result anyway).
                #  - Store the definition in the `remaining_defs`
                remaining_defs[ref] = self._resolve_definition(ref, definitions)

        for cs in gather_result['deferred_discriminator_schemas']:
            discriminator: str | None = cs['metadata'].pop('pydantic_internal_union_discriminator', None)  # pyright: ignore[reportTypedDictNotRequiredAccess]
            if discriminator is None:
                # This can happen in rare scenarios, when a deferred schema is present multiple times in the
                # gather result (e.g. when using the `Sequence` type -- see `test_sequence_discriminated_union()`).
                # In this case, a previous loop iteration applied the discriminator and so we can just skip it here.
                continue
            applied = _discriminated_union.apply_discriminator(cs.copy(), discriminator, remaining_defs)
            # Mutate the schema directly to have the discriminator applied
            cs.clear()  # pyright: ignore[reportAttributeAccessIssue]
            cs.update(applied)  # pyright: ignore

        if remaining_defs:
            schema = core_schema.definitions_schema(schema=schema, definitions=[*remaining_defs.values()])
        return schema

    def _resolve_definition(self, ref: str, definitions: dict[str, CoreSchema]) -> CoreSchema:
        """Return the definition stored under `ref`, following intermediate `'definition-ref'` schemas.

        If the stored definition is not a `'definition-ref'` schema, it is returned as is. Otherwise,
        the chain of inlinable `'definition-ref'` schemas is followed and a copy of the schema found
        at the end is returned, with its `'ref'` key set to `ref`.

        Raises:
            PydanticUserError: If the chain of references loops back on itself.
        """
        definition = definitions[ref]
        if definition['type'] != 'definition-ref':
            return definition

        # Some `'definition-ref'` schemas might act as "intermediate" references (e.g. when using
        # a PEP 695 type alias (which is referenceable) that references another PEP 695 type alias):
        visited: set[str] = set()
        while definition['type'] == 'definition-ref' and _inlining_behavior(definition) == 'inline':
            schema_ref = definition['schema_ref']
            if schema_ref in visited:
                raise PydanticUserError(
                    f'{ref} contains a circular reference to itself.', code='circular-reference-schema'
                )
            visited.add(schema_ref)
            definition = definitions[schema_ref]
        return {**definition, 'ref': ref}  # pyright: ignore[reportReturnType]

2828 

2829 

2830class _FieldNameStack: 

2831 __slots__ = ('_stack',) 

2832 

2833 def __init__(self) -> None: 

2834 self._stack: list[str] = [] 

2835 

2836 @contextmanager 

2837 def push(self, field_name: str) -> Iterator[None]: 

2838 self._stack.append(field_name) 

2839 yield 

2840 self._stack.pop() 

2841 

2842 def get(self) -> str | None: 

2843 if self._stack: 

2844 return self._stack[-1] 

2845 else: 

2846 return None 

2847 

2848 

2849class _ModelTypeStack: 

2850 __slots__ = ('_stack',) 

2851 

2852 def __init__(self) -> None: 

2853 self._stack: list[type] = [] 

2854 

2855 @contextmanager 

2856 def push(self, type_obj: type) -> Iterator[None]: 

2857 self._stack.append(type_obj) 

2858 yield 

2859 self._stack.pop() 

2860 

2861 def get(self) -> type | None: 

2862 if self._stack: 

2863 return self._stack[-1] 

2864 else: 

2865 return None