Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pydantic/_internal/_validators.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

275 statements  

1"""Validator functions for standard library types. 

2 

3Import of this module is deferred since it contains imports of many standard library modules. 

4""" 

5 

6from __future__ import annotations as _annotations 

7 

8import collections.abc 

9import math 

10import re 

11import typing 

12from decimal import Decimal 

13from fractions import Fraction 

14from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network 

15from typing import Any, Callable, Union, cast, get_origin 

16from zoneinfo import ZoneInfo, ZoneInfoNotFoundError 

17 

18import typing_extensions 

19from pydantic_core import PydanticCustomError, core_schema 

20from pydantic_core._pydantic_core import PydanticKnownError 

21from typing_inspection import typing_objects 

22 

23from pydantic._internal._import_utils import import_cached_field_info 

24from pydantic.errors import PydanticSchemaGenerationError 

25 

26 

def sequence_validator(
    input_value: typing.Sequence[Any],
    /,
    validator: core_schema.ValidatorFunctionWrapHandler,
) -> typing.Sequence[Any]:
    """Validate the items of a `Sequence` and rebuild the original container type.

    The caller has already checked `isinstance(input_value, Sequence)`.

    Args:
        input_value: The sequence to validate.
        validator: Wrap handler that validates the items and returns them as a list.

    Returns:
        The validated items: a list for `list` and `range` inputs, a tuple for exact
        `tuple` inputs, otherwise `type(input_value)(validated_items)`.

    Raises:
        PydanticCustomError: If the input is a `str` or `bytes` instance.
    """
    original_type = type(input_value)

    # `str` and `bytes` are sequences too, but are never accepted as a Sequence value.
    # Relevant issue: https://github.com/pydantic/pydantic/issues/5595
    if issubclass(original_type, (str, bytes)):
        raise PydanticCustomError(
            'sequence_str',
            "'{type_name}' instances are not allowed as a Sequence value",
            {'type_name': original_type.__name__},
        )

    # TODO: refactor sequence validation to validate with either a list or a tuple
    # schema, depending on the type of the value. One of this validator or the
    # SequenceValidator in _std_types_schema.py should then be removable.
    was_exact_tuple = original_type is tuple
    # Exact tuples are handed to the inner validator as lists.
    validated_items = validator(list(input_value) if was_exact_tuple else input_value)

    # Everything below only re-creates the original container type.
    if was_exact_tuple:
        return tuple(validated_items)
    if original_type is list or issubclass(original_type, range):
        # A range probably cannot be re-created from its items, so the list is returned.
        return validated_items
    # Best guess at how to re-create the original type; custom types may need more.
    return original_type(validated_items)  # type: ignore[call-arg]

65 

66 

def import_string(value: Any) -> Any:
    """Resolve a string import path to the object it names.

    Non-string values are returned unchanged so that the next validator can handle them.

    Raises:
        PydanticCustomError: With type `import_error` if the path cannot be imported.
    """
    if not isinstance(value, str):
        return value

    try:
        return _import_string_logic(value)
    except ImportError as exc:
        raise PydanticCustomError('import_error', 'Invalid python path: {error}', {'error': str(exc)}) from exc

76 

77 

78def _import_string_logic(dotted_path: str) -> Any: 

79 """Inspired by uvicorn — dotted paths should include a colon before the final item if that item is not a module. 

80 (This is necessary to distinguish between a submodule and an attribute when there is a conflict.). 

81 

82 If the dotted path does not include a colon and the final item is not a valid module, importing as an attribute 

83 rather than a submodule will be attempted automatically. 

84 

85 So, for example, the following values of `dotted_path` result in the following returned values: 

86 * 'collections': <module 'collections'> 

87 * 'collections.abc': <module 'collections.abc'> 

88 * 'collections.abc:Mapping': <class 'collections.abc.Mapping'> 

89 * `collections.abc.Mapping`: <class 'collections.abc.Mapping'> (though this is a bit slower than the previous line) 

90 

91 An error will be raised under any of the following scenarios: 

92 * `dotted_path` contains more than one colon (e.g., 'collections:abc:Mapping') 

93 * the substring of `dotted_path` before the colon is not a valid module in the environment (e.g., '123:Mapping') 

94 * the substring of `dotted_path` after the colon is not an attribute of the module (e.g., 'collections:abc123') 

95 """ 

