Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pydantic/_internal/_known_annotated_metadata.py: 56% of 162 statements


from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable
from copy import copy
from functools import lru_cache, partial
from typing import TYPE_CHECKING, Any

from pydantic_core import CoreSchema, PydanticCustomError, ValidationError, to_jsonable_python
from pydantic_core import core_schema as cs

from ._fields import PydanticMetadata
from ._import_utils import import_cached_field_info

if TYPE_CHECKING:
    pass

STRICT = {'strict'}
FAIL_FAST = {'fail_fast'}
LENGTH_CONSTRAINTS = {'min_length', 'max_length'}
INEQUALITY = {'le', 'ge', 'lt', 'gt'}
NUMERIC_CONSTRAINTS = {'multiple_of', *INEQUALITY}
ALLOW_INF_NAN = {'allow_inf_nan'}

STR_CONSTRAINTS = {
    *LENGTH_CONSTRAINTS,
    *STRICT,
    'strip_whitespace',
    'to_lower',
    'to_upper',
    'pattern',
    'coerce_numbers_to_str',
}
BYTES_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *STRICT}

LIST_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *STRICT, *FAIL_FAST}
TUPLE_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *STRICT, *FAIL_FAST}
SET_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *STRICT, *FAIL_FAST}
DICT_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *STRICT}
GENERATOR_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *STRICT}
SEQUENCE_CONSTRAINTS = {*LENGTH_CONSTRAINTS, *FAIL_FAST}

FLOAT_CONSTRAINTS = {*NUMERIC_CONSTRAINTS, *ALLOW_INF_NAN, *STRICT}
DECIMAL_CONSTRAINTS = {'max_digits', 'decimal_places', *FLOAT_CONSTRAINTS}
INT_CONSTRAINTS = {*NUMERIC_CONSTRAINTS, *ALLOW_INF_NAN, *STRICT}
BOOL_CONSTRAINTS = STRICT
UUID_CONSTRAINTS = STRICT

DATE_TIME_CONSTRAINTS = {*NUMERIC_CONSTRAINTS, *STRICT}
TIMEDELTA_CONSTRAINTS = {*NUMERIC_CONSTRAINTS, *STRICT}
TIME_CONSTRAINTS = {*NUMERIC_CONSTRAINTS, *STRICT}
LAX_OR_STRICT_CONSTRAINTS = STRICT
ENUM_CONSTRAINTS = STRICT
COMPLEX_CONSTRAINTS = STRICT

UNION_CONSTRAINTS = {'union_mode'}
URL_CONSTRAINTS = {
    'max_length',
    'allowed_schemes',
    'host_required',
    'default_host',
    'default_port',
    'default_path',
}

TEXT_SCHEMA_TYPES = ('str', 'bytes', 'url', 'multi-host-url')
SEQUENCE_SCHEMA_TYPES = ('list', 'tuple', 'set', 'frozenset', 'generator', *TEXT_SCHEMA_TYPES)
NUMERIC_SCHEMA_TYPES = ('float', 'int', 'date', 'time', 'timedelta', 'datetime')
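
# These constraint sets and schema-type tuples are combined below into
# CONSTRAINTS_TO_ALLOWED_SCHEMAS, which records, for each constraint name, the
# core schema types the constraint can be set on directly.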

CONSTRAINTS_TO_ALLOWED_SCHEMAS: dict[str, set[str]] = defaultdict(set)

constraint_schema_pairings: list[tuple[set[str], tuple[str, ...]]] = [
    (STR_CONSTRAINTS, TEXT_SCHEMA_TYPES),
    (BYTES_CONSTRAINTS, ('bytes',)),
    (LIST_CONSTRAINTS, ('list',)),
    (TUPLE_CONSTRAINTS, ('tuple',)),
    (SET_CONSTRAINTS, ('set', 'frozenset')),
    (DICT_CONSTRAINTS, ('dict',)),
    (GENERATOR_CONSTRAINTS, ('generator',)),
    (FLOAT_CONSTRAINTS, ('float',)),
    (INT_CONSTRAINTS, ('int',)),
    (DATE_TIME_CONSTRAINTS, ('date', 'time', 'datetime', 'timedelta')),
    # TODO: this is a bit redundant, we could probably avoid some of these
    (STRICT, (*TEXT_SCHEMA_TYPES, *SEQUENCE_SCHEMA_TYPES, *NUMERIC_SCHEMA_TYPES, 'typed-dict', 'model')),
    (UNION_CONSTRAINTS, ('union',)),
    (URL_CONSTRAINTS, ('url', 'multi-host-url')),
    (BOOL_CONSTRAINTS, ('bool',)),
    (UUID_CONSTRAINTS, ('uuid',)),
    (LAX_OR_STRICT_CONSTRAINTS, ('lax-or-strict',)),
    (ENUM_CONSTRAINTS, ('enum',)),
    (DECIMAL_CONSTRAINTS, ('decimal',)),
    (COMPLEX_CONSTRAINTS, ('complex',)),
]

for constraints, schemas in constraint_schema_pairings:
    for c in constraints:
        CONSTRAINTS_TO_ALLOWED_SCHEMAS[c].update(schemas)
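
# For illustration: after the loop above, CONSTRAINTS_TO_ALLOWED_SCHEMAS['max_length']
# contains 'str', 'bytes', 'list', 'tuple', 'set', 'frozenset', 'dict', 'generator',
# 'url' and 'multi-host-url', while CONSTRAINTS_TO_ALLOWED_SCHEMAS['union_mode']
# contains only 'union'.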


def as_jsonable_value(v: Any) -> Any:
    """Convert `v` to a JSON-serializable value with `to_jsonable_python`, unless it is
    already a plain scalar (`int`, `str`, `float`, `bytes`, `bool` or `None`), in which
    case it is returned as-is."""
    if type(v) not in (int, str, float, bytes, bool, type(None)):
        return to_jsonable_python(v)
    return v


def expand_grouped_metadata(annotations: Iterable[Any]) -> Iterable[Any]:
    """Expand the annotations.

    Args:
        annotations: An iterable of annotations.

    Returns:
        An iterable of expanded annotations.

    Example:
        ```python
        from annotated_types import Ge, Len

        from pydantic._internal._known_annotated_metadata import expand_grouped_metadata

        print(list(expand_grouped_metadata([Ge(4), Len(5)])))
        #> [Ge(ge=4), MinLen(min_length=5)]
        ```
    """
    import annotated_types as at

    FieldInfo = import_cached_field_info()

    for annotation in annotations:
        if isinstance(annotation, at.GroupedMetadata):
            yield from annotation
        elif isinstance(annotation, FieldInfo):
            yield from annotation.metadata
            # this is a bit problematic in that it results in duplicate metadata
            # all of our "consumers" can handle it, but it is not ideal
            # we probably should split up FieldInfo into:
            # - annotated types metadata
            # - individual metadata known only to Pydantic
            annotation = copy(annotation)
            annotation.metadata = []
            yield annotation
        else:
            yield annotation
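
# Illustrative sketch (assuming pydantic's `Field`, whose `gt=0` argument is stored
# as `annotated_types.Gt` metadata): a `FieldInfo` carrying constraints is expanded
# in two steps, e.g.
#
#     from pydantic import Field
#     list(expand_grouped_metadata([Field(gt=0)]))
#
# yields `Gt(gt=0)` followed by a copy of the `FieldInfo` with `.metadata` cleared.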


@lru_cache
def _get_at_to_constraint_map() -> dict[type, str]:
    """Return a mapping of annotated types to constraints.

    Normally, we would define a mapping like this in the module scope, but we can't do that
    because we don't permit module level imports of `annotated_types`, in an attempt to speed up
    the import time of `pydantic`. We still only want to have this dictionary defined in one place,
    so we use this function to cache the result.
    """
    import annotated_types as at

    return {
        at.Gt: 'gt',
        at.Ge: 'ge',
        at.Lt: 'lt',
        at.Le: 'le',
        at.MultipleOf: 'multiple_of',
        at.MinLen: 'min_length',
        at.MaxLen: 'max_length',
    }
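
# For example, `_get_at_to_constraint_map()[annotated_types.Gt]` is 'gt', so an
# `annotated_types.Gt(0)` annotation is collected as the constraint `{'gt': 0}`.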


def apply_known_metadata(annotation: Any, schema: CoreSchema) -> CoreSchema | None:  # noqa: C901
    """Apply `annotation` to `schema` if it is an annotation we know about (Gt, Le, etc.).
    Otherwise return `None`.

    This does not handle all known annotations. If / when it does, it can always
    return a CoreSchema and return the unmodified schema if the annotation should be ignored.

    Assumes that GroupedMetadata has already been expanded via `expand_grouped_metadata`.

    Args:
        annotation: The annotation.
        schema: The schema.

    Returns:
        An updated schema with the annotation applied if it is an annotation we know about, `None` otherwise.

    Raises:
        RuntimeError: If a constraint can't be applied to a specific schema type.
        ValueError: If an unknown constraint is encountered.
    """
    import annotated_types as at

    from ._validators import NUMERIC_VALIDATOR_LOOKUP, forbid_inf_nan_check

    schema = schema.copy()
    schema_update, other_metadata = collect_known_metadata([annotation])
    schema_type = schema['type']

    chain_schema_constraints: set[str] = {
        'pattern',
        'strip_whitespace',
        'to_lower',
        'to_upper',
        'coerce_numbers_to_str',
    }
    chain_schema_steps: list[CoreSchema] = []

    for constraint, value in schema_update.items():
        if constraint not in CONSTRAINTS_TO_ALLOWED_SCHEMAS:
            raise ValueError(f'Unknown constraint {constraint}')
        allowed_schemas = CONSTRAINTS_TO_ALLOWED_SCHEMAS[constraint]

        # if it becomes necessary to handle more than one constraint
        # in this recursive case with function-after or function-wrap, we should refactor
        # this is a bit challenging because we sometimes want to apply constraints to the inner schema,
        # whereas other times we want to wrap the existing schema with a new one that enforces a new constraint.
        if schema_type in {'function-before', 'function-wrap', 'function-after'} and constraint == 'strict':
            schema['schema'] = apply_known_metadata(annotation, schema['schema'])  # type: ignore  # schema is function schema
            return schema

        # if we're allowed to apply the constraint directly to the schema, like `le` to `int`, do that
        if schema_type in allowed_schemas:
            if constraint == 'union_mode' and schema_type == 'union':
                schema['mode'] = value  # type: ignore  # schema is UnionSchema
            else:
                schema[constraint] = value
            continue

        # else, wrap or chain the schema with a validator that enforces the corresponding constraint
        if constraint in chain_schema_constraints:

            def _apply_constraint_with_incompatibility_info(
                value: Any, handler: cs.ValidatorFunctionWrapHandler
            ) -> Any:
                try:
                    x = handler(value)
                except ValidationError as ve:
                    # if the error is about the type, it's likely that the constraint is incompatible with the type of the field
                    # for example, the following invalid schema wouldn't be caught during schema build, but rather at this point,
                    # with a cryptic 'string_type' error coming from the string validator,
                    # which we'd rather express as a constraint incompatibility error (TypeError):
                    # Annotated[list[int], Field(pattern='abc')]
                    if 'type' in ve.errors()[0]['type']:
                        raise TypeError(
                            f"Unable to apply constraint '{constraint}' to supplied value {value} for schema of type '{schema_type}'"  # noqa: B023
                        )
                    raise ve
                return x

            chain_schema_steps.append(
                cs.no_info_wrap_validator_function(
                    _apply_constraint_with_incompatibility_info, cs.str_schema(**{constraint: value})
                )
            )
        elif constraint in NUMERIC_VALIDATOR_LOOKUP:
            if constraint in LENGTH_CONSTRAINTS:
                inner_schema = schema
                while inner_schema['type'] in {'function-before', 'function-wrap', 'function-after'}:
                    inner_schema = inner_schema['schema']  # type: ignore
                inner_schema_type = inner_schema['type']
                if inner_schema_type == 'list' or (
                    inner_schema_type == 'json-or-python' and inner_schema['json_schema']['type'] == 'list'  # type: ignore
                ):
                    js_constraint_key = 'minItems' if constraint == 'min_length' else 'maxItems'
                else:
                    js_constraint_key = 'minLength' if constraint == 'min_length' else 'maxLength'
            else:
                js_constraint_key = constraint

            schema = cs.no_info_after_validator_function(
                partial(NUMERIC_VALIDATOR_LOOKUP[constraint], **{constraint: value}), schema
            )
            metadata = schema.get('metadata', {})
            if (existing_json_schema_updates := metadata.get('pydantic_js_updates')) is not None:
                metadata['pydantic_js_updates'] = {
                    **existing_json_schema_updates,
                    **{js_constraint_key: as_jsonable_value(value)},
                }
            else:
                metadata['pydantic_js_updates'] = {js_constraint_key: as_jsonable_value(value)}
            schema['metadata'] = metadata
        elif constraint == 'allow_inf_nan' and value is False:
            schema = cs.no_info_after_validator_function(
                forbid_inf_nan_check,
                schema,
            )
        else:
            # It's rare that we'd get here, but it's possible if we add a new constraint and forget to handle it
            # Most constraint errors are caught at runtime during attempted application
            raise RuntimeError(f"Unable to apply constraint '{constraint}' to schema of type '{schema_type}'")

    for annotation in other_metadata:
        if (annotation_type := type(annotation)) in (at_to_constraint_map := _get_at_to_constraint_map()):
            constraint = at_to_constraint_map[annotation_type]
            validator = NUMERIC_VALIDATOR_LOOKUP.get(constraint)
            if validator is None:
                raise ValueError(f'Unknown constraint {constraint}')
            schema = cs.no_info_after_validator_function(
                partial(validator, **{constraint: getattr(annotation, constraint)}), schema
            )
            continue
        elif isinstance(annotation, (at.Predicate, at.Not)):
            predicate_name = f'{annotation.func.__qualname__!r} ' if hasattr(annotation.func, '__qualname__') else ''

            # Note: B023 is ignored because even though we iterate over `other_metadata`, it is guaranteed
            # to be of length 1. `apply_known_metadata()` is called from `GenerateSchema`, where annotations
            # were already expanded via `expand_grouped_metadata()`. Confusing, but this falls into the annotations
            # refactor.
            if isinstance(annotation, at.Predicate):

                def val_func(v: Any) -> Any:
                    predicate_satisfied = annotation.func(v)  # noqa: B023
                    if not predicate_satisfied:
                        raise PydanticCustomError(
                            'predicate_failed',
                            f'Predicate {predicate_name}failed',  # pyright: ignore[reportArgumentType]  # noqa: B023
                        )
                    return v

            else:

                def val_func(v: Any) -> Any:
                    predicate_satisfied = annotation.func(v)  # noqa: B023
                    if predicate_satisfied:
                        raise PydanticCustomError(
                            'not_operation_failed',
                            f'Not of {predicate_name}failed',  # pyright: ignore[reportArgumentType]  # noqa: B023
                        )
                    return v

            schema = cs.no_info_after_validator_function(val_func, schema)
        else:
            # ignore any other unknown metadata
            return None

    if chain_schema_steps:
        chain_schema_steps = [schema] + chain_schema_steps
        return cs.chain_schema(chain_schema_steps)

    return schema
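
# Illustrative sketch (not part of the module): applying a known annotation to a
# schema type that supports it inlines the constraint, e.g.
#
#     import annotated_types as at
#     from pydantic_core import core_schema as cs
#
#     apply_known_metadata(at.Ge(4), cs.int_schema())
#     # -> {'type': 'int', 'ge': 4}
#
# whereas a constraint the schema type doesn't support natively is enforced by
# wrapping the schema in an after-validator (or a chain schema for string-only
# constraints such as 'pattern').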


def collect_known_metadata(annotations: Iterable[Any]) -> tuple[dict[str, Any], list[Any]]:
    """Split `annotations` into known metadata and unknown annotations.

    Args:
        annotations: An iterable of annotations.

    Returns:
        A tuple containing a dict of known metadata and a list of unknown annotations.

    Example:
        ```python
        from annotated_types import Gt, Len

        from pydantic._internal._known_annotated_metadata import collect_known_metadata

        print(collect_known_metadata([Gt(1), Len(42), ...]))
        #> ({'gt': 1, 'min_length': 42}, [Ellipsis])
        ```
    """
    annotations = expand_grouped_metadata(annotations)

    res: dict[str, Any] = {}
    remaining: list[Any] = []

    for annotation in annotations:
        # isinstance(annotation, PydanticMetadata) also covers ._fields._PydanticGeneralMetadata
        if isinstance(annotation, PydanticMetadata):
            res.update(annotation.__dict__)
            # we don't use dataclasses.asdict because that recursively calls asdict on the field values
        elif (annotation_type := type(annotation)) in (at_to_constraint_map := _get_at_to_constraint_map()):
            constraint = at_to_constraint_map[annotation_type]
            res[constraint] = getattr(annotation, constraint)
        elif isinstance(annotation, type) and issubclass(annotation, PydanticMetadata):
            # also support PydanticMetadata classes being used without initialisation,
            # e.g. `Annotated[int, Strict]` as well as `Annotated[int, Strict()]`
            res.update({k: v for k, v in vars(annotation).items() if not k.startswith('_')})
        else:
            remaining.append(annotation)
    # Nones can sneak in but pydantic-core will reject them
    # it'd be nice to clean things up so we don't put in None (we probably don't _need_ to, it was just easier)
    # but this is simple enough to kick that can down the road
    res = {k: v for k, v in res.items() if v is not None}
    return res, remaining


def check_metadata(metadata: dict[str, Any], allowed: Iterable[str], source_type: Any) -> None:
    """A small utility function to validate that the given metadata can be applied to the target.
    More than saving lines of code, this gives us a consistent error message for all of our internal implementations.

    Args:
        metadata: A dict of metadata.
        allowed: An iterable of allowed metadata.
        source_type: The source type.

    Raises:
        TypeError: If any of the metadata can't be applied to the source type.
    """
    unknown = metadata.keys() - set(allowed)
    if unknown:
        raise TypeError(
            f'The following constraints cannot be applied to {source_type!r}: {", ".join([f"{k!r}" for k in unknown])}'
        )
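
# Illustrative usage: `check_metadata({'gt': 1, 'pattern': 'abc'}, {'gt'}, int)`
# raises TypeError("The following constraints cannot be applied to <class 'int'>: 'pattern'").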