Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pydantic/_internal/_decorators.py: 64%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

335 statements  

1"""Logic related to validators applied to models etc. via the `@field_validator` and `@model_validator` decorators.""" 

2 

3from __future__ import annotations as _annotations 

4 

5import sys 

6import types 

7from collections import deque 

8from collections.abc import Iterable 

9from dataclasses import dataclass, field 

10from functools import cached_property, partial, partialmethod 

11from inspect import Parameter, Signature, isdatadescriptor, ismethoddescriptor, signature 

12from itertools import islice 

13from typing import TYPE_CHECKING, Any, Callable, ClassVar, Generic, Literal, TypeVar, Union 

14 

15from pydantic_core import PydanticUndefined, PydanticUndefinedType, core_schema 

16from typing_extensions import TypeAlias, is_typeddict 

17 

18from ..errors import PydanticUserError 

19from ._core_utils import get_type_ref 

20from ._internal_dataclass import slots_true 

21from ._namespace_utils import GlobalsNamespace, MappingNamespace 

22from ._typing_extra import get_function_type_hints 

23from ._utils import can_be_positional 

24 

25if TYPE_CHECKING: 

26 from ..fields import ComputedFieldInfo 

27 from ..functional_validators import FieldValidatorModes 

28 from ._config import ConfigWrapper 

29 

30 

@dataclass(**slots_true)
class ValidatorDecoratorInfo:
    """Metadata recorded from a `@validator` decorator.

    The values are stored here so they are still available later, when the
    pydantic-core schema is built.

    Attributes:
        decorator_repr: Class-level string naming the decorator, '@validator'.
        fields: Names of the fields the validator should be called on.
        mode: The requested validator mode, either 'before' or 'after'.
        each_item: For complex objects (sets, lists etc.), whether individual
            elements are validated rather than the whole object.
        always: Whether this method and other validators should run even when
            the value is missing.
        check_fields: Whether to check that the fields actually exist on the model.
    """

    decorator_repr: ClassVar[str] = '@validator'

    fields: tuple[str, ...]
    mode: Literal['before', 'after']
    each_item: bool
    always: bool
    check_fields: bool | None

53 

54 

@dataclass(**slots_true)
class FieldValidatorDecoratorInfo:
    """Metadata recorded from a `@field_validator` decorator.

    The values are stored here so they are still available later, when the
    pydantic-core schema is built.

    Attributes:
        decorator_repr: Class-level string naming the decorator, '@field_validator'.
        fields: Names of the fields the validator should be called on.
        mode: The requested validator mode.
        check_fields: Whether to check that the fields actually exist on the model.
        json_schema_input_type: The input type of the function. It is only used
            to generate the appropriate JSON Schema (in validation mode) and can
            only be specified when `mode` is `'before'`, `'plain'` or `'wrap'`.
    """

    decorator_repr: ClassVar[str] = '@field_validator'

    fields: tuple[str, ...]
    mode: FieldValidatorModes
    check_fields: bool | None
    json_schema_input_type: Any

76 

77 

@dataclass(**slots_true)
class RootValidatorDecoratorInfo:
    """Metadata recorded from a `@root_validator` decorator.

    The values are stored here so they are still available later, when the
    pydantic-core schema is built.

    Attributes:
        decorator_repr: Class-level string naming the decorator, '@root_validator'.
        mode: The requested validator mode, either 'before' or 'after'.
    """

    decorator_repr: ClassVar[str] = '@root_validator'
    mode: Literal['before', 'after']

90 

91 

@dataclass(**slots_true)
class FieldSerializerDecoratorInfo:
    """Metadata recorded from a `@field_serializer` decorator.

    The values are stored here so they are still available later, when the
    pydantic-core schema is built.

    Attributes:
        decorator_repr: Class-level string naming the decorator, '@field_serializer'.
        fields: Names of the fields the serializer should be called on.
        mode: The requested serializer mode, either 'plain' or 'wrap'.
        return_type: The type of the serializer's return value.
        when_used: The serialization condition; one of `'always'`,
            `'unless-none'`, `'json'` or `'json-unless-none'`.
        check_fields: Whether to check that the fields actually exist on the model.
    """

    decorator_repr: ClassVar[str] = '@field_serializer'
    fields: tuple[str, ...]
    mode: Literal['plain', 'wrap']
    return_type: Any
    when_used: core_schema.WhenUsed
    check_fields: bool | None

