1from __future__ import annotations
2
3import inspect
4from collections.abc import Mapping, Sequence
5from typing import TYPE_CHECKING, Any, Union
6
7from pydantic_core import CoreSchema, core_schema
8from typing_extensions import TypeGuard, get_args, get_origin
9from typing_inspection import typing_objects
10
11from . import _repr
12from ._typing_extra import is_generic_alias
13
14if TYPE_CHECKING:
15 from rich.console import Console
16
# Union of the four validator-function core schema types (after / before / wrap / plain).
AnyFunctionSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
    core_schema.PlainValidatorFunctionSchema,
]


# Same members as `AnyFunctionSchema` minus the plain variant.
# NOTE(review): per the name, these are presumably the variants that wrap an inner schema -- confirm
# against the pydantic_core definitions.
FunctionSchemaWithInnerSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
]

# Union of the field-level schema types (model, dataclass, typed-dict and computed fields).
CoreSchemaField = Union[
    core_schema.ModelField, core_schema.DataclassField, core_schema.TypedDictField, core_schema.ComputedField
]
# Either a regular core schema or one of the field-level schemas in `CoreSchemaField`.
CoreSchemaOrField = Union[core_schema.CoreSchema, CoreSchemaField]
35
# Sets of `'type'` tag values; the type-guard helpers in this module test `schema['type']` against them.
# Tags identifying field-level schemas (see `is_core_schema` / `is_core_schema_field`).
_CORE_SCHEMA_FIELD_TYPES = {'typed-dict-field', 'dataclass-field', 'model-field', 'computed-field'}
# Tags accepted by `is_function_with_inner_schema`.
_FUNCTION_WITH_INNER_SCHEMA_TYPES = {'function-before', 'function-after', 'function-wrap'}
# Tags accepted by `is_list_like_schema_with_items_schema`.
_LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES = {'list', 'set', 'frozenset'}
39
40
def is_core_schema(schema: CoreSchemaOrField) -> TypeGuard[CoreSchema]:
    """Tell whether `schema` is a core schema rather than a field schema.

    Only the `'type'` tag is inspected: any tag that is not listed in
    `_CORE_SCHEMA_FIELD_TYPES` is treated as a core schema.

    Args:
        schema: The schema (or field schema) to classify.

    Returns:
        `True` if the schema's type tag is not one of the field tags.
    """
    type_tag = schema['type']
    is_field = type_tag in _CORE_SCHEMA_FIELD_TYPES
    return not is_field
45
46
def is_core_schema_field(schema: CoreSchemaOrField) -> TypeGuard[CoreSchemaField]:
    """Tell whether `schema` is a field-level schema.

    Only the `'type'` tag is inspected; it must be one of the tags listed in
    `_CORE_SCHEMA_FIELD_TYPES`.

    Args:
        schema: The schema (or field schema) to classify.

    Returns:
        `True` if the schema's type tag is one of the field tags.
    """
    type_tag = schema['type']
    return type_tag in _CORE_SCHEMA_FIELD_TYPES
51
52
def is_function_with_inner_schema(schema: CoreSchemaOrField) -> TypeGuard[FunctionSchemaWithInnerSchema]:
    """Tell whether `schema` is a before/after/wrap validator-function schema.

    Only the `'type'` tag is inspected; it must be one of the tags listed in
    `_FUNCTION_WITH_INNER_SCHEMA_TYPES`.

    Args:
        schema: The schema (or field schema) to classify.

    Returns:
        `True` if the schema's type tag is one of the accepted function tags.
    """
    type_tag = schema['type']
    return type_tag in _FUNCTION_WITH_INNER_SCHEMA_TYPES
57
58
def is_list_like_schema_with_items_schema(
    schema: CoreSchema,
) -> TypeGuard[core_schema.ListSchema | core_schema.SetSchema | core_schema.FrozenSetSchema]:
    """Tell whether `schema` is a list, set or frozenset schema.

    Only the `'type'` tag is inspected; it must be one of the tags listed in
    `_LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES`.

    Args:
        schema: The core schema to classify.

    Returns:
        `True` if the schema's type tag is `'list'`, `'set'` or `'frozenset'`.
    """
    type_tag = schema['type']
    return type_tag in _LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES
