# pydantic/_internal/_known_annotated_metadata.py
# (extracted from a coverage report: 57% of 159 statements covered)
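"""Logic for recognizing known metadata annotations (`annotated_types` constraints and
Pydantic's own metadata) and applying them to `pydantic-core` schemas."""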

from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable
from copy import copy
from functools import lru_cache, partial
from typing import TYPE_CHECKING, Any

from pydantic_core import CoreSchema, PydanticCustomError, ValidationError, to_jsonable_python
from pydantic_core import core_schema as cs

from ._fields import PydanticMetadata
from ._import_utils import import_cached_field_info

if TYPE_CHECKING:
    pass

STRICT = {'strict'}
FAIL_FAST = {'fail_fast'}
LENGTH_CONSTRAINTS = {'min_length', 'max_length'}
INEQUALITY = {'le', 'ge', 'lt', 'gt'}
NUMERIC_CONSTRAINTS = {'multiple_of', *INEQUALITY}
ALLOW_INF_NAN = {'allow_inf_nan'}

STR_CONSTRAINTS = {
    *LENGTH_CONSTRAINTS,
    *STRICT,
    'strip_whitespace',
    'to_lower',
    'to_upper',
    'pattern',
    'coerce_numbers_to_str',
}
BYTES_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *STRICT}

LIST_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *STRICT, *FAIL_FAST}
TUPLE_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *STRICT, *FAIL_FAST}
SET_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *STRICT, *FAIL_FAST}
DICT_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *STRICT}
GENERATOR_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *STRICT}
SEQUENCE_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *FAIL_FAST}

FLOAT_CONSTRAINTS = {*NUMERIC_CONSTRAINTS, *ALLOW_INF_NAN, *STRICT}
DECIMAL_CONSTRAINTS = {'max_digits', 'decimal_places', *FLOAT_CONSTRAINTS}
INT_CONSTRAINTS = {*NUMERIC_CONSTRAINTS, *ALLOW_INF_NAN, *STRICT}
BOOL_CONSTRAINTS = STRICT
UUID_CONSTRAINTS = STRICT

DATE_TIME_CONSTRAINTS = {*NUMERIC_CONSTRAINTS, *STRICT}
TIMEDELTA_CONSTRAINTS = {*NUMERIC_CONSTRAINTS, *STRICT}
TIME_CONSTRAINTS = {*NUMERIC_CONSTRAINTS, *STRICT}
LAX_OR_STRICT_CONSTRAINTS = STRICT
ENUM_CONSTRAINTS = STRICT
COMPLEX_CONSTRAINTS = STRICT

UNION_CONSTRAINTS = {'union_mode'}
URL_CONSTRAINTS = {
    'max_length',
    'allowed_schemes',
    'host_required',
    'default_host',
    'default_port',
    'default_path',
}

TEXT_SCHEMA_TYPES = ('str', 'bytes', 'url', 'multi-host-url')
SEQUENCE_SCHEMA_TYPES = ('list', 'tuple', 'set', 'frozenset', 'generator', *TEXT_SCHEMA_TYPES)
NUMERIC_SCHEMA_TYPES = ('float', 'int', 'date', 'time', 'timedelta', 'datetime')

CONSTRAINTS_TO_ALLOWED_SCHEMAS: dict[str, set[str]] = defaultdict(set)

constraint_schema_pairings: list[tuple[set[str], tuple[str, ...]]] = [
    (STR_CONSTRAINTS, TEXT_SCHEMA_TYPES),
    (BYTES_CONSTRAINTS, ('bytes',)),
    (LIST_CONSTRAINTS, ('list',)),
    (TUPLE_CONSTRAINTS, ('tuple',)),
    (SET_CONSTRAINTS, ('set', 'frozenset')),
    (DICT_CONSTRAINTS, ('dict',)),
    (GENERATOR_CONSTRAINTS, ('generator',)),
    (FLOAT_CONSTRAINTS, ('float',)),
    (INT_CONSTRAINTS, ('int',)),
    (DATE_TIME_CONSTRAINTS, ('date', 'time', 'datetime', 'timedelta')),
    # TODO: this is a bit redundant, we could probably avoid some of these
    (STRICT, (*TEXT_SCHEMA_TYPES, *SEQUENCE_SCHEMA_TYPES, *NUMERIC_SCHEMA_TYPES, 'typed-dict', 'model')),
    (UNION_CONSTRAINTS, ('union',)),
    (URL_CONSTRAINTS, ('url', 'multi-host-url')),
    (BOOL_CONSTRAINTS, ('bool',)),
    (UUID_CONSTRAINTS, ('uuid',)),
    (LAX_OR_STRICT_CONSTRAINTS, ('lax-or-strict',)),
    (ENUM_CONSTRAINTS, ('enum',)),
    (DECIMAL_CONSTRAINTS, ('decimal',)),
    (COMPLEX_CONSTRAINTS, ('complex',)),
]
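# Invert the pairings above into a per-constraint lookup, so that e.g.
# CONSTRAINTS_TO_ALLOWED_SCHEMAS['max_length'] ends up containing 'str', 'list', 'url', etc.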

for constraints, schemas in constraint_schema_pairings:
    for c in constraints:
        CONSTRAINTS_TO_ALLOWED_SCHEMAS[c].update(schemas)

def as_jsonable_value(v: Any) -> Any:
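    """Return `v` as-is if it is already a plain scalar type, otherwise convert it with `to_jsonable_python`."""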

    if type(v) not in (int, str, float, bytes, bool, type(None)):
        return to_jsonable_python(v)
    return v

def expand_grouped_metadata(annotations: Iterable[Any]) -> Iterable[Any]:
    """Expand the annotations.

    Args:
        annotations: An iterable of annotations.

    Returns:
        An iterable of expanded annotations.

    Example:
        ```python
        from annotated_types import Ge, Len

        from pydantic._internal._known_annotated_metadata import expand_grouped_metadata

        print(list(expand_grouped_metadata([Ge(4), Len(5)])))
        #> [Ge(ge=4), MinLen(min_length=5)]
        ```
    """
    import annotated_types as at

    FieldInfo = import_cached_field_info()

    for annotation in annotations:
        if isinstance(annotation, at.GroupedMetadata):
            yield from annotation
        elif isinstance(annotation, FieldInfo):
            yield from annotation.metadata
            # this is a bit problematic in that it results in duplicate metadata
            # all of our "consumers" can handle it, but it is not ideal
            # we probably should split up FieldInfo into:
            # - annotated types metadata
            # - individual metadata known only to Pydantic
            annotation = copy(annotation)
            annotation.metadata = []
            yield annotation
        else:
            yield annotation

@lru_cache
def _get_at_to_constraint_map() -> dict[type, str]:
    """Return a mapping of annotated types to constraints.

    Normally, we would define a mapping like this in the module scope, but we can't do that
    because we don't permit module-level imports of `annotated_types`, in an attempt to speed up
    the import time of `pydantic`. We still only want to have this dictionary defined in one place,
    so we use this function to cache the result.
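
    Example (illustrative):
        ```python
        import annotated_types as at

        from pydantic._internal._known_annotated_metadata import _get_at_to_constraint_map

        print(_get_at_to_constraint_map()[at.MinLen])
        #> min_length
        ```
    """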

154 """ 

155 import annotated_types as at 

156 

157 return { 

158 at.Gt: 'gt', 

159 at.Ge: 'ge', 

160 at.Lt: 'lt', 

161 at.Le: 'le', 

162 at.MultipleOf: 'multiple_of', 

163 at.MinLen: 'min_length', 

164 at.MaxLen: 'max_length', 

165 } 

166 

167 

def apply_known_metadata(annotation: Any, schema: CoreSchema) -> CoreSchema | None:  # noqa: C901
    """Apply `annotation` to `schema` if it is an annotation we know about (Gt, Le, etc.).
    Otherwise return `None`.

    This does not handle all known annotations. If / when it does, it can always
    return a CoreSchema and return the unmodified schema if the annotation should be ignored.

    Assumes that GroupedMetadata has already been expanded via `expand_grouped_metadata`.

    Args:
        annotation: The annotation.
        schema: The schema.

    Returns:
        An updated schema with the annotation applied if it is an annotation we know about, `None` otherwise.

    Raises:
        PydanticCustomError: If `Predicate` fails.
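
    Example:
        A minimal illustration: a known constraint such as `Gt` is set directly on the
        schema when the target schema type supports it.

        ```python
        from annotated_types import Gt
        from pydantic_core import core_schema as cs

        from pydantic._internal._known_annotated_metadata import apply_known_metadata

        print(apply_known_metadata(Gt(1), cs.int_schema()))
        #> {'type': 'int', 'gt': 1}
        ```
    """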

186 """ 

187 import annotated_types as at 

188 

189 from ._validators import NUMERIC_VALIDATOR_LOOKUP, forbid_inf_nan_check 

190 

191 schema = schema.copy() 

192 schema_update, other_metadata = collect_known_metadata([annotation]) 

193 schema_type = schema['type'] 

194 
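    # String-specific constraints: when one of these can't be set on the current schema
    # directly, it is enforced below by appending an extra `str_schema` step to a chain schema.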

    chain_schema_constraints: set[str] = {
        'pattern',
        'strip_whitespace',
        'to_lower',
        'to_upper',
        'coerce_numbers_to_str',
    }
    chain_schema_steps: list[CoreSchema] = []

    for constraint, value in schema_update.items():
        if constraint not in CONSTRAINTS_TO_ALLOWED_SCHEMAS:
            raise ValueError(f'Unknown constraint {constraint}')
        allowed_schemas = CONSTRAINTS_TO_ALLOWED_SCHEMAS[constraint]

        # if it becomes necessary to handle more than one constraint
        # in this recursive case with function-after or function-wrap, we should refactor
        # this is a bit challenging because we sometimes want to apply constraints to the inner schema,
        # whereas other times we want to wrap the existing schema with a new one that enforces a new constraint.
        if schema_type in {'function-before', 'function-wrap', 'function-after'} and constraint == 'strict':
            schema['schema'] = apply_known_metadata(annotation, schema['schema'])  # type: ignore # schema is function schema
            return schema

        # if we're allowed to apply constraint directly to the schema, like le to int, do that
        if schema_type in allowed_schemas:
            if constraint == 'union_mode' and schema_type == 'union':
                schema['mode'] = value  # type: ignore # schema is UnionSchema
            else:
                schema[constraint] = value
            continue

        # else, apply a function after validator to the schema to enforce the corresponding constraint
        if constraint in chain_schema_constraints:

            def _apply_constraint_with_incompatibility_info(
                value: Any, handler: cs.ValidatorFunctionWrapHandler
            ) -> Any:
                try:
                    x = handler(value)
                except ValidationError as ve:
                    # if the error is about the type, it's likely that the constraint is incompatible with the type of the field
                    # for example, the following invalid schema wouldn't be caught during schema build, but rather at this point
                    # with a cryptic 'string_type' error coming from the string validator,
                    # that we'd rather express as a constraint incompatibility error (TypeError)
                    # Annotated[list[int], Field(pattern='abc')]
                    if 'type' in ve.errors()[0]['type']:
                        raise TypeError(
                            f"Unable to apply constraint '{constraint}' to supplied value {value} for schema of type '{schema_type}'"  # noqa: B023
                        )
                    raise ve
                return x

            chain_schema_steps.append(
                cs.no_info_wrap_validator_function(
                    _apply_constraint_with_incompatibility_info, cs.str_schema(**{constraint: value})
                )
            )

        elif constraint in NUMERIC_VALIDATOR_LOOKUP:
            if constraint in LENGTH_CONSTRAINTS:
                inner_schema = schema
                while inner_schema['type'] in {'function-before', 'function-wrap', 'function-after'}:
                    inner_schema = inner_schema['schema']  # type: ignore
                inner_schema_type = inner_schema['type']
                if inner_schema_type == 'list' or (
                    inner_schema_type == 'json-or-python' and inner_schema['json_schema']['type'] == 'list'  # type: ignore
                ):
                    js_constraint_key = 'minItems' if constraint == 'min_length' else 'maxItems'
                else:
                    js_constraint_key = 'minLength' if constraint == 'min_length' else 'maxLength'
            else:
                js_constraint_key = constraint

            schema = cs.no_info_after_validator_function(
                partial(NUMERIC_VALIDATOR_LOOKUP[constraint], **{constraint: value}), schema
            )
            metadata = schema.get('metadata', {})
            if (existing_json_schema_updates := metadata.get('pydantic_js_updates')) is not None:
                metadata['pydantic_js_updates'] = {
                    **existing_json_schema_updates,
                    **{js_constraint_key: as_jsonable_value(value)},
                }
            else:
                metadata['pydantic_js_updates'] = {js_constraint_key: as_jsonable_value(value)}
            schema['metadata'] = metadata

        elif constraint == 'allow_inf_nan' and value is False:
            schema = cs.no_info_after_validator_function(
                forbid_inf_nan_check,
                schema,
            )
        else:
            # It's rare that we'd get here, but it's possible if we add a new constraint and forget to handle it
            # Most constraint errors are caught at runtime during attempted application
            raise RuntimeError(f"Unable to apply constraint '{constraint}' to schema of type '{schema_type}'")

    for annotation in other_metadata:
        if (annotation_type := type(annotation)) in (at_to_constraint_map := _get_at_to_constraint_map()):
            constraint = at_to_constraint_map[annotation_type]
            validator = NUMERIC_VALIDATOR_LOOKUP.get(constraint)
            if validator is None:
                raise ValueError(f'Unknown constraint {constraint}')
            schema = cs.no_info_after_validator_function(
                partial(validator, **{constraint: getattr(annotation, constraint)}), schema
            )
            continue

        elif isinstance(annotation, (at.Predicate, at.Not)):
            predicate_name = f'{annotation.func.__qualname__}' if hasattr(annotation.func, '__qualname__') else ''

            def val_func(v: Any) -> Any:
                predicate_satisfied = annotation.func(v)  # noqa: B023

                # annotation.func may also raise an exception, let it pass through
                if isinstance(annotation, at.Predicate):  # noqa: B023
                    if not predicate_satisfied:
                        raise PydanticCustomError(
                            'predicate_failed',
                            f'Predicate {predicate_name} failed',  # type: ignore # noqa: B023
                        )
                else:
                    if predicate_satisfied:
                        raise PydanticCustomError(
                            'not_operation_failed',
                            f'Not of {predicate_name} failed',  # type: ignore # noqa: B023
                        )

                return v

            schema = cs.no_info_after_validator_function(val_func, schema)
        else:
            # ignore any other unknown metadata
            return None

    if chain_schema_steps:
        chain_schema_steps = [schema] + chain_schema_steps
        return cs.chain_schema(chain_schema_steps)

    return schema

def collect_known_metadata(annotations: Iterable[Any]) -> tuple[dict[str, Any], list[Any]]:
    """Split `annotations` into known metadata and unknown annotations.

    Args:
        annotations: An iterable of annotations.

    Returns:
        A tuple containing a dict of known metadata and a list of unknown annotations.

    Example:
        ```python
        from annotated_types import Gt, Len

        from pydantic._internal._known_annotated_metadata import collect_known_metadata

        print(collect_known_metadata([Gt(1), Len(42), ...]))
        #> ({'gt': 1, 'min_length': 42}, [Ellipsis])
        ```
    """

    annotations = expand_grouped_metadata(annotations)

    res: dict[str, Any] = {}
    remaining: list[Any] = []

    for annotation in annotations:
        # isinstance(annotation, PydanticMetadata) also covers ._fields:_PydanticGeneralMetadata
        if isinstance(annotation, PydanticMetadata):
            res.update(annotation.__dict__)
        # we don't use dataclasses.asdict because that recursively calls asdict on the field values
        elif (annotation_type := type(annotation)) in (at_to_constraint_map := _get_at_to_constraint_map()):
            constraint = at_to_constraint_map[annotation_type]
            res[constraint] = getattr(annotation, constraint)
        elif isinstance(annotation, type) and issubclass(annotation, PydanticMetadata):
            # also support PydanticMetadata classes being used without initialisation,
            # e.g. `Annotated[int, Strict]` as well as `Annotated[int, Strict()]`
            res.update({k: v for k, v in vars(annotation).items() if not k.startswith('_')})
        else:
            remaining.append(annotation)
    # Nones can sneak in but pydantic-core will reject them
    # it'd be nice to clean things up so we don't put in None (we probably don't _need_ to, it was just easier)
    # but this is simple enough to kick that can down the road
    res = {k: v for k, v in res.items() if v is not None}
    return res, remaining

def check_metadata(metadata: dict[str, Any], allowed: Iterable[str], source_type: Any) -> None:
    """A small utility function to validate that the given metadata can be applied to the target.
    More than saving lines of code, this gives us a consistent error message for all of our internal implementations.

    Args:
        metadata: A dict of metadata.
        allowed: An iterable of allowed metadata.
        source_type: The source type.

    Raises:
        TypeError: If the metadata contains constraints that can't be applied to the source type.
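
    Example:
        A minimal illustration, using this module's own constants:

        ```python
        from pydantic._internal._known_annotated_metadata import STR_CONSTRAINTS, check_metadata

        check_metadata({'min_length': 1}, STR_CONSTRAINTS, str)  # ok: 'min_length' is allowed for str
        check_metadata({'gt': 1}, STR_CONSTRAINTS, str)
        # raises TypeError: The following constraints cannot be applied to <class 'str'>: 'gt'
        ```
    """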

388 """ 

389 unknown = metadata.keys() - set(allowed) 

390 if unknown: 

391 raise TypeError( 

392 f'The following constraints cannot be applied to {source_type!r}: {", ".join([f"{k!r}" for k in unknown])}' 

393 )