113 

114 

@dataclass(**slots_true)
class ModelSerializerDecoratorInfo:
    """Metadata recorded from a `@model_serializer` decorator.

    The values are stored here so they are still available later, when the
    pydantic-core schema is built.

    Attributes:
        decorator_repr: Class-level string naming the decorator, '@model_serializer'.
        mode: The requested serializer mode, either 'plain' or 'wrap'.
        return_type: The type of the serializer's return value.
        when_used: The serialization condition; one of `'always'`,
            `'unless-none'`, `'json'` or `'json-unless-none'`.
    """

    decorator_repr: ClassVar[str] = '@model_serializer'
    mode: Literal['plain', 'wrap']
    return_type: Any
    when_used: core_schema.WhenUsed

132 

133 

@dataclass(**slots_true)
class ModelValidatorDecoratorInfo:
    """Metadata recorded from a `@model_validator` decorator.

    The values are stored here so they are still available later, when the
    pydantic-core schema is built.

    Attributes:
        decorator_repr: Class-level string naming the decorator, '@model_validator'.
        mode: The requested validator mode: 'wrap', 'before' or 'after'.
    """

    decorator_repr: ClassVar[str] = '@model_validator'
    mode: Literal['wrap', 'before', 'after']

146 

147 

# Union of every decorator-info container this module knows about.
# Written as a string alias: `ComputedFieldInfo` is only imported under
# `TYPE_CHECKING`, so the name is not available at runtime.
DecoratorInfo: TypeAlias = """Union[
    ValidatorDecoratorInfo,
    FieldValidatorDecoratorInfo,
    RootValidatorDecoratorInfo,
    FieldSerializerDecoratorInfo,
    ModelSerializerDecoratorInfo,
    ModelValidatorDecoratorInfo,
    ComputedFieldInfo,
]"""

# Return type of the decorated callable; parametrizes `PydanticDescriptorProxy`.
ReturnType = TypeVar('ReturnType')
# The kinds of object a decorator may wrap: classmethod, staticmethod,
# plain callable or property (string alias, evaluated only by type checkers).
DecoratedType: TypeAlias = (
    'Union[classmethod[Any, Any, ReturnType], staticmethod[Any, ReturnType], Callable[..., ReturnType], property]'
)

162 

163 

@dataclass  # can't use slots here since we set attributes on `__post_init__`
class PydanticDescriptorProxy(Generic[ReturnType]):
    """Descriptor that wraps a classmethod, staticmethod, property or unbound
    function so decorated items can be detected among a class' attributes.

    `__get__` hands back the result of the wrapped item's own `__get__`, which
    keeps the proxy transparent for classmethods and staticmethods.

    Attributes:
        wrapped: The decorated object being wrapped.
        decorator_info: The decorator info.
        shim: A wrapper function to wrap V1 style function.
    """

    wrapped: DecoratedType[ReturnType]
    decorator_info: DecoratorInfo
    shim: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None

    def __post_init__(self):
        # Mirror `setter` / `deleter` from the wrapped object (when it has
        # them) as instance attributes that route through `_call_wrapped_attr`.
        for accessor in ('setter', 'deleter'):
            if not hasattr(self.wrapped, accessor):
                continue
            setattr(self, accessor, partial(self._call_wrapped_attr, name=accessor))

    def _call_wrapped_attr(self, func: Callable[[Any], None], *, name: str) -> PydanticDescriptorProxy[ReturnType]:
        """Call `self.wrapped.<name>(func)`, store the result as the new
        wrapped object and return this proxy.
        """
        rebuilt = getattr(self.wrapped, name)(func)
        self.wrapped = rebuilt
        if isinstance(rebuilt, property):
            # update ComputedFieldInfo.wrapped_property
            from ..fields import ComputedFieldInfo

            if isinstance(self.decorator_info, ComputedFieldInfo):
                self.decorator_info.wrapped_property = rebuilt
        return self

    def __get__(self, obj: object | None, obj_type: type[object] | None = None) -> PydanticDescriptorProxy[ReturnType]:
        try:
            bound = self.wrapped.__get__(obj, obj_type)  # pyright: ignore[reportReturnType]
        except AttributeError:
            # not a descriptor, e.g. a partial object
            return self.wrapped  # type: ignore[return-value]
        return bound

    def __set_name__(self, instance: Any, name: str) -> None:
        # Forward the hook only when the wrapped object defines it.
        if not hasattr(self.wrapped, '__set_name__'):
            return
        self.wrapped.__set_name__(instance, name)  # pyright: ignore[reportFunctionMemberAccess]

    def __getattr__(self, name: str, /) -> Any:
        """Forward checks for __isabstractmethod__ and such."""
        return getattr(self.wrapped, name)