96 from importlib import import_module 

97 

98 components = dotted_path.strip().split(':') 

99 if len(components) > 2: 

100 raise ImportError(f"Import strings should have at most one ':'; received {dotted_path!r}") 

101 

102 module_path = components[0] 

103 if not module_path: 

104 raise ImportError(f'Import strings should have a nonempty module name; received {dotted_path!r}') 

105 

106 try: 

107 module = import_module(module_path) 

108 except ModuleNotFoundError as e: 

109 if '.' in module_path: 

110 # Check if it would be valid if the final item was separated from its module with a `:` 

111 maybe_module_path, maybe_attribute = dotted_path.strip().rsplit('.', 1) 

112 try: 

113 return _import_string_logic(f'{maybe_module_path}:{maybe_attribute}') 

114 except ImportError: 

115 pass 

116 raise ImportError(f'No module named {module_path!r}') from e 

117 raise e 

118 

119 if len(components) > 1: 

120 attribute = components[1] 

121 try: 

122 return getattr(module, attribute) 

123 except AttributeError as e: 

124 raise ImportError(f'cannot import name {attribute!r} from {module_path!r}') from e 

125 else: 

126 return module 

127 

128 

def pattern_either_validator(input_value: Any, /) -> typing.Pattern[Any]:
    """Accept a compiled pattern, or compile a `str` / `bytes` pattern source.

    Raises:
        PydanticCustomError: With type `pattern_type` for any other input.
    """
    if isinstance(input_value, typing.Pattern):
        return input_value
    if not isinstance(input_value, (str, bytes)):
        raise PydanticCustomError('pattern_type', 'Input should be a valid pattern')
    # todo strict mode
    return compile_pattern(input_value)  # type: ignore

137 

138 

def pattern_str_validator(input_value: Any, /) -> typing.Pattern[str]:
    """Accept a compiled `str` pattern, or compile a `str` pattern source.

    Raises:
        PydanticCustomError: With type `pattern_str_type` for a bytes pattern or `bytes`
            input, and with type `pattern_type` for any other unsupported input.
    """
    if isinstance(input_value, typing.Pattern):
        if not isinstance(input_value.pattern, str):
            raise PydanticCustomError('pattern_str_type', 'Input should be a string pattern')
        return input_value
    if isinstance(input_value, str):
        return compile_pattern(input_value)
    if isinstance(input_value, bytes):
        raise PydanticCustomError('pattern_str_type', 'Input should be a string pattern')
    raise PydanticCustomError('pattern_type', 'Input should be a valid pattern')

151 

152 

def pattern_bytes_validator(input_value: Any, /) -> typing.Pattern[bytes]:
    """Accept a compiled `bytes` pattern, or compile a `bytes` pattern source.

    Raises:
        PydanticCustomError: With type `pattern_bytes_type` for a string pattern or `str`
            input, and with type `pattern_type` for any other unsupported input.
    """
    if isinstance(input_value, typing.Pattern):
        if not isinstance(input_value.pattern, bytes):
            raise PydanticCustomError('pattern_bytes_type', 'Input should be a bytes pattern')
        return input_value
    if isinstance(input_value, bytes):
        return compile_pattern(input_value)
    if isinstance(input_value, str):
        raise PydanticCustomError('pattern_bytes_type', 'Input should be a bytes pattern')
    raise PydanticCustomError('pattern_type', 'Input should be a valid pattern')

165 

166 

# Constrained type variable: a pattern source is either `str` or `bytes`, and the
# compiled pattern is parametrized with the same type.
PatternType = typing.TypeVar('PatternType', str, bytes)