63
64
def get_type_ref(type_: Any, args_override: tuple[type[Any], ...] | None = None) -> str:
    """Build the ref string used for `type_` by pydantic_core's core schemas.

    The ref has the form `<module>.<name>:<id(origin)>`, optionally followed by a bracketed,
    comma-separated list of per-argument refs when the type has arguments.

    The `args_override` argument was added for the purpose of creating valid recursive references
    when creating generic models without needing to create a concrete class.

    Args:
        type_: The type to build a ref for.
        args_override: Type arguments to use when `type_` is not itself a generic alias.

    Returns:
        The ref string.
    """
    origin = get_origin(type_) or type_

    if is_generic_alias(type_):
        args = get_args(type_)
    else:
        args = args_override or ()

    # Generic metadata, when present and non-empty, takes precedence over what was derived above.
    generic_metadata = getattr(type_, '__pydantic_generic_metadata__', None)
    if generic_metadata:
        origin = generic_metadata['origin'] or origin
        args = generic_metadata['args'] or args

    module_name = getattr(origin, '__module__', '<No __module__>')
    if typing_objects.is_typealiastype(origin):
        name = origin.__name__
    else:
        try:
            # The fallback is formatted eagerly, so a failing `str(origin)` raises here.
            name = getattr(origin, '__qualname__', f'<No __qualname__: {origin}>')
        except Exception:
            name = getattr(origin, '__qualname__', '<No __qualname__>')
    type_ref = f'{module_name}.{name}:{id(origin)}'

    # String arguments are handled as a special case; we may be able to remove this special handling
    # if they get wrapped in a ForwardRef at some point.
    arg_refs: list[str] = [
        f'{arg}:str-{id(arg)}' if isinstance(arg, str) else f'{_repr.display_as_type(arg)}:{id(arg)}'
        for arg in args
    ]
    if not arg_refs:
        return type_ref

    joined_arg_refs = ','.join(arg_refs)
    return f'{type_ref}[{joined_arg_refs}]'
101
102
def get_ref(s: core_schema.CoreSchema) -> None | str:
    """Return the value stored under the schema's `'ref'` key, or `None` if the key is absent.

    This helper exists just for type checking to work correctly.

    Args:
        s: The core schema to read the ref from.

    Returns:
        The ref, or `None` when the schema has none.
    """
    ref = s.get('ref')
    return ref
108
109
def _clean_schema_for_pretty_print(obj: Any, strip_metadata: bool = True) -> Any:  # pragma: no cover
    """Return a copy of a core schema with irrelevant information removed.

    Mappings are rebuilt as `dict`s and sequences as `list`s, recursively. Strings, bytes-like
    values and every other object are returned unchanged.

    Args:
        obj: The core schema, or any value nested inside one.
        strip_metadata: Whether to replace the `'pydantic_js_functions'` and
            `'pydantic_js_annotation_functions'` entries of a `'metadata'` mapping with a
            `'<stripped>'` placeholder. Other metadata entries are kept (and cleaned recursively).

    Returns:
        The cleaned structure.
    """
    if isinstance(obj, Mapping):
        cleaned = {}
        for key, value in obj.items():
            # Only a mapping can be stripped entry by entry; any other metadata value (e.g. `None`)
            # falls through to the generic branch instead of failing on `.items()`.
            if key == 'metadata' and strip_metadata and isinstance(value, Mapping):
                new_metadata: Any = {}

                for meta_key, meta_value in value.items():
                    if meta_key in ('pydantic_js_functions', 'pydantic_js_annotation_functions'):
                        new_metadata['js_metadata'] = '<stripped>'
                    else:
                        new_metadata[meta_key] = _clean_schema_for_pretty_print(
                            meta_value, strip_metadata=strip_metadata
                        )

                # When only stripped JS entries remain, collapse the metadata to a compact
                # placeholder. Note this is a one-element set, not a dict.
                if list(new_metadata) == ['js_metadata']:
                    new_metadata = {'<stripped>'}

                cleaned[key] = new_metadata
            # Remove some defaults:
            elif key in ('custom_init', 'root_model') and not value:
                continue
            else:
                cleaned[key] = _clean_schema_for_pretty_print(value, strip_metadata=strip_metadata)

        return cleaned

    # `bytes` and `bytearray` are `Sequence`s too; like `str`, they are values rather than
    # containers and must not be exploded into a list of integers.
    if isinstance(obj, Sequence) and not isinstance(obj, (str, bytes, bytearray)):
        return [_clean_schema_for_pretty_print(item, strip_metadata=strip_metadata) for item in obj]

    return obj
139
140
def pretty_print_core_schema(
    val: Any,
    *,
    console: Console | None = None,
    max_depth: int | None = None,
    strip_metadata: bool = True,
) -> None:  # pragma: no cover
    """Pretty-print a core schema using the `rich` library.

    Args:
        val: The core schema to print, or a Pydantic model/dataclass/type adapter
            (in which case the cached core schema is fetched and printed).
        console: A rich console to use when printing. Defaults to the global rich console instance.
        max_depth: The number of nesting levels which may be printed.
        strip_metadata: Whether to strip metadata in the output. If `True` any known core metadata
            attributes will be stripped (but custom attributes are kept). Defaults to `True`.
    """
    # Imported lazily, at call time rather than at module import time.
    from rich.pretty import pprint

    # Imported here to avoid circular imports.
    from pydantic import BaseModel, TypeAdapter
    from pydantic.dataclasses import is_pydantic_dataclass

    schema = val

    # Models and Pydantic dataclasses expose their schema on `__pydantic_core_schema__`.
    is_model_class = inspect.isclass(schema) and issubclass(schema, BaseModel)
    if is_model_class or is_pydantic_dataclass(schema):
        schema = schema.__pydantic_core_schema__

    # Type adapters expose theirs on `core_schema`.
    if isinstance(schema, TypeAdapter):
        schema = schema.core_schema

    pprint(
        _clean_schema_for_pretty_print(schema, strip_metadata=strip_metadata),
        console=console,
        max_depth=max_depth,
    )
172
173
# Shorthand alias for `pretty_print_core_schema`.
pps = pretty_print_core_schema