213 

214 

# Type variable for the `info` payload of `Decorator`; bound to the
# `DecoratorInfo` union.
DecoratorInfoType = TypeVar('DecoratorInfoType', bound=DecoratorInfo)

216 

217 

@dataclass(**slots_true)
class Decorator(Generic[DecoratorInfoType]):
    """Generic container pairing decorator metadata with the bound function.

    The metadata is known when the decorator is called (but not when the
    core-schema is built); the bound function is only known once the class
    itself has been created. This class joins the two.

    Attributes:
        cls_ref: The class ref.
        cls_var_name: The decorated function name.
        func: The decorated function.
        shim: A wrapper function to wrap V1 style function.
        info: The decorator info.
    """

    cls_ref: str
    cls_var_name: str
    func: Callable[..., Any]
    shim: Callable[[Any], Any] | None
    info: DecoratorInfoType

    @staticmethod
    def build(
        cls_: Any,
        *,
        cls_var_name: str,
        shim: Callable[[Any], Any] | None,
        info: DecoratorInfoType,
    ) -> Decorator[DecoratorInfoType]:
        """Build a new decorator.

        Args:
            cls_: The class.
            cls_var_name: The decorated function name.
            shim: A wrapper function to wrap V1 style function.
            info: The decorator info.

        Returns:
            The new decorator instance.
        """
        resolved = get_attribute_from_bases(cls_, cls_var_name)
        target = resolved if shim is None else shim(resolved)
        target = unwrap_wrapped_function(target, unwrap_partial=False)
        if not callable(target):
            # This branch will get hit for classmethod properties
            # Reading from the base dicts prevents the binding call to `__get__`.
            raw = get_attribute_from_base_dicts(cls_, cls_var_name)
            if isinstance(raw, PydanticDescriptorProxy):
                target = unwrap_wrapped_function(raw.wrapped)
        return Decorator(
            cls_ref=get_type_ref(cls_),
            cls_var_name=cls_var_name,
            func=target,
            shim=shim,
            info=info,
        )

    def bind_to_cls(self, cls: Any) -> Decorator[DecoratorInfoType]:
        """Bind the decorator to a class.

        Args:
            cls: the class.

        Returns:
            The new decorator instance.
        """
        rebound = self.build(
            cls,
            cls_var_name=self.cls_var_name,
            shim=self.shim,
            info=self.info,
        )
        return rebound

290 

291 

292def get_bases(tp: type[Any]) -> tuple[type[Any], ...]: 

293 """Get the base classes of a class or typeddict. 

294 

295 Args: 

296 tp: The type or class to get the bases. 

297 

298 Returns: 

299 The base classes. 

300 """ 

301 if is_typeddict(tp): 

302 return tp.__orig_bases__ # type: ignore 

303 try: 

304 return tp.__bases__ 

305 except AttributeError: 

306 return () 

307 

308 

309def mro(tp: type[Any]) -> tuple[type[Any], ...]: 

310 """Calculate the Method Resolution Order of bases using the C3 algorithm. 

311 

312 See https://www.python.org/download/releases/2.3/mro/ 

313 """ 

314 # try to use the existing mro, for performance mainly 

315 # but also because it helps verify the implementation below 

316 if not is_typeddict(tp): 

317 try: 

318 return tp.__mro__ 

319 except AttributeError: 

320 # GenericAlias and some other cases 

321 pass 

322 

323 bases = get_bases(tp) 

324 return (tp,) + mro_for_bases(bases) 

325 

326 