def compile_pattern(pattern: PatternType) -> typing.Pattern[PatternType]:
    """Compile `pattern` with `re.compile`.

    Raises:
        PydanticCustomError: With type `pattern_regex` if `re.compile` raises `re.error`.
    """
    try:
        compiled = re.compile(pattern)
    except re.error:
        raise PydanticCustomError('pattern_regex', 'Input should be a valid regular expression')
    return compiled

175 

176 

def ip_v4_address_validator(input_value: Any, /) -> IPv4Address:
    """Return `input_value` as an `IPv4Address`, constructing one if it is not already.

    Raises:
        PydanticCustomError: With type `ip_v4_address` if construction raises `ValueError`.
    """
    if not isinstance(input_value, IPv4Address):
        try:
            input_value = IPv4Address(input_value)
        except ValueError:
            raise PydanticCustomError('ip_v4_address', 'Input is not a valid IPv4 address')
    return input_value

185 

186 

def ip_v6_address_validator(input_value: Any, /) -> IPv6Address:
    """Return `input_value` as an `IPv6Address`, constructing one if it is not already.

    Raises:
        PydanticCustomError: With type `ip_v6_address` if construction raises `ValueError`.
    """
    if not isinstance(input_value, IPv6Address):
        try:
            input_value = IPv6Address(input_value)
        except ValueError:
            raise PydanticCustomError('ip_v6_address', 'Input is not a valid IPv6 address')
    return input_value

195 

196 

def ip_v4_network_validator(input_value: Any, /) -> IPv4Network:
    """Return `input_value` as an `IPv4Network`, constructing one if it is not already.

    The network is constructed with the default `strict` argument.

    See more:
    https://docs.python.org/library/ipaddress.html#ipaddress.IPv4Network

    Raises:
        PydanticCustomError: With type `ip_v4_network` if construction raises `ValueError`.
    """
    if not isinstance(input_value, IPv4Network):
        try:
            input_value = IPv4Network(input_value)
        except ValueError:
            raise PydanticCustomError('ip_v4_network', 'Input is not a valid IPv4 network')
    return input_value

210 

211 

def ip_v6_network_validator(input_value: Any, /) -> IPv6Network:
    """Return `input_value` as an `IPv6Network`, constructing one if it is not already.

    The network is constructed with the default `strict` argument.

    See more:
    https://docs.python.org/library/ipaddress.html#ipaddress.IPv6Network

    Raises:
        PydanticCustomError: With type `ip_v6_network` if construction raises `ValueError`.
    """
    if not isinstance(input_value, IPv6Network):
        try:
            input_value = IPv6Network(input_value)
        except ValueError:
            raise PydanticCustomError('ip_v6_network', 'Input is not a valid IPv6 network')
    return input_value

225 

226 

def ip_v4_interface_validator(input_value: Any, /) -> IPv4Interface:
    """Return `input_value` as an `IPv4Interface`, constructing one if it is not already.

    Raises:
        PydanticCustomError: With type `ip_v4_interface` if construction raises `ValueError`.
    """
    if not isinstance(input_value, IPv4Interface):
        try:
            input_value = IPv4Interface(input_value)
        except ValueError:
            raise PydanticCustomError('ip_v4_interface', 'Input is not a valid IPv4 interface')
    return input_value

235 

236 

def ip_v6_interface_validator(input_value: Any, /) -> IPv6Interface:
    """Return `input_value` as an `IPv6Interface`, constructing one if it is not already.

    Raises:
        PydanticCustomError: With type `ip_v6_interface` if construction raises `ValueError`.
    """
    if not isinstance(input_value, IPv6Interface):
        try:
            input_value = IPv6Interface(input_value)
        except ValueError:
            raise PydanticCustomError('ip_v6_interface', 'Input is not a valid IPv6 interface')
    return input_value

245 

246 

