Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pydantic/_internal/_core_utils.py: 83%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

59 statements  

1from __future__ import annotations 

2 

3import inspect 

4import os 

5from collections.abc import Mapping, Sequence 

6from typing import TYPE_CHECKING, Any, Union 

7 

8from pydantic_core import CoreSchema, core_schema 

9from pydantic_core import validate_core_schema as _validate_core_schema 

10from typing_extensions import TypeGuard, get_args, get_origin 

11from typing_inspection import typing_objects 

12 

13from . import _repr 

14from ._typing_extra import is_generic_alias 

15 

16if TYPE_CHECKING: 

17 from rich.console import Console 

18 

# Union of the four function-validator schema types (after, before, wrap, plain).
AnyFunctionSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
    core_schema.PlainValidatorFunctionSchema,
]

# Same members as `AnyFunctionSchema`, minus the plain-validator variant.
FunctionSchemaWithInnerSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
]

# Field-level schema types, and the union of "a core schema or one of those fields".
CoreSchemaField = Union[
    core_schema.ModelField,
    core_schema.DataclassField,
    core_schema.TypedDictField,
    core_schema.ComputedField,
]
CoreSchemaOrField = Union[core_schema.CoreSchema, CoreSchemaField]

# Values of a schema's `'type'` key that the predicate helpers in this module test against.
_CORE_SCHEMA_FIELD_TYPES = {'typed-dict-field', 'dataclass-field', 'model-field', 'computed-field'}
_FUNCTION_WITH_INNER_SCHEMA_TYPES = {'function-before', 'function-after', 'function-wrap'}
_LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES = {'list', 'set', 'frozenset'}

41 

42 

def is_core_schema(
    schema: CoreSchemaOrField,
) -> TypeGuard[CoreSchema]:
    """Return whether `schema` is a core schema rather than a field schema.

    The check is based solely on the schema's `'type'` value: anything not listed
    in `_CORE_SCHEMA_FIELD_TYPES` counts as a core schema.
    """
    schema_type = schema['type']
    return schema_type not in _CORE_SCHEMA_FIELD_TYPES

47 

48 

def is_core_schema_field(
    schema: CoreSchemaOrField,
) -> TypeGuard[CoreSchemaField]:
    """Return whether `schema` is a field schema.

    The check is based solely on the schema's `'type'` value being one of the
    entries in `_CORE_SCHEMA_FIELD_TYPES`.
    """
    schema_type = schema['type']
    return schema_type in _CORE_SCHEMA_FIELD_TYPES

53 

54 

def is_function_with_inner_schema(
    schema: CoreSchemaOrField,
) -> TypeGuard[FunctionSchemaWithInnerSchema]:
    """Return whether `schema` is a before/after/wrap function-validator schema.

    The check is based solely on the schema's `'type'` value being one of the
    entries in `_FUNCTION_WITH_INNER_SCHEMA_TYPES`.
    """
    schema_type = schema['type']
    return schema_type in _FUNCTION_WITH_INNER_SCHEMA_TYPES

59 

60 

def is_list_like_schema_with_items_schema(
    schema: CoreSchema,
) -> TypeGuard[core_schema.ListSchema | core_schema.SetSchema | core_schema.FrozenSetSchema]:
    """Return whether `schema` is a list, set or frozenset schema.

    The check is based solely on the schema's `'type'` value being one of the
    entries in `_LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES`.
    """
    schema_type = schema['type']
    return schema_type in _LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES

65 

66 

def get_type_ref(type_: Any, args_override: tuple[type[Any], ...] | None = None) -> str:
    """Produces the ref to be used for this type by pydantic_core's core schemas.

    The ref has the shape `<module>.<name>:<id(origin)>`, where `<name>` is `__name__`
    for type alias types and `__qualname__` otherwise. When the type has arguments, a
    bracketed, comma-separated list of per-argument refs is appended.

    This `args_override` argument was added for the purpose of creating valid recursive references
    when creating generic models without needing to create a concrete class.
    """
    origin = get_origin(type_) or type_
    if is_generic_alias(type_):
        args = get_args(type_)
    else:
        args = args_override or ()

    # Non-empty values from the pydantic generic metadata win over what was derived above.
    generic_metadata = getattr(type_, '__pydantic_generic_metadata__', None)
    if generic_metadata:
        origin = generic_metadata['origin'] or origin
        args = generic_metadata['args'] or args

    module_name = getattr(origin, '__module__', '<No __module__>')
    if typing_objects.is_typealiastype(origin):
        name = origin.__name__
    else:
        try:
            # The fallback text embeds `origin`, and formatting it is what the `except` guards.
            name = getattr(origin, '__qualname__', f'<No __qualname__: {origin}>')
        except Exception:
            name = getattr(origin, '__qualname__', '<No __qualname__>')
    type_ref = f'{module_name}.{name}:{id(origin)}'

    # String arguments are handled as a special case; this may become removable if they
    # get wrapped in a ForwardRef at some point.
    arg_refs: list[str] = [
        f'{arg}:str-{id(arg)}' if isinstance(arg, str) else f'{_repr.display_as_type(arg)}:{id(arg)}'
        for arg in args
    ]
    if not arg_refs:
        return type_ref
    return f'{type_ref}[{",".join(arg_refs)}]'