def mro_for_bases(bases: tuple[type[Any], ...]) -> tuple[type[Any], ...]:
    """Merge the MROs of `bases` into one linearization (C3 merge).

    Args:
        bases: The base classes to linearize.

    Returns:
        The merged resolution order of the bases; an empty tuple for no bases.

    Raises:
        TypeError: If no consistent C3 linearization exists.
    """

    def merge_seqs(seqs: list[deque[type[Any]]]) -> Iterable[type[Any]]:
        while True:
            non_empty = [seq for seq in seqs if seq]
            if not non_empty:
                # Nothing left to process, we're done.
                return
            candidate: type[Any] | None = None
            for seq in non_empty:  # Find merge candidates among seq heads.
                candidate = seq[0]
                # A head is only acceptable if it is in no sequence's tail.
                if any(candidate in islice(s, 1, None) for s in non_empty):
                    # Reject the candidate.
                    candidate = None
                else:
                    break
            # Compare against the sentinel explicitly: a valid class may be
            # falsy if its metaclass defines `__bool__` or `__len__`.
            if candidate is None:
                raise TypeError('Inconsistent hierarchy, no C3 MRO is possible')
            yield candidate
            for seq in non_empty:
                # Remove candidate.
                if seq[0] == candidate:
                    seq.popleft()

    # One sequence per base MRO, plus the list of bases itself to keep their order.
    seqs = [deque(mro(base)) for base in bases] + [deque(bases)]
    return tuple(merge_seqs(seqs))

353 

354 

# Unique "not found" marker for `__dict__.get(...)` lookups in
# `get_attribute_from_bases`, so a stored `None` is not mistaken for absence.
_sentinel = object()

356 

357 

def get_attribute_from_bases(tp: type[Any] | tuple[type[Any], ...], name: str) -> Any:
    """Fetch `name` from the first class in the MRO that has it, aiming to
    simulate calling the method on the actual class.

    The MRO is walked by hand (rather than relying on plain attribute access)
    to support TypedDict, which lacks a real `__mro__` but can have a virtual
    one constructed from its bases.

    Args:
        tp: The type or class to search for the attribute. If a tuple, this is treated as a set of base classes.
        name: The name of the attribute to retrieve.

    Returns:
        Any: The attribute value, if found.

    Raises:
        AttributeError: If the attribute is not found in any class in the MRO.
    """
    if not isinstance(tp, tuple):
        try:
            return getattr(tp, name)
        except AttributeError:
            # Retry against the manually computed MRO.
            return get_attribute_from_bases(mro(tp), name)

    for klass in mro_for_bases(tp):
        found = klass.__dict__.get(name, _sentinel)
        if found is _sentinel:
            continue
        # Bind descriptors the way class-level attribute access would.
        binder = getattr(found, '__get__', None)
        return found if binder is None else binder(None, tp)
    raise AttributeError(f'{name} not found in {tp}')

391 

392 

def get_attribute_from_base_dicts(tp: type[Any], name: str) -> Any:
    """Get an attribute out of the `__dict__` following the MRO.
    This prevents the call to `__get__` on the descriptor, and allows
    us to get the original function for classmethod properties.

    Args:
        tp: The type or class to search for the attribute.
        name: The name of the attribute to retrieve.

    Returns:
        Any: The raw (unbound) value stored in the `__dict__` of the first
            class in the MRO that defines `name`.

    Raises:
        KeyError: If the attribute is not found in any class's `__dict__` in the MRO.
    """
    # Walk the MRO front to back so an override on a subclass wins over the
    # definition it shadows, exactly as regular attribute lookup would.
    for base in mro(tp):
        if name in base.__dict__:
            return base.__dict__[name]
    return tp.__dict__[name]  # raise the error

412 

413 

