1"""Logic related to validators applied to models etc. via the `@field_validator` and `@model_validator` decorators."""
2
3from __future__ import annotations as _annotations
4
5import sys
6import types
7from collections import deque
8from collections.abc import Iterable
9from dataclasses import dataclass, field
10from functools import cached_property, partial, partialmethod
11from inspect import Parameter, Signature, isdatadescriptor, ismethoddescriptor, signature
12from itertools import islice
13from typing import TYPE_CHECKING, Any, Callable, ClassVar, Generic, Literal, TypeVar, Union
14
15from pydantic_core import PydanticUndefined, PydanticUndefinedType, core_schema
16from typing_extensions import TypeAlias, is_typeddict
17
18from ..errors import PydanticUserError
19from ._core_utils import get_type_ref
20from ._internal_dataclass import slots_true
21from ._namespace_utils import GlobalsNamespace, MappingNamespace
22from ._typing_extra import get_function_type_hints
23from ._utils import can_be_positional
24
25if TYPE_CHECKING:
26 from ..fields import ComputedFieldInfo
27 from ..functional_validators import FieldValidatorModes
28 from ._config import ConfigWrapper
29
30
31@dataclass(**slots_true)
32class ValidatorDecoratorInfo:
33 """A container for data from `@validator` so that we can access it
34 while building the pydantic-core schema.
35
36 Attributes:
37 decorator_repr: A class variable representing the decorator string, '@validator'.
38 fields: A tuple of field names the validator should be called on.
39 mode: The proposed validator mode.
40 each_item: For complex objects (sets, lists etc.) whether to validate individual
41 elements rather than the whole object.
42 always: Whether this method and other validators should be called even if the value is missing.
43 check_fields: Whether to check that the fields actually exist on the model.
44 """
45
46 decorator_repr: ClassVar[str] = '@validator'
47
48 fields: tuple[str, ...]
49 mode: Literal['before', 'after']
50 each_item: bool
51 always: bool
52 check_fields: bool | None
53
54
55@dataclass(**slots_true)
56class FieldValidatorDecoratorInfo:
57 """A container for data from `@field_validator` so that we can access it
58 while building the pydantic-core schema.
59
60 Attributes:
61 decorator_repr: A class variable representing the decorator string, '@field_validator'.
62 fields: A tuple of field names the validator should be called on.
63 mode: The proposed validator mode.
64 check_fields: Whether to check that the fields actually exist on the model.
65 json_schema_input_type: The input type of the function. This is only used to generate
66 the appropriate JSON Schema (in validation mode) and can only specified
67 when `mode` is either `'before'`, `'plain'` or `'wrap'`.
68 """
69
70 decorator_repr: ClassVar[str] = '@field_validator'
71
72 fields: tuple[str, ...]
73 mode: FieldValidatorModes
74 check_fields: bool | None
75 json_schema_input_type: Any
76
77
78@dataclass(**slots_true)
79class RootValidatorDecoratorInfo:
80 """A container for data from `@root_validator` so that we can access it
81 while building the pydantic-core schema.
82
83 Attributes:
84 decorator_repr: A class variable representing the decorator string, '@root_validator'.
85 mode: The proposed validator mode.
86 """
87
88 decorator_repr: ClassVar[str] = '@root_validator'
89 mode: Literal['before', 'after']
90
91
92@dataclass(**slots_true)
93class FieldSerializerDecoratorInfo:
94 """A container for data from `@field_serializer` so that we can access it
95 while building the pydantic-core schema.
96
97 Attributes:
98 decorator_repr: A class variable representing the decorator string, '@field_serializer'.
99 fields: A tuple of field names the serializer should be called on.
100 mode: The proposed serializer mode.
101 return_type: The type of the serializer's return value.
102 when_used: The serialization condition. Accepts a string with values `'always'`, `'unless-none'`, `'json'`,
103 and `'json-unless-none'`.
104 check_fields: Whether to check that the fields actually exist on the model.
105 """
106
107 decorator_repr: ClassVar[str] = '@field_serializer'
108 fields: tuple[str, ...]
109 mode: Literal['plain', 'wrap']
110 return_type: Any
111 when_used: core_schema.WhenUsed
112 check_fields: bool | None
113
114
115@dataclass(**slots_true)
116class ModelSerializerDecoratorInfo:
117 """A container for data from `@model_serializer` so that we can access it
118 while building the pydantic-core schema.
119
120 Attributes:
121 decorator_repr: A class variable representing the decorator string, '@model_serializer'.
122 mode: The proposed serializer mode.
123 return_type: The type of the serializer's return value.
124 when_used: The serialization condition. Accepts a string with values `'always'`, `'unless-none'`, `'json'`,
125 and `'json-unless-none'`.
126 """
127
128 decorator_repr: ClassVar[str] = '@model_serializer'
129 mode: Literal['plain', 'wrap']
130 return_type: Any
131 when_used: core_schema.WhenUsed
132
133
134@dataclass(**slots_true)
135class ModelValidatorDecoratorInfo:
136 """A container for data from `@model_validator` so that we can access it
137 while building the pydantic-core schema.
138
139 Attributes:
140 decorator_repr: A class variable representing the decorator string, '@model_validator'.
141 mode: The proposed serializer mode.
142 """
143
144 decorator_repr: ClassVar[str] = '@model_validator'
145 mode: Literal['wrap', 'before', 'after']
146
147
148DecoratorInfo: TypeAlias = """Union[
149 ValidatorDecoratorInfo,
150 FieldValidatorDecoratorInfo,
151 RootValidatorDecoratorInfo,
152 FieldSerializerDecoratorInfo,
153 ModelSerializerDecoratorInfo,
154 ModelValidatorDecoratorInfo,
155 ComputedFieldInfo,
156]"""
157
158ReturnType = TypeVar('ReturnType')
159DecoratedType: TypeAlias = (
160 'Union[classmethod[Any, Any, ReturnType], staticmethod[Any, ReturnType], Callable[..., ReturnType], property]'
161)
162
163
164@dataclass # can't use slots here since we set attributes on `__post_init__`
165class PydanticDescriptorProxy(Generic[ReturnType]):
166 """Wrap a classmethod, staticmethod, property or unbound function
167 and act as a descriptor that allows us to detect decorated items
168 from the class' attributes.
169
170 This class' __get__ returns the wrapped item's __get__ result,
171 which makes it transparent for classmethods and staticmethods.
172
173 Attributes:
174 wrapped: The decorator that has to be wrapped.
175 decorator_info: The decorator info.
176 shim: A wrapper function to wrap V1 style function.
177 """
178
179 wrapped: DecoratedType[ReturnType]
180 decorator_info: DecoratorInfo
181 shim: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None
182
183 def __post_init__(self):
184 for attr in 'setter', 'deleter':
185 if hasattr(self.wrapped, attr):
186 f = partial(self._call_wrapped_attr, name=attr)
187 setattr(self, attr, f)
188
189 def _call_wrapped_attr(self, func: Callable[[Any], None], *, name: str) -> PydanticDescriptorProxy[ReturnType]:
190 self.wrapped = getattr(self.wrapped, name)(func)
191 if isinstance(self.wrapped, property):
192 # update ComputedFieldInfo.wrapped_property
193 from ..fields import ComputedFieldInfo
194
195 if isinstance(self.decorator_info, ComputedFieldInfo):
196 self.decorator_info.wrapped_property = self.wrapped
197 return self
198
199 def __get__(self, obj: object | None, obj_type: type[object] | None = None) -> PydanticDescriptorProxy[ReturnType]:
200 try:
201 return self.wrapped.__get__(obj, obj_type) # pyright: ignore[reportReturnType]
202 except AttributeError:
203 # not a descriptor, e.g. a partial object
204 return self.wrapped # type: ignore[return-value]
205
206 def __set_name__(self, instance: Any, name: str) -> None:
207 if hasattr(self.wrapped, '__set_name__'):
208 self.wrapped.__set_name__(instance, name) # pyright: ignore[reportFunctionMemberAccess]
209
210 def __getattr__(self, name: str, /) -> Any:
211 """Forward checks for __isabstractmethod__ and such."""
212 return getattr(self.wrapped, name)
213
214
215DecoratorInfoType = TypeVar('DecoratorInfoType', bound=DecoratorInfo)
216
217
218@dataclass(**slots_true)
219class Decorator(Generic[DecoratorInfoType]):
220 """A generic container class to join together the decorator metadata
221 (metadata from decorator itself, which we have when the
222 decorator is called but not when we are building the core-schema)
223 and the bound function (which we have after the class itself is created).
224
225 Attributes:
226 cls_ref: The class ref.
227 cls_var_name: The decorated function name.
228 func: The decorated function.
229 shim: A wrapper function to wrap V1 style function.
230 info: The decorator info.
231 """
232
233 cls_ref: str
234 cls_var_name: str
235 func: Callable[..., Any]
236 shim: Callable[[Any], Any] | None
237 info: DecoratorInfoType
238
239 @staticmethod
240 def build(
241 cls_: Any,
242 *,
243 cls_var_name: str,
244 shim: Callable[[Any], Any] | None,
245 info: DecoratorInfoType,
246 ) -> Decorator[DecoratorInfoType]:
247 """Build a new decorator.
248
249 Args:
250 cls_: The class.
251 cls_var_name: The decorated function name.
252 shim: A wrapper function to wrap V1 style function.
253 info: The decorator info.
254
255 Returns:
256 The new decorator instance.
257 """
258 func = get_attribute_from_bases(cls_, cls_var_name)
259 if shim is not None:
260 func = shim(func)
261 func = unwrap_wrapped_function(func, unwrap_partial=False)
262 if not callable(func):
263 # This branch will get hit for classmethod properties
264 attribute = get_attribute_from_base_dicts(cls_, cls_var_name) # prevents the binding call to `__get__`
265 if isinstance(attribute, PydanticDescriptorProxy):
266 func = unwrap_wrapped_function(attribute.wrapped)
267 return Decorator(
268 cls_ref=get_type_ref(cls_),
269 cls_var_name=cls_var_name,
270 func=func,
271 shim=shim,
272 info=info,
273 )
274
275 def bind_to_cls(self, cls: Any) -> Decorator[DecoratorInfoType]:
276 """Bind the decorator to a class.
277
278 Args:
279 cls: the class.
280
281 Returns:
282 The new decorator instance.
283 """
284 return self.build(
285 cls,
286 cls_var_name=self.cls_var_name,
287 shim=self.shim,
288 info=self.info,
289 )
290
291
292def get_bases(tp: type[Any]) -> tuple[type[Any], ...]:
293 """Get the base classes of a class or typeddict.
294
295 Args:
296 tp: The type or class to get the bases.
297
298 Returns:
299 The base classes.
300 """
301 if is_typeddict(tp):
302 return tp.__orig_bases__ # type: ignore
303 try:
304 return tp.__bases__
305 except AttributeError:
306 return ()
307
308
309def mro(tp: type[Any]) -> tuple[type[Any], ...]:
310 """Calculate the Method Resolution Order of bases using the C3 algorithm.
311
312 See https://www.python.org/download/releases/2.3/mro/
313 """
314 # try to use the existing mro, for performance mainly
315 # but also because it helps verify the implementation below
316 if not is_typeddict(tp):
317 try:
318 return tp.__mro__
319 except AttributeError:
320 # GenericAlias and some other cases
321 pass
322
323 bases = get_bases(tp)
324 return (tp,) + mro_for_bases(bases)
325
326
327def mro_for_bases(bases: tuple[type[Any], ...]) -> tuple[type[Any], ...]:
328 def merge_seqs(seqs: list[deque[type[Any]]]) -> Iterable[type[Any]]:
329 while True:
330 non_empty = [seq for seq in seqs if seq]
331 if not non_empty:
332 # Nothing left to process, we're done.
333 return
334 candidate: type[Any] | None = None
335 for seq in non_empty: # Find merge candidates among seq heads.
336 candidate = seq[0]
337 not_head = [s for s in non_empty if candidate in islice(s, 1, None)]
338 if not_head:
339 # Reject the candidate.
340 candidate = None
341 else:
342 break
343 if not candidate:
344 raise TypeError('Inconsistent hierarchy, no C3 MRO is possible')
345 yield candidate
346 for seq in non_empty:
347 # Remove candidate.
348 if seq[0] == candidate:
349 seq.popleft()
350
351 seqs = [deque(mro(base)) for base in bases] + [deque(bases)]
352 return tuple(merge_seqs(seqs))
353
354
355_sentinel = object()
356
357
358def get_attribute_from_bases(tp: type[Any] | tuple[type[Any], ...], name: str) -> Any:
359 """Get the attribute from the next class in the MRO that has it,
360 aiming to simulate calling the method on the actual class.
361
362 The reason for iterating over the mro instead of just getting
363 the attribute (which would do that for us) is to support TypedDict,
364 which lacks a real __mro__, but can have a virtual one constructed
365 from its bases (as done here).
366
367 Args:
368 tp: The type or class to search for the attribute. If a tuple, this is treated as a set of base classes.
369 name: The name of the attribute to retrieve.
370
371 Returns:
372 Any: The attribute value, if found.
373
374 Raises:
375 AttributeError: If the attribute is not found in any class in the MRO.
376 """
377 if isinstance(tp, tuple):
378 for base in mro_for_bases(tp):
379 attribute = base.__dict__.get(name, _sentinel)
380 if attribute is not _sentinel:
381 attribute_get = getattr(attribute, '__get__', None)
382 if attribute_get is not None:
383 return attribute_get(None, tp)
384 return attribute
385 raise AttributeError(f'{name} not found in {tp}')
386 else:
387 try:
388 return getattr(tp, name)
389 except AttributeError:
390 return get_attribute_from_bases(mro(tp), name)
391
392
393def get_attribute_from_base_dicts(tp: type[Any], name: str) -> Any:
394 """Get an attribute out of the `__dict__` following the MRO.
395 This prevents the call to `__get__` on the descriptor, and allows
396 us to get the original function for classmethod properties.
397
398 Args:
399 tp: The type or class to search for the attribute.
400 name: The name of the attribute to retrieve.
401
402 Returns:
403 Any: The attribute value, if found.
404
405 Raises:
406 KeyError: If the attribute is not found in any class's `__dict__` in the MRO.
407 """
408 for base in reversed(mro(tp)):
409 if name in base.__dict__:
410 return base.__dict__[name]
411 return tp.__dict__[name] # raise the error
412
413
414@dataclass(**slots_true)
415class DecoratorInfos:
416 """Mapping of name in the class namespace to decorator info.
417
418 note that the name in the class namespace is the function or attribute name
419 not the field name!
420 """
421
422 validators: dict[str, Decorator[ValidatorDecoratorInfo]] = field(default_factory=dict)
423 field_validators: dict[str, Decorator[FieldValidatorDecoratorInfo]] = field(default_factory=dict)
424 root_validators: dict[str, Decorator[RootValidatorDecoratorInfo]] = field(default_factory=dict)
425 field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]] = field(default_factory=dict)
426 model_serializers: dict[str, Decorator[ModelSerializerDecoratorInfo]] = field(default_factory=dict)
427 model_validators: dict[str, Decorator[ModelValidatorDecoratorInfo]] = field(default_factory=dict)
428 computed_fields: dict[str, Decorator[ComputedFieldInfo]] = field(default_factory=dict)
429
430 @staticmethod
431 def build(model_dc: type[Any]) -> DecoratorInfos: # noqa: C901 (ignore complexity)
432 """We want to collect all DecFunc instances that exist as
433 attributes in the namespace of the class (a BaseModel or dataclass)
434 that called us
435 But we want to collect these in the order of the bases
436 So instead of getting them all from the leaf class (the class that called us),
437 we traverse the bases from root (the oldest ancestor class) to leaf
438 and collect all of the instances as we go, taking care to replace
439 any duplicate ones with the last one we see to mimic how function overriding
440 works with inheritance.
441 If we do replace any functions we put the replacement into the position
442 the replaced function was in; that is, we maintain the order.
443 """
444 # reminder: dicts are ordered and replacement does not alter the order
445 res = DecoratorInfos()
446 for base in reversed(mro(model_dc)[1:]):
447 existing: DecoratorInfos | None = base.__dict__.get('__pydantic_decorators__')
448 if existing is None:
449 existing = DecoratorInfos.build(base)
450 res.validators.update({k: v.bind_to_cls(model_dc) for k, v in existing.validators.items()})
451 res.field_validators.update({k: v.bind_to_cls(model_dc) for k, v in existing.field_validators.items()})
452 res.root_validators.update({k: v.bind_to_cls(model_dc) for k, v in existing.root_validators.items()})
453 res.field_serializers.update({k: v.bind_to_cls(model_dc) for k, v in existing.field_serializers.items()})
454 res.model_serializers.update({k: v.bind_to_cls(model_dc) for k, v in existing.model_serializers.items()})
455 res.model_validators.update({k: v.bind_to_cls(model_dc) for k, v in existing.model_validators.items()})
456 res.computed_fields.update({k: v.bind_to_cls(model_dc) for k, v in existing.computed_fields.items()})
457
458 to_replace: list[tuple[str, Any]] = []
459
460 for var_name, var_value in vars(model_dc).items():
461 if isinstance(var_value, PydanticDescriptorProxy):
462 info = var_value.decorator_info
463 if isinstance(info, ValidatorDecoratorInfo):
464 res.validators[var_name] = Decorator.build(
465 model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
466 )
467 elif isinstance(info, FieldValidatorDecoratorInfo):
468 res.field_validators[var_name] = Decorator.build(
469 model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
470 )
471 elif isinstance(info, RootValidatorDecoratorInfo):
472 res.root_validators[var_name] = Decorator.build(
473 model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
474 )
475 elif isinstance(info, FieldSerializerDecoratorInfo):
476 # check whether a serializer function is already registered for fields
477 for field_serializer_decorator in res.field_serializers.values():
478 # check that each field has at most one serializer function.
479 # serializer functions for the same field in subclasses are allowed,
480 # and are treated as overrides
481 if field_serializer_decorator.cls_var_name == var_name:
482 continue
483 for f in info.fields:
484 if f in field_serializer_decorator.info.fields:
485 raise PydanticUserError(
486 'Multiple field serializer functions were defined '
487 f'for field {f!r}, this is not allowed.',
488 code='multiple-field-serializers',
489 )
490 res.field_serializers[var_name] = Decorator.build(
491 model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
492 )
493 elif isinstance(info, ModelValidatorDecoratorInfo):
494 res.model_validators[var_name] = Decorator.build(
495 model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
496 )
497 elif isinstance(info, ModelSerializerDecoratorInfo):
498 res.model_serializers[var_name] = Decorator.build(
499 model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
500 )
501 else:
502 from ..fields import ComputedFieldInfo
503
504 isinstance(var_value, ComputedFieldInfo)
505 res.computed_fields[var_name] = Decorator.build(
506 model_dc, cls_var_name=var_name, shim=None, info=info
507 )
508 to_replace.append((var_name, var_value.wrapped))
509 if to_replace:
510 # If we can save `__pydantic_decorators__` on the class we'll be able to check for it above
511 # so then we don't need to re-process the type, which means we can discard our descriptor wrappers
512 # and replace them with the thing they are wrapping (see the other setattr call below)
513 # which allows validator class methods to also function as regular class methods
514 model_dc.__pydantic_decorators__ = res
515 for name, value in to_replace:
516 setattr(model_dc, name, value)
517 return res
518
519 def update_from_config(self, config_wrapper: ConfigWrapper) -> None:
520 """Update the decorator infos from the configuration of the class they are attached to."""
521 for name, computed_field_dec in self.computed_fields.items():
522 computed_field_dec.info._update_from_config(config_wrapper, name)
523
524
525def inspect_validator(
526 validator: Callable[..., Any], *, mode: FieldValidatorModes, type: Literal['field', 'model']
527) -> bool:
528 """Look at a field or model validator function and determine whether it takes an info argument.
529
530 An error is raised if the function has an invalid signature.
531
532 Args:
533 validator: The validator function to inspect.
534 mode: The proposed validator mode.
535 type: The type of validator, either 'field' or 'model'.
536
537 Returns:
538 Whether the validator takes an info argument.
539 """
540 try:
541 sig = _signature_no_eval(validator)
542 except (ValueError, TypeError):
543 # `inspect.signature` might not be able to infer a signature, e.g. with C objects.
544 # In this case, we assume no info argument is present:
545 return False
546 n_positional = count_positional_required_params(sig)
547 if mode == 'wrap':
548 if n_positional == 3:
549 return True
550 elif n_positional == 2:
551 return False
552 else:
553 assert mode in {'before', 'after', 'plain'}, f"invalid mode: {mode!r}, expected 'before', 'after' or 'plain"
554 if n_positional == 2:
555 return True
556 elif n_positional == 1:
557 return False
558
559 raise PydanticUserError(
560 f'Unrecognized {type} validator function signature for {validator} with `mode={mode}`: {sig}',
561 code='validator-signature',
562 )
563
564
565def inspect_field_serializer(serializer: Callable[..., Any], mode: Literal['plain', 'wrap']) -> tuple[bool, bool]:
566 """Look at a field serializer function and determine if it is a field serializer,
567 and whether it takes an info argument.
568
569 An error is raised if the function has an invalid signature.
570
571 Args:
572 serializer: The serializer function to inspect.
573 mode: The serializer mode, either 'plain' or 'wrap'.
574
575 Returns:
576 Tuple of (is_field_serializer, info_arg).
577 """
578 try:
579 sig = _signature_no_eval(serializer)
580 except (ValueError, TypeError):
581 # `inspect.signature` might not be able to infer a signature, e.g. with C objects.
582 # In this case, we assume no info argument is present and this is not a method:
583 return (False, False)
584
585 first = next(iter(sig.parameters.values()), None)
586 is_field_serializer = first is not None and first.name == 'self'
587
588 n_positional = count_positional_required_params(sig)
589 if is_field_serializer:
590 # -1 to correct for self parameter
591 info_arg = _serializer_info_arg(mode, n_positional - 1)
592 else:
593 info_arg = _serializer_info_arg(mode, n_positional)
594
595 if info_arg is None:
596 raise PydanticUserError(
597 f'Unrecognized field_serializer function signature for {serializer} with `mode={mode}`:{sig}',
598 code='field-serializer-signature',
599 )
600
601 return is_field_serializer, info_arg
602
603
604def inspect_annotated_serializer(serializer: Callable[..., Any], mode: Literal['plain', 'wrap']) -> bool:
605 """Look at a serializer function used via `Annotated` and determine whether it takes an info argument.
606
607 An error is raised if the function has an invalid signature.
608
609 Args:
610 serializer: The serializer function to check.
611 mode: The serializer mode, either 'plain' or 'wrap'.
612
613 Returns:
614 info_arg
615 """
616 try:
617 sig = _signature_no_eval(serializer)
618 except (ValueError, TypeError):
619 # `inspect.signature` might not be able to infer a signature, e.g. with C objects.
620 # In this case, we assume no info argument is present:
621 return False
622 info_arg = _serializer_info_arg(mode, count_positional_required_params(sig))
623 if info_arg is None:
624 raise PydanticUserError(
625 f'Unrecognized field_serializer function signature for {serializer} with `mode={mode}`:{sig}',
626 code='field-serializer-signature',
627 )
628 else:
629 return info_arg
630
631
632def inspect_model_serializer(serializer: Callable[..., Any], mode: Literal['plain', 'wrap']) -> bool:
633 """Look at a model serializer function and determine whether it takes an info argument.
634
635 An error is raised if the function has an invalid signature.
636
637 Args:
638 serializer: The serializer function to check.
639 mode: The serializer mode, either 'plain' or 'wrap'.
640
641 Returns:
642 `info_arg` - whether the function expects an info argument.
643 """
644 if isinstance(serializer, (staticmethod, classmethod)) or not is_instance_method_from_sig(serializer):
645 raise PydanticUserError(
646 '`@model_serializer` must be applied to instance methods', code='model-serializer-instance-method'
647 )
648
649 sig = _signature_no_eval(serializer)
650 info_arg = _serializer_info_arg(mode, count_positional_required_params(sig))
651 if info_arg is None:
652 raise PydanticUserError(
653 f'Unrecognized model_serializer function signature for {serializer} with `mode={mode}`:{sig}',
654 code='model-serializer-signature',
655 )
656 else:
657 return info_arg
658
659
660def _serializer_info_arg(mode: Literal['plain', 'wrap'], n_positional: int) -> bool | None:
661 if mode == 'plain':
662 if n_positional == 1:
663 # (input_value: Any, /) -> Any
664 return False
665 elif n_positional == 2:
666 # (model: Any, input_value: Any, /) -> Any
667 return True
668 else:
669 assert mode == 'wrap', f"invalid mode: {mode!r}, expected 'plain' or 'wrap'"
670 if n_positional == 2:
671 # (input_value: Any, serializer: SerializerFunctionWrapHandler, /) -> Any
672 return False
673 elif n_positional == 3:
674 # (input_value: Any, serializer: SerializerFunctionWrapHandler, info: SerializationInfo, /) -> Any
675 return True
676
677 return None
678
679
680AnyDecoratorCallable: TypeAlias = (
681 'Union[classmethod[Any, Any, Any], staticmethod[Any, Any], partialmethod[Any], Callable[..., Any]]'
682)
683
684
685def is_instance_method_from_sig(function: AnyDecoratorCallable) -> bool:
686 """Whether the function is an instance method.
687
688 It will consider a function as instance method if the first parameter of
689 function is `self`.
690
691 Args:
692 function: The function to check.
693
694 Returns:
695 `True` if the function is an instance method, `False` otherwise.
696 """
697 sig = _signature_no_eval(unwrap_wrapped_function(function))
698 first = next(iter(sig.parameters.values()), None)
699 if first and first.name == 'self':
700 return True
701 return False
702
703
704def ensure_classmethod_based_on_signature(function: AnyDecoratorCallable) -> Any:
705 """Apply the `@classmethod` decorator on the function.
706
707 Args:
708 function: The function to apply the decorator on.
709
710 Return:
711 The `@classmethod` decorator applied function.
712 """
713 if not isinstance(
714 unwrap_wrapped_function(function, unwrap_class_static_method=False), classmethod
715 ) and _is_classmethod_from_sig(function):
716 return classmethod(function) # type: ignore[arg-type]
717 return function
718
719
720def _is_classmethod_from_sig(function: AnyDecoratorCallable) -> bool:
721 sig = _signature_no_eval(unwrap_wrapped_function(function))
722 first = next(iter(sig.parameters.values()), None)
723 if first and first.name == 'cls':
724 return True
725 return False
726
727
728def unwrap_wrapped_function(
729 func: Any,
730 *,
731 unwrap_partial: bool = True,
732 unwrap_class_static_method: bool = True,
733) -> Any:
734 """Recursively unwraps a wrapped function until the underlying function is reached.
735 This handles property, functools.partial, functools.partialmethod, staticmethod, and classmethod.
736
737 Args:
738 func: The function to unwrap.
739 unwrap_partial: If True (default), unwrap partial and partialmethod decorators.
740 unwrap_class_static_method: If True (default), also unwrap classmethod and staticmethod
741 decorators. If False, only unwrap partial and partialmethod decorators.
742
743 Returns:
744 The underlying function of the wrapped function.
745 """
746 # Define the types we want to check against as a single tuple.
747 unwrap_types = (
748 (property, cached_property)
749 + ((partial, partialmethod) if unwrap_partial else ())
750 + ((staticmethod, classmethod) if unwrap_class_static_method else ())
751 )
752
753 while isinstance(func, unwrap_types):
754 if unwrap_class_static_method and isinstance(func, (classmethod, staticmethod)):
755 func = func.__func__
756 elif isinstance(func, (partial, partialmethod)):
757 func = func.func
758 elif isinstance(func, property):
759 func = func.fget # arbitrary choice, convenient for computed fields
760 else:
761 # Make coverage happy as it can only get here in the last possible case
762 assert isinstance(func, cached_property)
763 func = func.func # type: ignore
764
765 return func
766
767
768_function_like = (
769 partial,
770 partialmethod,
771 types.FunctionType,
772 types.BuiltinFunctionType,
773 types.MethodType,
774 types.WrapperDescriptorType,
775 types.MethodWrapperType,
776 types.MemberDescriptorType,
777)
778
779
780def get_callable_return_type(
781 callable_obj: Any,
782 globalns: GlobalsNamespace | None = None,
783 localns: MappingNamespace | None = None,
784) -> Any | PydanticUndefinedType:
785 """Get the callable return type.
786
787 Args:
788 callable_obj: The callable to analyze.
789 globalns: The globals namespace to use during type annotation evaluation.
790 localns: The locals namespace to use during type annotation evaluation.
791
792 Returns:
793 The function return type.
794 """
795 if isinstance(callable_obj, type):
796 # types are callables, and we assume the return type
797 # is the type itself (e.g. `int()` results in an instance of `int`).
798 return callable_obj
799
800 if not isinstance(callable_obj, _function_like):
801 call_func = getattr(type(callable_obj), '__call__', None) # noqa: B004
802 if call_func is not None:
803 callable_obj = call_func
804
805 hints = get_function_type_hints(
806 unwrap_wrapped_function(callable_obj),
807 include_keys={'return'},
808 globalns=globalns,
809 localns=localns,
810 )
811 return hints.get('return', PydanticUndefined)
812
813
814def count_positional_required_params(sig: Signature) -> int:
815 """Get the number of positional (required) arguments of a signature.
816
817 This function should only be used to inspect signatures of validation and serialization functions.
818 The first argument (the value being serialized or validated) is counted as a required argument
819 even if a default value exists.
820
821 Returns:
822 The number of positional arguments of a signature.
823 """
824 parameters = list(sig.parameters.values())
825 return sum(
826 1
827 for param in parameters
828 if can_be_positional(param)
829 # First argument is the value being validated/serialized, and can have a default value
830 # (e.g. `float`, which has signature `(x=0, /)`). We assume other parameters (the info arg
831 # for instance) should be required, and thus without any default value.
832 and (param.default is Parameter.empty or param is parameters[0])
833 )
834
835
836def ensure_property(f: Any) -> Any:
837 """Ensure that a function is a `property` or `cached_property`, or is a valid descriptor.
838
839 Args:
840 f: The function to check.
841
842 Returns:
843 The function, or a `property` or `cached_property` instance wrapping the function.
844 """
845 if ismethoddescriptor(f) or isdatadescriptor(f):
846 return f
847 else:
848 return property(f)
849
850
851def _signature_no_eval(f: Callable[..., Any]) -> Signature:
852 """Get the signature of a callable without evaluating any annotations."""
853 if sys.version_info >= (3, 14):
854 from annotationlib import Format
855
856 return signature(f, annotation_format=Format.FORWARDREF)
857 else:
858 return signature(f)