def fraction_validator(input_value: Any, /) -> Fraction:
    """Return `input_value` as a `Fraction`, constructing one if it is not already.

    Raises:
        PydanticCustomError: With type `fraction_parsing` if the value cannot be
            converted to a finite, well-defined fraction.
    """
    if isinstance(input_value, Fraction):
        return input_value

    try:
        return Fraction(input_value)
    except (ValueError, ZeroDivisionError, OverflowError):
        # Besides ValueError for unparsable input, `Fraction('1/0')` raises
        # ZeroDivisionError and `Fraction(float('inf'))` raises OverflowError; all of
        # these mean "not a valid fraction" and must not leak as raw arithmetic errors.
        raise PydanticCustomError('fraction_parsing', 'Input is not a valid fraction')

255 

256 

def forbid_inf_nan_check(x: Any) -> Any:
    """Return `x` unchanged when `math.isfinite(x)` holds.

    Raises:
        PydanticKnownError: With type `finite_number` when `x` is infinite or NaN.
    """
    if math.isfinite(x):
        return x
    raise PydanticKnownError('finite_number')

261 

262 

263def _safe_repr(v: Any) -> int | float | str: 

264 """The context argument for `PydanticKnownError` requires a number or str type, so we do a simple repr() coercion for types like timedelta. 

265 

266 See tests/test_types.py::test_annotated_metadata_any_order for some context. 

267 """ 

268 if isinstance(v, (int, float, str)): 

269 return v 

270 return repr(v) 

271 

272 

def greater_than_validator(x: Any, gt: Any) -> Any:
    """Return `x` if `x > gt`.

    Raises:
        PydanticKnownError: With type `greater_than` when the comparison is false.
        TypeError: When applying the constraint raises `TypeError`.
    """
    try:
        if x > gt:
            return x
        raise PydanticKnownError('greater_than', {'gt': _safe_repr(gt)})
    except TypeError:
        raise TypeError(f"Unable to apply constraint 'gt' to supplied value {x}")

280 

281 

def greater_than_or_equal_validator(x: Any, ge: Any) -> Any:
    """Return `x` if `x >= ge`.

    Raises:
        PydanticKnownError: With type `greater_than_equal` when the comparison is false.
        TypeError: When applying the constraint raises `TypeError`.
    """
    try:
        if x >= ge:
            return x
        raise PydanticKnownError('greater_than_equal', {'ge': _safe_repr(ge)})
    except TypeError:
        raise TypeError(f"Unable to apply constraint 'ge' to supplied value {x}")

289 

290 

def less_than_validator(x: Any, lt: Any) -> Any:
    """Return `x` if `x < lt`.

    Raises:
        PydanticKnownError: With type `less_than` when the comparison is false.
        TypeError: When applying the constraint raises `TypeError`.
    """
    try:
        if x < lt:
            return x
        raise PydanticKnownError('less_than', {'lt': _safe_repr(lt)})
    except TypeError:
        raise TypeError(f"Unable to apply constraint 'lt' to supplied value {x}")

298 

299 

def less_than_or_equal_validator(x: Any, le: Any) -> Any:
    """Return `x` if `x <= le`.

    Raises:
        PydanticKnownError: With type `less_than_equal` when the comparison is false.
        TypeError: When applying the constraint raises `TypeError`.
    """
    try:
        if x <= le:
            return x
        raise PydanticKnownError('less_than_equal', {'le': _safe_repr(le)})
    except TypeError:
        raise TypeError(f"Unable to apply constraint 'le' to supplied value {x}")

307 

308 

def multiple_of_validator(x: Any, multiple_of: Any) -> Any:
    """Return `x` if `x % multiple_of` is falsy, i.e. there is no remainder.

    Raises:
        PydanticKnownError: With type `multiple_of` when there is a remainder.
        TypeError: When applying the constraint raises `TypeError`.
    """
    try:
        if not (x % multiple_of):
            return x
        raise PydanticKnownError('multiple_of', {'multiple_of': _safe_repr(multiple_of)})
    except TypeError:
        raise TypeError(f"Unable to apply constraint 'multiple_of' to supplied value {x}")

316 

317 