@dataclass(**slots_true)
class DecoratorInfos:
    """Mapping of name in the class namespace to decorator info.

    note that the name in the class namespace is the function or attribute name
    not the field name!
    """

    validators: dict[str, Decorator[ValidatorDecoratorInfo]] = field(default_factory=dict)
    field_validators: dict[str, Decorator[FieldValidatorDecoratorInfo]] = field(default_factory=dict)
    root_validators: dict[str, Decorator[RootValidatorDecoratorInfo]] = field(default_factory=dict)
    field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]] = field(default_factory=dict)
    model_serializers: dict[str, Decorator[ModelSerializerDecoratorInfo]] = field(default_factory=dict)
    model_validators: dict[str, Decorator[ModelValidatorDecoratorInfo]] = field(default_factory=dict)
    computed_fields: dict[str, Decorator[ComputedFieldInfo]] = field(default_factory=dict)

    @staticmethod
    def build(model_dc: type[Any]) -> DecoratorInfos:  # noqa: C901 (ignore complexity)
        """Collect every decorated item that applies to `model_dc`.

        We want to collect all DecFunc instances that exist as
        attributes in the namespace of the class (a BaseModel or dataclass)
        that called us
        But we want to collect these in the order of the bases
        So instead of getting them all from the leaf class (the class that called us),
        we traverse the bases from root (the oldest ancestor class) to leaf
        and collect all of the instances as we go, taking care to replace
        any duplicate ones with the last one we see to mimic how function overriding
        works with inheritance.
        If we do replace any functions we put the replacement into the position
        the replaced function was in; that is, we maintain the order.

        Args:
            model_dc: The class whose decorators are collected.

        Returns:
            The collected decorator infos. When the class itself declares any
            decorated items, the result is also stored on the class as
            `__pydantic_decorators__` and the proxies in its namespace are
            replaced by the objects they wrap.

        Raises:
            PydanticUserError: If two differently named field serializers
                target the same field.
        """
        # reminder: dicts are ordered and replacement does not alter the order
        res = DecoratorInfos()
        # Inherited decorators first, root-most ancestor to nearest parent,
        # each re-bound to the class being built.
        for base in reversed(mro(model_dc)[1:]):
            existing: DecoratorInfos | None = base.__dict__.get('__pydantic_decorators__')
            if existing is None:
                existing = DecoratorInfos.build(base)
            res.validators.update({k: v.bind_to_cls(model_dc) for k, v in existing.validators.items()})
            res.field_validators.update({k: v.bind_to_cls(model_dc) for k, v in existing.field_validators.items()})
            res.root_validators.update({k: v.bind_to_cls(model_dc) for k, v in existing.root_validators.items()})
            res.field_serializers.update({k: v.bind_to_cls(model_dc) for k, v in existing.field_serializers.items()})
            res.model_serializers.update({k: v.bind_to_cls(model_dc) for k, v in existing.model_serializers.items()})
            res.model_validators.update({k: v.bind_to_cls(model_dc) for k, v in existing.model_validators.items()})
            res.computed_fields.update({k: v.bind_to_cls(model_dc) for k, v in existing.computed_fields.items()})

        to_replace: list[tuple[str, Any]] = []

        # Then the items declared directly in this class' own namespace.
        for var_name, var_value in vars(model_dc).items():
            if isinstance(var_value, PydanticDescriptorProxy):
                info = var_value.decorator_info
                if isinstance(info, ValidatorDecoratorInfo):
                    res.validators[var_name] = Decorator.build(
                        model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
                    )
                elif isinstance(info, FieldValidatorDecoratorInfo):
                    res.field_validators[var_name] = Decorator.build(
                        model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
                    )
                elif isinstance(info, RootValidatorDecoratorInfo):
                    res.root_validators[var_name] = Decorator.build(
                        model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
                    )
                elif isinstance(info, FieldSerializerDecoratorInfo):
                    # check whether a serializer function is already registered for fields
                    for field_serializer_decorator in res.field_serializers.values():
                        # check that each field has at most one serializer function.
                        # serializer functions for the same field in subclasses are allowed,
                        # and are treated as overrides
                        if field_serializer_decorator.cls_var_name == var_name:
                            continue
                        for f in info.fields:
                            if f in field_serializer_decorator.info.fields:
                                raise PydanticUserError(
                                    'Multiple field serializer functions were defined '
                                    f'for field {f!r}, this is not allowed.',
                                    code='multiple-field-serializers',
                                )
                    res.field_serializers[var_name] = Decorator.build(
                        model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
                    )
                elif isinstance(info, ModelValidatorDecoratorInfo):
                    res.model_validators[var_name] = Decorator.build(
                        model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
                    )
                elif isinstance(info, ModelSerializerDecoratorInfo):
                    res.model_serializers[var_name] = Decorator.build(
                        model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
                    )
                else:
                    # Any info not matched above is registered as a computed
                    # field (the remaining member of the `DecoratorInfo`
                    # union); computed fields never carry a shim.
                    res.computed_fields[var_name] = Decorator.build(
                        model_dc, cls_var_name=var_name, shim=None, info=info
                    )
                to_replace.append((var_name, var_value.wrapped))
        if to_replace:
            # If we can save `__pydantic_decorators__` on the class we'll be able to check for it above
            # so then we don't need to re-process the type, which means we can discard our descriptor wrappers
            # and replace them with the thing they are wrapping (see the setattr call just below)
            # which allows validator class methods to also function as regular class methods
            model_dc.__pydantic_decorators__ = res
            for name, value in to_replace:
                setattr(model_dc, name, value)
        return res

    def update_from_config(self, config_wrapper: ConfigWrapper) -> None:
        """Update the decorator infos from the configuration of the class they are attached to."""
        # Only computed fields are updated from the config here.
        for name, computed_field_dec in self.computed_fields.items():
            computed_field_dec.info._update_from_config(config_wrapper, name)