103 

104 

def get_ref(s: core_schema.CoreSchema) -> None | str:
    """Return the value stored under the schema's `'ref'` key, or `None` if it has none.

    This exists just for type checking to work correctly.
    """
    return s.get('ref')

110 

111 

def validate_core_schema(schema: CoreSchema) -> CoreSchema:
    """Optionally run pydantic_core's schema validation on `schema`.

    Validation only happens when the `PYDANTIC_VALIDATE_CORE_SCHEMAS` environment
    variable is set to a non-empty value; otherwise `schema` is returned untouched.
    """
    if not os.getenv('PYDANTIC_VALIDATE_CORE_SCHEMAS'):
        return schema
    return _validate_core_schema(schema)

116 

117 

def _clean_schema_for_pretty_print(obj: Any, strip_metadata: bool = True) -> Any:  # pragma: no cover
    """A utility function to remove irrelevant information from a core schema.

    Mappings are rebuilt as plain dicts and non-string sequences as lists, recursively;
    any other value is returned as is. Falsy `'custom_init'` / `'root_model'` entries are
    dropped. When `strip_metadata` is true, the JS function entries inside a `'metadata'`
    mapping are collapsed into a `'<stripped>'` placeholder.
    """
    if isinstance(obj, Mapping):
        cleaned: dict[Any, Any] = {}
        for key, value in obj.items():
            if key == 'metadata' and strip_metadata:
                metadata: Any = {}
                for meta_key, meta_value in value.items():
                    if meta_key in ('pydantic_js_functions', 'pydantic_js_annotation_functions'):
                        metadata['js_metadata'] = '<stripped>'
                    else:
                        metadata[meta_key] = _clean_schema_for_pretty_print(
                            meta_value, strip_metadata=strip_metadata
                        )

                # Nothing but the placeholder is left: show a bare marker instead of a dict.
                if list(metadata) == ['js_metadata']:
                    metadata = {'<stripped>'}

                cleaned[key] = metadata
                continue

            # Remove some defaults:
            if key in ('custom_init', 'root_model') and not value:
                continue

            cleaned[key] = _clean_schema_for_pretty_print(value, strip_metadata=strip_metadata)
        return cleaned

    if isinstance(obj, str) or not isinstance(obj, Sequence):
        return obj

    return [_clean_schema_for_pretty_print(item, strip_metadata=strip_metadata) for item in obj]

147 

148 

def pretty_print_core_schema(
    val: Any,
    *,
    console: Console | None = None,
    max_depth: int | None = None,
    strip_metadata: bool = True,
) -> None:  # pragma: no cover
    """Pretty-print a core schema using the `rich` library.

    Args:
        val: The core schema to print, or a Pydantic model/dataclass/type adapter
            (in which case the cached core schema is fetched and printed).
        console: A rich console to use when printing. Defaults to the global rich console instance.
        max_depth: The number of nesting levels which may be printed.
        strip_metadata: Whether to strip metadata in the output. If `True` any known core metadata
            attributes will be stripped (but custom attributes are kept). Defaults to `True`.
    """
    # Imported here so that `rich` is only needed when this helper is actually called.
    from rich.pretty import pprint

    # Imported here to avoid circular imports.
    from pydantic import BaseModel, TypeAdapter
    from pydantic.dataclasses import is_pydantic_dataclass

    # Resolve models, dataclasses and type adapters to the core schema they carry.
    is_model_class = inspect.isclass(val) and issubclass(val, BaseModel)
    if is_model_class or is_pydantic_dataclass(val):
        val = val.__pydantic_core_schema__
    if isinstance(val, TypeAdapter):
        val = val.core_schema

    pprint(
        _clean_schema_for_pretty_print(val, strip_metadata=strip_metadata),
        console=console,
        max_depth=max_depth,
    )


# Short alias for `pretty_print_core_schema`.
pps = pretty_print_core_schema