def min_length_validator(x: Any, min_length: Any) -> Any:
    """Return `x` if `len(x) >= min_length`.

    Raises:
        PydanticKnownError: With type `too_short` when `x` is shorter than `min_length`.
        TypeError: When applying the constraint raises `TypeError` (e.g. `x` has no length).
    """
    try:
        if len(x) >= min_length:
            return x
        raise PydanticKnownError(
            'too_short', {'field_type': 'Value', 'min_length': min_length, 'actual_length': len(x)}
        )
    except TypeError:
        raise TypeError(f"Unable to apply constraint 'min_length' to supplied value {x}")

327 

328 

def max_length_validator(x: Any, max_length: Any) -> Any:
    """Return `x` unless `len(x) > max_length`.

    Raises:
        PydanticKnownError: With type `too_long` when `x` is longer than `max_length`.
        TypeError: When applying the constraint raises `TypeError` (e.g. `x` has no length).
    """
    try:
        if len(x) > max_length:
            raise PydanticKnownError(
                'too_long',
                {'field_type': 'Value', 'max_length': max_length, 'actual_length': len(x)},
            )
    except TypeError:
        raise TypeError(f"Unable to apply constraint 'max_length' to supplied value {x}")
    else:
        return x

339 

340 

341def _extract_decimal_digits_info(decimal: Decimal) -> tuple[int, int]: 

342 """Compute the total number of digits and decimal places for a given [`Decimal`][decimal.Decimal] instance. 

343 

344 This function handles both normalized and non-normalized Decimal instances. 

345 Example: Decimal('1.230') -> 4 digits, 3 decimal places 

346 

347 Args: 

348 decimal (Decimal): The decimal number to analyze. 

349 

350 Returns: 

351 tuple[int, int]: A tuple containing the number of decimal places and total digits. 

352 

353 Though this could be divided into two separate functions, the logic is easier to follow if we couple the computation 

354 of the number of decimals and digits together. 

355 """ 

356 try: 

357 decimal_tuple = decimal.as_tuple() 

358 

359 assert isinstance(decimal_tuple.exponent, int) 

360 

361 exponent = decimal_tuple.exponent 

362 num_digits = len(decimal_tuple.digits) 

363 

364 if exponent >= 0: 

365 # A positive exponent adds that many trailing zeros 

366 # Ex: digit_tuple=(1, 2, 3), exponent=2 -> 12300 -> 0 decimal places, 5 digits 

367 num_digits += exponent 

368 decimal_places = 0 

369 else: 

370 # If the absolute value of the negative exponent is larger than the 

371 # number of digits, then it's the same as the number of digits, 

372 # because it'll consume all the digits in digit_tuple and then 

373 # add abs(exponent) - len(digit_tuple) leading zeros after the decimal point. 

374 # Ex: digit_tuple=(1, 2, 3), exponent=-2 -> 1.23 -> 2 decimal places, 3 digits 

375 # Ex: digit_tuple=(1, 2, 3), exponent=-4 -> 0.0123 -> 4 decimal places, 4 digits 

376 decimal_places = abs(exponent) 

377 num_digits = max(num_digits, decimal_places) 

378 

379 return decimal_places, num_digits 

380 except (AssertionError, AttributeError): 

381 raise TypeError(f'Unable to extract decimal digits info from supplied value {decimal}') 

382 

383 

def max_digits_validator(x: Any, max_digits: Any) -> Any:
    """Return `x` unless it has more than `max_digits` digits.

    The value is rejected only when both its digit count as given and the digit count of
    `x.normalize()` exceed `max_digits`.

    Raises:
        PydanticKnownError: With type `decimal_max_digits` when the limit is exceeded.
        TypeError: When applying the constraint raises `TypeError`.
    """
    try:
        raw_digits = _extract_decimal_digits_info(x)[1]
        normalized_digits = _extract_decimal_digits_info(x.normalize())[1]
        if not ((raw_digits > max_digits) and (normalized_digits > max_digits)):
            return x
        raise PydanticKnownError(
            'decimal_max_digits',
            {'max_digits': max_digits},
        )
    except TypeError:
        raise TypeError(f"Unable to apply constraint 'max_digits' to supplied value {x}")