523 

524 

def inspect_validator(
    validator: Callable[..., Any], *, mode: FieldValidatorModes, type: Literal['field', 'model']
) -> bool:
    """Look at a field or model validator function and determine whether it takes an info argument.

    An error is raised if the function has an invalid signature.

    Args:
        validator: The validator function to inspect.
        mode: The proposed validator mode.
        type: The type of validator, either 'field' or 'model'.

    Returns:
        Whether the validator takes an info argument.

    Raises:
        PydanticUserError: If the number of required positional parameters
            does not match any signature accepted for `mode`.
    """
    try:
        sig = _signature_no_eval(validator)
    except (ValueError, TypeError):
        # `inspect.signature` might not be able to infer a signature, e.g. with C objects.
        # In this case, we assume no info argument is present:
        return False
    n_positional = count_positional_required_params(sig)
    if mode == 'wrap':
        # Wrap validators have one more positional parameter than the other
        # modes, so the thresholds are shifted by one.
        if n_positional == 3:
            return True
        elif n_positional == 2:
            return False
    else:
        assert mode in {'before', 'after', 'plain'}, f"invalid mode: {mode!r}, expected 'before', 'after' or 'plain'"
        if n_positional == 2:
            return True
        elif n_positional == 1:
            return False

    raise PydanticUserError(
        f'Unrecognized {type} validator function signature for {validator} with `mode={mode}`: {sig}',
        code='validator-signature',
    )

563 

564 

def inspect_field_serializer(serializer: Callable[..., Any], mode: Literal['plain', 'wrap']) -> tuple[bool, bool]:
    """Inspect a field serializer function: report whether it is a field
    serializer (its first parameter is named `self`) and whether it takes an
    info argument.

    An error is raised if the function has an invalid signature.

    Args:
        serializer: The serializer function to inspect.
        mode: The serializer mode, either 'plain' or 'wrap'.

    Returns:
        Tuple of (is_field_serializer, info_arg).
    """
    try:
        sig = _signature_no_eval(serializer)
    except (ValueError, TypeError):
        # `inspect.signature` might not be able to infer a signature, e.g. with C objects.
        # In this case, we assume no info argument is present and this is not a method:
        return (False, False)

    params = list(sig.parameters.values())
    takes_self = bool(params) and params[0].name == 'self'

    required = count_positional_required_params(sig)
    # `self` is not part of the serializer contract, so leave it out of the count.
    info_arg = _serializer_info_arg(mode, required - 1 if takes_self else required)

    if info_arg is None:
        raise PydanticUserError(
            f'Unrecognized field_serializer function signature for {serializer} with `mode={mode}`:{sig}',
            code='field-serializer-signature',
        )

    return takes_self, info_arg

602 

603 

def inspect_annotated_serializer(serializer: Callable[..., Any], mode: Literal['plain', 'wrap']) -> bool:
    """Inspect a serializer function used via `Annotated` and report whether it takes an info argument.

    An error is raised if the function has an invalid signature.

    Args:
        serializer: The serializer function to check.
        mode: The serializer mode, either 'plain' or 'wrap'.

    Returns:
        info_arg
    """
    try:
        sig = _signature_no_eval(serializer)
    except (ValueError, TypeError):
        # `inspect.signature` might not be able to infer a signature, e.g. with C objects.
        # In this case, we assume no info argument is present:
        return False

    required = count_positional_required_params(sig)
    info_arg = _serializer_info_arg(mode, required)
    if info_arg is not None:
        return info_arg
    raise PydanticUserError(
        f'Unrecognized field_serializer function signature for {serializer} with `mode={mode}`:{sig}',
        code='field-serializer-signature',
    )

