1"""Pluggable schema validator for pydantic."""
2
from __future__ import annotations

import functools
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar, get_args

from pydantic_core import CoreConfig, CoreSchema, SchemaValidator, ValidationError
from typing_extensions import ParamSpec
11
12if TYPE_CHECKING:
13 from . import BaseValidateHandlerProtocol, PydanticPluginProtocol, SchemaKind, SchemaTypePath
14
15
16P = ParamSpec('P')
17R = TypeVar('R')
18Event = Literal['on_validate_python', 'on_validate_json', 'on_validate_strings']
19events: list[Event] = list(Event.__args__) # type: ignore
20
21
def create_schema_validator(
    schema: CoreSchema,
    schema_type: Any,
    schema_type_module: str,
    schema_type_name: str,
    schema_kind: SchemaKind,
    config: CoreConfig | None = None,
    plugin_settings: dict[str, Any] | None = None,
) -> SchemaValidator | PluggableSchemaValidator:
    """Build the validator for `schema`, with plugin support when any plugin is installed.

    Args:
        schema: Core schema handed to the underlying `SchemaValidator`.
        schema_type: Passed through unchanged to `PluggableSchemaValidator`.
        schema_type_module: First component of the `SchemaTypePath` given to plugins.
        schema_type_name: Second component of the `SchemaTypePath` given to plugins.
        schema_kind: Passed through unchanged to `PluggableSchemaValidator`.
        config: Optional core config handed to the underlying `SchemaValidator`.
        plugin_settings: Settings forwarded to plugins; `None` (or an empty dict) is
            replaced by a fresh empty dict.

    Returns:
        A `PluggableSchemaValidator` when `get_plugins()` returns a non-empty collection,
        otherwise a plain `SchemaValidator`.
    """
    # These names are imported at call time rather than at module import time.
    from . import SchemaTypePath
    from ._loader import get_plugins

    installed_plugins = get_plugins()
    if not installed_plugins:
        # Nothing to hook in: the bare validator avoids any wrapper overhead.
        return SchemaValidator(schema, config)

    type_path = SchemaTypePath(schema_type_module, schema_type_name)
    settings = plugin_settings or {}
    return PluggableSchemaValidator(
        schema, schema_type, type_path, schema_kind, config, installed_plugins, settings
    )
52
53
class PluggableSchemaValidator:
    """Schema validator that runs plugin event handlers around validation calls.

    An inner `SchemaValidator` does the actual work. `validate_python`, `validate_json` and
    `validate_strings` are stored as (possibly wrapped) callables built by `build_wrapper`;
    any other attribute is looked up on the inner validator.
    """

    __slots__ = '_schema_validator', 'validate_json', 'validate_python', 'validate_strings'

    def __init__(
        self,
        schema: CoreSchema,
        schema_type: Any,
        schema_type_path: SchemaTypePath,
        schema_kind: SchemaKind,
        config: CoreConfig | None,
        plugins: Iterable[PydanticPluginProtocol],
        plugin_settings: dict[str, Any],
    ) -> None:
        """Create the inner validator and collect event handlers from every plugin.

        Args:
            schema: Core schema used to build the inner `SchemaValidator`.
            schema_type: Forwarded to each plugin's `new_schema_validator`.
            schema_type_path: Forwarded to each plugin's `new_schema_validator`.
            schema_kind: Forwarded to each plugin's `new_schema_validator`.
            config: Core config for the inner validator; also forwarded to plugins.
            plugins: Plugins asked, in iteration order, for their event handlers.
            plugin_settings: Forwarded to each plugin's `new_schema_validator`.

        Raises:
            TypeError: If a plugin's `new_schema_validator` call (or unpacking its result
                into three handlers) raises `TypeError`; re-raised with the plugin named.
        """
        self._schema_validator = SchemaValidator(schema, config)

        python_event_handlers: list[BaseValidateHandlerProtocol] = []
        json_event_handlers: list[BaseValidateHandlerProtocol] = []
        strings_event_handlers: list[BaseValidateHandlerProtocol] = []
        for plugin in plugins:
            try:
                # Each plugin returns a (python, json, strings) triple; an entry may be None.
                p, j, s = plugin.new_schema_validator(
                    schema, schema_type, schema_type_path, schema_kind, config, plugin_settings
                )
            except TypeError as e:  # pragma: no cover
                raise TypeError(f'Error using plugin `{plugin.__module__}:{plugin.__class__.__name__}`: {e}') from e
            if p is not None:
                python_event_handlers.append(p)
            if j is not None:
                json_event_handlers.append(j)
            if s is not None:
                strings_event_handlers.append(s)

        self.validate_python = build_wrapper(self._schema_validator.validate_python, python_event_handlers)
        self.validate_json = build_wrapper(self._schema_validator.validate_json, json_event_handlers)
        self.validate_strings = build_wrapper(self._schema_validator.validate_strings, strings_event_handlers)

    def __getattr__(self, name: str) -> Any:
        """Forward lookups of attributes not found on this object to the inner validator.

        Raises:
            AttributeError: If the inner validator lacks `name`, or if the inner validator
                itself has not been assigned yet.
        """
        if name == '_schema_validator':
            # The backing slot is unset (e.g. the instance was created with `__new__` and
            # `__init__` has not run). Reading `self._schema_validator` below would re-enter
            # this method and recurse without bound, so fail with a normal AttributeError.
            raise AttributeError(f'{type(self).__name__!r} object has no attribute {name!r}')
        return getattr(self._schema_validator, name)
