1from __future__ import annotations
2
3import inspect
4import os
5from collections.abc import Mapping, Sequence
6from typing import TYPE_CHECKING, Any, Union
7
8from pydantic_core import CoreSchema, core_schema
9from pydantic_core import validate_core_schema as _validate_core_schema
10from typing_extensions import TypeGuard, get_args, get_origin
11from typing_inspection import typing_objects
12
13from . import _repr
14from ._typing_extra import is_generic_alias
15
16if TYPE_CHECKING:
17 from rich.console import Console
18
19AnyFunctionSchema = Union[
20 core_schema.AfterValidatorFunctionSchema,
21 core_schema.BeforeValidatorFunctionSchema,
22 core_schema.WrapValidatorFunctionSchema,
23 core_schema.PlainValidatorFunctionSchema,
24]
25
26
27FunctionSchemaWithInnerSchema = Union[
28 core_schema.AfterValidatorFunctionSchema,
29 core_schema.BeforeValidatorFunctionSchema,
30 core_schema.WrapValidatorFunctionSchema,
31]
32
33CoreSchemaField = Union[
34 core_schema.ModelField, core_schema.DataclassField, core_schema.TypedDictField, core_schema.ComputedField
35]
36CoreSchemaOrField = Union[core_schema.CoreSchema, CoreSchemaField]
37
38_CORE_SCHEMA_FIELD_TYPES = {'typed-dict-field', 'dataclass-field', 'model-field', 'computed-field'}
39_FUNCTION_WITH_INNER_SCHEMA_TYPES = {'function-before', 'function-after', 'function-wrap'}
40_LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES = {'list', 'set', 'frozenset'}
41
42
43def is_core_schema(
44 schema: CoreSchemaOrField,
45) -> TypeGuard[CoreSchema]:
46 return schema['type'] not in _CORE_SCHEMA_FIELD_TYPES
47
48
49def is_core_schema_field(
50 schema: CoreSchemaOrField,
51) -> TypeGuard[CoreSchemaField]:
52 return schema['type'] in _CORE_SCHEMA_FIELD_TYPES
53
54
55def is_function_with_inner_schema(
56 schema: CoreSchemaOrField,
57) -> TypeGuard[FunctionSchemaWithInnerSchema]:
58 return schema['type'] in _FUNCTION_WITH_INNER_SCHEMA_TYPES
59
60
61def is_list_like_schema_with_items_schema(
62 schema: CoreSchema,
63) -> TypeGuard[core_schema.ListSchema | core_schema.SetSchema | core_schema.FrozenSetSchema]:
64 return schema['type'] in _LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES
65
66
67def get_type_ref(type_: Any, args_override: tuple[type[Any], ...] | None = None) -> str:
68 """Produces the ref to be used for this type by pydantic_core's core schemas.
69
70 This `args_override` argument was added for the purpose of creating valid recursive references
71 when creating generic models without needing to create a concrete class.
72 """
73 origin = get_origin(type_) or type_
74
75 args = get_args(type_) if is_generic_alias(type_) else (args_override or ())
76 generic_metadata = getattr(type_, '__pydantic_generic_metadata__', None)
77 if generic_metadata:
78 origin = generic_metadata['origin'] or origin
79 args = generic_metadata['args'] or args
80
81 module_name = getattr(origin, '__module__', '<No __module__>')
82 if typing_objects.is_typealiastype(origin):
83 type_ref = f'{module_name}.{origin.__name__}:{id(origin)}'
84 else:
85 try:
86 qualname = getattr(origin, '__qualname__', f'<No __qualname__: {origin}>')
87 except Exception:
88 qualname = getattr(origin, '__qualname__', '<No __qualname__>')
89 type_ref = f'{module_name}.{qualname}:{id(origin)}'
90
91 arg_refs: list[str] = []
92 for arg in args:
93 if isinstance(arg, str):
94 # Handle string literals as a special case; we may be able to remove this special handling if we
95 # wrap them in a ForwardRef at some point.
96 arg_ref = f'{arg}:str-{id(arg)}'
97 else:
98 arg_ref = f'{_repr.display_as_type(arg)}:{id(arg)}'
99 arg_refs.append(arg_ref)
100 if arg_refs:
101 type_ref = f'{type_ref}[{",".join(arg_refs)}]'
102 return type_ref
103
104
105def get_ref(s: core_schema.CoreSchema) -> None | str:
106 """Get the ref from the schema if it has one.
107 This exists just for type checking to work correctly.
108 """
109 return s.get('ref', None)
110
111
112def validate_core_schema(schema: CoreSchema) -> CoreSchema:
113 if os.getenv('PYDANTIC_VALIDATE_CORE_SCHEMAS'):
114 return _validate_core_schema(schema)
115 return schema
116
117
118def _clean_schema_for_pretty_print(obj: Any, strip_metadata: bool = True) -> Any: # pragma: no cover
119 """A utility function to remove irrelevant information from a core schema."""
120 if isinstance(obj, Mapping):
121 new_dct = {}
122 for k, v in obj.items():
123 if k == 'metadata' and strip_metadata:
124 new_metadata = {}
125
126 for meta_k, meta_v in v.items():
127 if meta_k in ('pydantic_js_functions', 'pydantic_js_annotation_functions'):
128 new_metadata['js_metadata'] = '<stripped>'
129 else:
130 new_metadata[meta_k] = _clean_schema_for_pretty_print(meta_v, strip_metadata=strip_metadata)
131
132 if list(new_metadata.keys()) == ['js_metadata']:
133 new_metadata = {'<stripped>'}
134
135 new_dct[k] = new_metadata
136 # Remove some defaults:
137 elif k in ('custom_init', 'root_model') and not v:
138 continue
139 else:
140 new_dct[k] = _clean_schema_for_pretty_print(v, strip_metadata=strip_metadata)
141
142 return new_dct
143 elif isinstance(obj, Sequence) and not isinstance(obj, str):
144 return [_clean_schema_for_pretty_print(v, strip_metadata=strip_metadata) for v in obj]
145 else:
146 return obj
147
148
149def pretty_print_core_schema(
150 val: Any,
151 *,
152 console: Console | None = None,
153 max_depth: int | None = None,
154 strip_metadata: bool = True,
155) -> None: # pragma: no cover
156 """Pretty-print a core schema using the `rich` library.
157
158 Args:
159 val: The core schema to print, or a Pydantic model/dataclass/type adapter
160 (in which case the cached core schema is fetched and printed).
161 console: A rich console to use when printing. Defaults to the global rich console instance.
162 max_depth: The number of nesting levels which may be printed.
163 strip_metadata: Whether to strip metadata in the output. If `True` any known core metadata
164 attributes will be stripped (but custom attributes are kept). Defaults to `True`.
165 """
166 # lazy import:
167 from rich.pretty import pprint
168
169 # circ. imports:
170 from pydantic import BaseModel, TypeAdapter
171 from pydantic.dataclasses import is_pydantic_dataclass
172
173 if (inspect.isclass(val) and issubclass(val, BaseModel)) or is_pydantic_dataclass(val):
174 val = val.__pydantic_core_schema__
175 if isinstance(val, TypeAdapter):
176 val = val.core_schema
177 cleaned_schema = _clean_schema_for_pretty_print(val, strip_metadata=strip_metadata)
178
179 pprint(cleaned_schema, console=console, max_depth=max_depth)
180
181
182pps = pretty_print_core_schema