630 

631 

def inspect_model_serializer(serializer: Callable[..., Any], mode: Literal['plain', 'wrap']) -> bool:
    """Inspect a model serializer function and report whether it takes an info argument.

    An error is raised if the function has an invalid signature.

    Args:
        serializer: The serializer function to check.
        mode: The serializer mode, either 'plain' or 'wrap'.

    Returns:
        `info_arg` - whether the function expects an info argument.
    """
    # Static/class methods are rejected outright; anything else must have
    # `self` as its first parameter.
    looks_like_instance_method = not isinstance(
        serializer, (staticmethod, classmethod)
    ) and is_instance_method_from_sig(serializer)
    if not looks_like_instance_method:
        raise PydanticUserError(
            '`@model_serializer` must be applied to instance methods', code='model-serializer-instance-method'
        )

    sig = _signature_no_eval(serializer)
    required = count_positional_required_params(sig)
    info_arg = _serializer_info_arg(mode, required)
    if info_arg is not None:
        return info_arg
    raise PydanticUserError(
        f'Unrecognized model_serializer function signature for {serializer} with `mode={mode}`:{sig}',
        code='model-serializer-signature',
    )

658 

659 

660def _serializer_info_arg(mode: Literal['plain', 'wrap'], n_positional: int) -> bool | None: 

661 if mode == 'plain': 

662 if n_positional == 1: 

663 # (input_value: Any, /) -> Any 

664 return False 

665 elif n_positional == 2: 

666 # (model: Any, input_value: Any, /) -> Any 

667 return True 

668 else: 

669 assert mode == 'wrap', f"invalid mode: {mode!r}, expected 'plain' or 'wrap'" 

670 if n_positional == 2: 

671 # (input_value: Any, serializer: SerializerFunctionWrapHandler, /) -> Any 

672 return False 

673 elif n_positional == 3: 

674 # (input_value: Any, serializer: SerializerFunctionWrapHandler, info: SerializationInfo, /) -> Any 

675 return True 

676 

677 return None 

678 

679 

# Anything the signature helpers below accept as a "function": classmethod,
# staticmethod, partialmethod or a plain callable (string alias, evaluated
# only by type checkers).
AnyDecoratorCallable: TypeAlias = (
    'Union[classmethod[Any, Any, Any], staticmethod[Any, Any], partialmethod[Any], Callable[..., Any]]'
)

683 

684 

def is_instance_method_from_sig(function: AnyDecoratorCallable) -> bool:
    """Tell whether the function looks like an instance method.

    A function is treated as an instance method when the first parameter of
    its (unwrapped) signature is named `self`.

    Args:
        function: The function to check.

    Returns:
        `True` if the function is an instance method, `False` otherwise.
    """
    params = _signature_no_eval(unwrap_wrapped_function(function)).parameters
    leading = next(iter(params.values()), None)
    return leading is not None and leading.name == 'self'

702 

703 

def ensure_classmethod_based_on_signature(function: AnyDecoratorCallable) -> Any:
    """Wrap the function in `classmethod` when its signature asks for it.

    Args:
        function: The function to apply the decorator on.

    Return:
        `classmethod(function)` if the function is not already a classmethod
        and its first parameter is named `cls`; otherwise the function unchanged.
    """
    unwrapped = unwrap_wrapped_function(function, unwrap_class_static_method=False)
    if isinstance(unwrapped, classmethod):
        # Already a classmethod, nothing to do.
        return function
    if _is_classmethod_from_sig(function):
        return classmethod(function)  # type: ignore[arg-type]
    return function

718 

719 

def _is_classmethod_from_sig(function: AnyDecoratorCallable) -> bool:
    """Return `True` when the first parameter of the unwrapped function is named `cls`."""
    params = _signature_no_eval(unwrap_wrapped_function(function)).parameters
    leading = next(iter(params.values()), None)
    return leading is not None and leading.name == 'cls'

726 

727 