396 

397 

def decimal_places_validator(x: Any, decimal_places: Any) -> Any:
    """Return `x` unless it has more than `decimal_places` decimal places.

    The normalized form `x.normalize()` is only inspected when the value as given exceeds
    the limit; the value is rejected only if the normalized form exceeds it as well.

    Raises:
        PydanticKnownError: With type `decimal_max_places` when the limit is exceeded.
        TypeError: When applying the constraint raises `TypeError`.
    """
    try:
        raw_places = _extract_decimal_digits_info(x)[0]
        if not (raw_places > decimal_places):
            return x
        normalized_places = _extract_decimal_digits_info(x.normalize())[0]
        if not (normalized_places > decimal_places):
            return x
        raise PydanticKnownError(
            'decimal_max_places',
            {'decimal_places': decimal_places},
        )
    except TypeError:
        raise TypeError(f"Unable to apply constraint 'decimal_places' to supplied value {x}")

411 

412 

def deque_validator(input_value: Any, handler: core_schema.ValidatorFunctionWrapHandler) -> collections.deque[Any]:
    """Validate `input_value` with `handler` and return the result as a `deque`.

    The `maxlen` of the input is carried over when it has one; otherwise the deque is unbounded.
    """
    validated_items = handler(input_value)
    maxlen = getattr(input_value, 'maxlen', None)
    return collections.deque(validated_items, maxlen=maxlen)

415 

416 

def defaultdict_validator(
    input_value: Any, handler: core_schema.ValidatorFunctionWrapHandler, default_default_factory: Callable[[], Any]
) -> collections.defaultdict[Any, Any]:
    """Validate `input_value` with `handler` and return the result as a `defaultdict`.

    A `defaultdict` input keeps its own `default_factory`; any other input gets
    `default_default_factory`.
    """
    if isinstance(input_value, collections.defaultdict):
        factory = input_value.default_factory
    else:
        factory = default_default_factory
    return collections.defaultdict(factory, handler(input_value))

425 

426 

def get_defaultdict_default_default_factory(values_source_type: Any) -> Callable[[], Any]:
    """Pick the default factory for a `defaultdict` whose values have type `values_source_type`.

    If the values type is `Annotated[...]` and its first `FieldInfo` metadata item has a
    `default_factory`, that factory is used. Otherwise a factory is inferred from the values
    type (its origin when parametrized) using a fixed table of supported types.

    Returns:
        A zero-argument callable. For a type variable this is a callable that raises
        `RuntimeError` when invoked.

    Raises:
        PydanticSchemaGenerationError: If no factory can be inferred for the values type.
    """
    FieldInfo = import_cached_field_info()

    values_type_origin = get_origin(values_source_type)

    # Assume Annotated[..., Field(...)]: only the first FieldInfo in the metadata is considered.
    field_info = None
    if typing_objects.is_annotated(values_type_origin):
        for metadata_item in typing_extensions.get_args(values_source_type):
            if isinstance(metadata_item, FieldInfo):
                field_info = metadata_item
                break

    if field_info and field_info.default_factory:
        # Assume the default factory does not take any argument:
        return cast(Callable[[], Any], field_info.default_factory)

    # No explicit factory: infer one from the values type.
    values_type = values_type_origin or values_source_type
    instructions = 'set using `DefaultDict[..., Annotated[..., Field(default_factory=...)]]`'

    if typing_objects.is_typevar(values_type):

        def type_var_default_factory() -> None:
            raise RuntimeError(
                'Generic defaultdict cannot be used without a concrete value type or an'
                ' explicit default factory, ' + instructions
            )

        return type_var_default_factory

    # a somewhat subjective set of types that have reasonable default values
    allowed_default_types: dict[Any, Any] = {
        tuple: tuple,
        collections.abc.Sequence: tuple,
        collections.abc.MutableSequence: list,
        list: list,
        typing.Sequence: list,
        set: set,
        typing.MutableSet: set,
        collections.abc.MutableSet: set,
        collections.abc.Set: frozenset,
        typing.MutableMapping: dict,
        typing.Mapping: dict,
        collections.abc.Mapping: dict,
        collections.abc.MutableMapping: dict,
        float: float,
        int: int,
        str: str,
        bool: bool,
    }
    if values_type not in allowed_default_types:
        allowed_msg = ', '.join([t.__name__ for t in set(allowed_default_types.values())])
        raise PydanticSchemaGenerationError(
            f'Unable to infer a default factory for keys of type {values_source_type}.'
            f' Only {allowed_msg} are supported, other types require an explicit default factory'
            ' ' + instructions
        )
    return allowed_default_types[values_type]