94
95
def build_wrapper(func: Callable[P, R], event_handlers: list[BaseValidateHandlerProtocol]) -> Callable[P, R]:
    """Wrap `func` so the given event handlers are notified around each call.

    Args:
        func: The callable to wrap.
        event_handlers: Handlers whose `on_enter`, `on_success`, `on_error` and
            `on_exception` methods are used when `filter_handlers` accepts them.

    Returns:
        `func` itself when `event_handlers` is empty; otherwise a wrapper that calls the
        `on_enter` hooks with the call arguments, then `func`, then either the `on_success`
        hooks with the result, the `on_error` hooks with a `ValidationError`, or the
        `on_exception` hooks with any other `Exception`. Exceptions are always re-raised.
    """
    if not event_handlers:
        return func

    def collect(hook_name: str) -> tuple:
        # Keep only the hooks that `filter_handlers` accepts for this hook name.
        return tuple(getattr(h, hook_name) for h in event_handlers if filter_handlers(h, hook_name))

    enter_hooks = collect('on_enter')
    success_hooks = collect('on_success')
    error_hooks = collect('on_error')
    exception_hooks = collect('on_exception')

    @functools.wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        for notify_enter in enter_hooks:
            notify_enter(*args, **kwargs)

        try:
            outcome = func(*args, **kwargs)
        except ValidationError as validation_error:
            for notify_error in error_hooks:
                notify_error(validation_error)
            raise
        except Exception as unexpected:
            for notify_exception in exception_hooks:
                notify_exception(unexpected)
            raise

        # Only reached when `func` returned normally; both handlers above re-raise.
        for notify_success in success_hooks:
            notify_success(outcome)
        return outcome

    return wrapper
126
127
def filter_handlers(handler_cls: BaseValidateHandlerProtocol, method_name: str) -> bool:
    """Report whether the handler provides its own implementation of `method_name`.

    Returns:
        `False` when the attribute is missing (or `None`) or when its `__module__` is
        `'pydantic.plugin'`; `True` otherwise.
    """
    method = getattr(handler_cls, method_name, None)
    # A `__module__` of 'pydantic.plugin' means this is the protocol's own method, picked up
    # through runtime inheritance rather than written by the plugin, so it must not be called.
    if method is None or method.__module__ == 'pydantic.plugin':
        return False
    return True