def unwrap_wrapped_function(
    func: Any,
    *,
    unwrap_partial: bool = True,
    unwrap_class_static_method: bool = True,
) -> Any:
    """Peel wrappers off a function until the underlying function is reached.

    Handles property, functools.cached_property, functools.partial,
    functools.partialmethod, staticmethod and classmethod.

    Args:
        func: The function to unwrap.
        unwrap_partial: If True (default), unwrap partial and partialmethod decorators.
        unwrap_class_static_method: If True (default), also unwrap classmethod and
            staticmethod decorators.

    Returns:
        The underlying function of the wrapped function.
    """
    # Collect the wrapper types that are to be stripped, as a single tuple.
    peelable: tuple[type, ...] = (property, cached_property)
    if unwrap_partial:
        peelable += (partial, partialmethod)
    if unwrap_class_static_method:
        peelable += (staticmethod, classmethod)

    inner = func
    while isinstance(inner, peelable):
        if unwrap_class_static_method and isinstance(inner, (classmethod, staticmethod)):
            inner = inner.__func__
            continue
        if isinstance(inner, (partial, partialmethod)):
            inner = inner.func
            continue
        if isinstance(inner, property):
            inner = inner.fget  # arbitrary choice, convenient for computed fields
            continue
        # Make coverage happy as it can only get here in the last possible case
        assert isinstance(inner, cached_property)
        inner = inner.func  # type: ignore

    return inner

766 

767 

# Types that `get_callable_return_type` inspects directly; for any other
# object it uses the `__call__` of the object's type instead.
_function_like = (
    partial,
    partialmethod,
    types.FunctionType,
    types.BuiltinFunctionType,
    types.MethodType,
    types.WrapperDescriptorType,
    types.MethodWrapperType,
    types.MemberDescriptorType,
)

778 

779 

def get_callable_return_type(
    callable_obj: Any,
    globalns: GlobalsNamespace | None = None,
    localns: MappingNamespace | None = None,
) -> Any | PydanticUndefinedType:
    """Work out the return type of a callable.

    Args:
        callable_obj: The callable to analyze.
        globalns: The globals namespace to use during type annotation evaluation.
        localns: The locals namespace to use during type annotation evaluation.

    Returns:
        The function return type, or `PydanticUndefined` when the callable
        has no 'return' type hint.
    """
    if isinstance(callable_obj, type):
        # types are callables, and we assume the return type
        # is the type itself (e.g. `int()` results in an instance of `int`).
        return callable_obj

    target = callable_obj
    if not isinstance(target, _function_like):
        # Not function-like: inspect the `__call__` defined on its type, if any.
        dunder_call = getattr(type(target), '__call__', None)  # noqa: B004
        if dunder_call is not None:
            target = dunder_call

    hints = get_function_type_hints(
        unwrap_wrapped_function(target),
        include_keys={'return'},
        globalns=globalns,
        localns=localns,
    )
    return hints.get('return', PydanticUndefined)

812 

813 

def count_positional_required_params(sig: Signature) -> int:
    """Count the positional (required) arguments of a signature.

    This function should only be used to inspect signatures of validation and serialization functions.
    The first argument (the value being serialized or validated) is counted as a required argument
    even if a default value exists.

    Returns:
        The number of positional arguments of a signature.
    """
    total = 0
    for position, param in enumerate(sig.parameters.values()):
        if not can_be_positional(param):
            continue
        # First argument is the value being validated/serialized, and can have a default value
        # (e.g. `float`, which has signature `(x=0, /)`). We assume other parameters (the info arg
        # for instance) should be required, and thus without any default value.
        if position == 0 or param.default is Parameter.empty:
            total += 1
    return total

834 

835 

def ensure_property(f: Any) -> Any:
    """Make sure a function is a `property` or `cached_property`, or is a valid descriptor.

    Args:
        f: The function to check.

    Returns:
        `f` itself when it is already a method or data descriptor, otherwise
        a `property` instance wrapping it.
    """
    already_descriptor = ismethoddescriptor(f) or isdatadescriptor(f)
    return f if already_descriptor else property(f)

849 

850 

851def _signature_no_eval(f: Callable[..., Any]) -> Signature: 

852 """Get the signature of a callable without evaluating any annotations.""" 

853 if sys.version_info >= (3, 14): 

854 from annotationlib import Format 

855 

856 return signature(f, annotation_format=Format.FORWARDREF) 

857 else: 

858 return signature(f)