484 

485 

def validate_str_is_valid_iana_tz(value: Any, /) -> ZoneInfo:
    """Return `value` as a `ZoneInfo`, constructing one if it is not already.

    Raises:
        PydanticCustomError: With type `zoneinfo_str` if `ZoneInfo(value)` raises
            `ZoneInfoNotFoundError`, `ValueError` or `TypeError`.
    """
    if not isinstance(value, ZoneInfo):
        try:
            value = ZoneInfo(value)
        except (ZoneInfoNotFoundError, ValueError, TypeError):
            raise PydanticCustomError('zoneinfo_str', 'invalid timezone: {value}', {'value': value})
    return value

493 

494 

# Maps a constraint name to the function enforcing it. Each function takes the value
# followed by the constraint argument and returns the value when the constraint holds.
# Despite the name, the length and decimal-digit constraints are registered here too.
NUMERIC_VALIDATOR_LOOKUP: dict[str, Callable] = {
    # ordering comparisons
    'gt': greater_than_validator,
    'ge': greater_than_or_equal_validator,
    'lt': less_than_validator,
    'le': less_than_or_equal_validator,
    'multiple_of': multiple_of_validator,
    # length bounds
    'min_length': min_length_validator,
    'max_length': max_length_validator,
    # Decimal digit limits
    'max_digits': max_digits_validator,
    'decimal_places': decimal_places_validator,
}

506 

# Any of the six address / network / interface classes from the `ipaddress` module.
IpType = Union[IPv4Address, IPv6Address, IPv4Network, IPv6Network, IPv4Interface, IPv6Interface]

# Maps each `ipaddress` class to the validator that returns an instance of that class.
IP_VALIDATOR_LOOKUP: dict[type[IpType], Callable] = {
    # addresses
    IPv4Address: ip_v4_address_validator,
    IPv6Address: ip_v6_address_validator,
    # networks
    IPv4Network: ip_v4_network_validator,
    IPv6Network: ip_v6_network_validator,
    # interfaces
    IPv4Interface: ip_v4_interface_validator,
    IPv6Interface: ip_v6_interface_validator,
}

517 

# Maps a mapping annotation (a `typing` alias or the runtime class itself) to the concrete
# class associated with it: the `collections` class of the same kind, or plain `dict` for
# the abstract Mapping / MutableMapping types.
MAPPING_ORIGIN_MAP: dict[Any, Any] = {
    # defaultdict
    typing.DefaultDict: collections.defaultdict,  # noqa: UP006
    collections.defaultdict: collections.defaultdict,
    # OrderedDict
    typing.OrderedDict: collections.OrderedDict,  # noqa: UP006
    collections.OrderedDict: collections.OrderedDict,
    typing_extensions.OrderedDict: collections.OrderedDict,
    # Counter
    typing.Counter: collections.Counter,
    collections.Counter: collections.Counter,
    # this doesn't handle subclasses of these
    typing.Mapping: dict,
    typing.MutableMapping: dict,
    # parametrized typing.{Mutable}Mapping creates one of these
    collections.abc.Mapping: dict,
    collections.abc.MutableMapping: dict,
}

532}