Coverage for /pythoncovmergedfiles/medio/medio/src/pydantic/pydantic/_internal/_validators.py: 18%

228 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-04-27 07:38 +0000

1""" 

2Validator functions for standard library types. 

3 

4Import of this module is deferred since it contains imports of many standard library modules. 

5""" 

6 

7from __future__ import annotations as _annotations 

8 

9import re 

10import typing 

11from collections import OrderedDict, defaultdict, deque 

12from decimal import Decimal, DecimalException 

13from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network 

14from pathlib import Path 

15from typing import Any 

16from uuid import UUID 

17 

18from pydantic_core import PydanticCustomError, PydanticKnownError, core_schema 

19 

20from . import _fields 

21 

22 

23def mapping_validator( 

24 __input_value: typing.Mapping[Any, Any], 

25 validator: core_schema.ValidatorFunctionWrapHandler, 

26 info: core_schema.ValidationInfo, 

27) -> typing.Mapping[Any, Any]: 

28 """ 

29 Validator for `Mapping` types, if required `isinstance(v, Mapping)` has already been called. 

30 """ 

31 v_dict = validator(__input_value) 

32 value_type = type(__input_value) 

33 

34 # the rest of the logic is just re-creating the original type from `v_dict` 

35 if value_type == dict: 

36 return v_dict 

37 elif issubclass(value_type, defaultdict): 

38 default_factory = __input_value.default_factory # type: ignore[attr-defined] 

39 return value_type(default_factory, v_dict) 

40 else: 

41 # best guess at how to re-create the original type, more custom construction logic might be required 

42 return value_type(v_dict) # type: ignore[call-arg] 

43 

44 

45def construct_counter(__input_value: typing.Mapping[Any, Any], _: core_schema.ValidationInfo) -> typing.Counter[Any]: 

46 """ 

47 Validator for `Counter` types, if required `isinstance(v, Counter)` has already been called. 

48 """ 

49 return typing.Counter(__input_value) 

50 

51 

52def sequence_validator( 

53 __input_value: typing.Sequence[Any], 

54 validator: core_schema.ValidatorFunctionWrapHandler, 

55 _: core_schema.ValidationInfo, 

56) -> typing.Sequence[Any]: 

57 """ 

58 Validator for `Sequence` types, isinstance(v, Sequence) has already been called. 

59 """ 

60 value_type = type(__input_value) 

61 v_list = validator(__input_value) 

62 

63 # the rest of the logic is just re-creating the original type from `v_list` 

64 if value_type == list: 

65 return v_list 

66 elif issubclass(value_type, str): 

67 try: 

68 return ''.join(v_list) 

69 except TypeError: 

70 # can happen if you pass a string like '123' to `Sequence[int]` 

71 raise PydanticKnownError('string_type') 

72 elif issubclass(value_type, bytes): 

73 try: 

74 return b''.join(v_list) 

75 except TypeError: 

76 # can happen if you pass a string like '123' to `Sequence[int]` 

77 raise PydanticKnownError('bytes_type') 

78 elif issubclass(value_type, range): 

79 # return the list as we probably can't re-create the range 

80 return v_list 

81 else: 

82 # best guess at how to re-create the original type, more custom construction logic might be required 

83 return value_type(v_list) # type: ignore[call-arg] 

84 

85 

86def import_string(value: Any) -> Any: 

87 if isinstance(value, str): 

88 try: 

89 return _import_string_logic(value) 

90 except ImportError as e: 

91 raise PydanticCustomError('import_error', 'Invalid python path: {error}', {'error': str(e)}) 

92 else: 

93 # otherwise we just return the value and let the next validator do the rest of the work 

94 return value 

95 

96 

97def _import_string_logic(dotted_path: str) -> Any: 

98 """ 

99 Stolen approximately from django. Import a dotted module path and return the attribute/class designated by the 

100 last name in the path. Raise ImportError if the import fails. 

101 """ 

102 from importlib import import_module 

103 

104 try: 

105 module_path, class_name = dotted_path.strip(' ').rsplit('.', 1) 

106 except ValueError as e: 

107 raise ImportError(f'"{dotted_path}" doesn\'t look like a module path') from e 

108 

109 module = import_module(module_path) 

110 try: 

111 return getattr(module, class_name) 

112 except AttributeError as e: 

113 raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute') from e 

114 

115 

116class DecimalValidator(_fields.CustomValidator): 

117 __slots__ = ( 

118 'gt', 

119 'ge', 

120 'lt', 

121 'le', 

122 'max_digits', 

123 'decimal_places', 

124 'multiple_of', 

125 'allow_inf_nan', 

126 'check_digits', 

127 'strict', 

128 ) 

129 

130 def __init__(self) -> None: 

131 self.gt: int | Decimal | None = None 

132 self.ge: int | Decimal | None = None 

133 self.lt: int | Decimal | None = None 

134 self.le: int | Decimal | None = None 

135 self.max_digits: int | None = None 

136 self.decimal_places: int | None = None 

137 self.multiple_of: int | Decimal | None = None 

138 self.allow_inf_nan: bool = False 

139 self.check_digits: bool = False 

140 self.strict: bool = False 

141 

142 def json_schema_override_schema(self) -> core_schema.CoreSchema: 

143 """ 

144 This function is used to produce an "override schema" for generating the JSON schema of fields of type Decimal. 

145 

146 The purpose of an override schema is to use the pre-existing approach to producing a JSON schema from a 

147 CoreSchema, where we know we want to use a different CoreSchema for the purposes of JSON schema generation. 

148 (Generally because we know what we want and an appropriately simplified CoreSchema will produce it.) 

149 """ 

150 return core_schema.float_schema( 

151 allow_inf_nan=self.allow_inf_nan, 

152 multiple_of=None if self.multiple_of is None else float(self.multiple_of), 

153 le=None if self.le is None else float(self.le), 

154 ge=None if self.ge is None else float(self.ge), 

155 lt=None if self.lt is None else float(self.lt), 

156 gt=None if self.gt is None else float(self.gt), 

157 ) 

158 

159 def __pydantic_update_schema__(self, schema: core_schema.CoreSchema, **kwargs: Any) -> None: 

160 self._update_attrs(kwargs) 

161 

162 self.check_digits = self.max_digits is not None or self.decimal_places is not None 

163 if self.check_digits and self.allow_inf_nan: 

164 raise ValueError('allow_inf_nan=True cannot be used with max_digits or decimal_places') 

165 

166 def __call__( # noqa: C901 (ignore complexity) 

167 self, __input_value: int | float | str, _: core_schema.ValidationInfo 

168 ) -> Decimal: 

169 if isinstance(__input_value, Decimal): 

170 value = __input_value 

171 else: 

172 try: 

173 value = Decimal(str(__input_value)) 

174 except DecimalException: 

175 raise PydanticCustomError('decimal_parsing', 'Input should be a valid decimal') 

176 

177 if not self.allow_inf_nan or self.check_digits: 

178 _1, digit_tuple, exponent = value.as_tuple() 

179 if not self.allow_inf_nan and exponent in {'F', 'n', 'N'}: 

180 raise PydanticKnownError('finite_number') 

181 

182 if self.check_digits: 

183 if isinstance(exponent, str): 

184 raise PydanticKnownError('finite_number') 

185 elif exponent >= 0: 

186 # A positive exponent adds that many trailing zeros. 

187 digits = len(digit_tuple) + exponent 

188 decimals = 0 

189 else: 

190 # If the absolute value of the negative exponent is larger than the 

191 # number of digits, then it's the same as the number of digits, 

192 # because it'll consume all the digits in digit_tuple and then 

193 # add abs(exponent) - len(digit_tuple) leading zeros after the 

194 # decimal point. 

195 if abs(exponent) > len(digit_tuple): 

196 digits = decimals = abs(exponent) 

197 else: 

198 digits = len(digit_tuple) 

199 decimals = abs(exponent) 

200 

201 if self.max_digits is not None and digits > self.max_digits: 

202 raise PydanticCustomError( 

203 'decimal_max_digits', 

204 'ensure that there are no more than {max_digits} digits in total', 

205 {'max_digits': self.max_digits}, 

206 ) 

207 

208 if self.decimal_places is not None and decimals > self.decimal_places: 

209 raise PydanticCustomError( 

210 'decimal_max_places', 

211 'ensure that there are no more than {decimal_places} decimal places', 

212 {'decimal_places': self.decimal_places}, 

213 ) 

214 

215 if self.max_digits is not None and self.decimal_places is not None: 

216 whole_digits = digits - decimals 

217 expected = self.max_digits - self.decimal_places 

218 if whole_digits > expected: 

219 raise PydanticCustomError( 

220 'decimal_whole_digits', 

221 'ensure that there are no more than {whole_digits} digits before the decimal point', 

222 {'whole_digits': expected}, 

223 ) 

224 

225 if self.multiple_of is not None: 

226 mod = value / self.multiple_of % 1 

227 if mod != 0: 

228 raise PydanticCustomError( 

229 'decimal_multiple_of', 

230 'Input should be a multiple of {multiple_of}', 

231 {'multiple_of': self.multiple_of}, 

232 ) 

233 

234 # these type checks are here to handle the following error: 

235 # Operator ">" not supported for types "( 

236 # <subclass of int and Decimal> 

237 # | <subclass of float and Decimal> 

238 # | <subclass of str and Decimal> 

239 # | Decimal" and "int 

240 # | Decimal" 

241 # ) 

242 if self.gt is not None and not value > self.gt: # type: ignore 

243 raise PydanticKnownError('greater_than', {'gt': self.gt}) 

244 elif self.ge is not None and not value >= self.ge: # type: ignore 

245 raise PydanticKnownError('greater_than_equal', {'ge': self.ge}) 

246 

247 if self.lt is not None and not value < self.lt: # type: ignore 

248 raise PydanticKnownError('less_than', {'lt': self.lt}) 

249 if self.le is not None and not value <= self.le: # type: ignore 

250 raise PydanticKnownError('less_than_equal', {'le': self.le}) 

251 

252 return value 

253 

254 def __repr__(self) -> str: 

255 slots = [(k, getattr(self, k)) for k in self.__slots__] 

256 s = ', '.join(f'{k}={v!r}' for k, v in slots if v is not None) 

257 return f'DecimalValidator({s})' 

258 

259 

260def uuid_validator(__input_value: str | bytes, _: core_schema.ValidationInfo) -> UUID: 

261 try: 

262 if isinstance(__input_value, str): 

263 return UUID(__input_value) 

264 else: 

265 try: 

266 return UUID(__input_value.decode()) 

267 except ValueError: 

268 # 16 bytes in big-endian order as the bytes argument fail 

269 # the above check 

270 return UUID(bytes=__input_value) 

271 except ValueError: 

272 raise PydanticCustomError('uuid_parsing', 'Input should be a valid UUID, unable to parse string as an UUID') 

273 

274 

275def path_validator(__input_value: str, _: core_schema.ValidationInfo) -> Path: 

276 try: 

277 return Path(__input_value) 

278 except TypeError: 

279 raise PydanticCustomError('path_type', 'Input is not a valid path') 

280 

281 

282def pattern_either_validator(__input_value: Any, _: core_schema.ValidationInfo) -> typing.Pattern[Any]: 

283 if isinstance(__input_value, typing.Pattern): 

284 return __input_value # type: ignore 

285 elif isinstance(__input_value, (str, bytes)): 

286 # todo strict mode 

287 return compile_pattern(__input_value) # type: ignore 

288 else: 

289 raise PydanticCustomError('pattern_type', 'Input should be a valid pattern') 

290 

291 

292def pattern_str_validator(__input_value: Any, _: core_schema.ValidationInfo) -> typing.Pattern[str]: 

293 if isinstance(__input_value, typing.Pattern): 

294 if isinstance(__input_value.pattern, str): # type: ignore 

295 return __input_value # type: ignore 

296 else: 

297 raise PydanticCustomError('pattern_str_type', 'Input should be a string pattern') 

298 elif isinstance(__input_value, str): 

299 return compile_pattern(__input_value) 

300 elif isinstance(__input_value, bytes): 

301 raise PydanticCustomError('pattern_str_type', 'Input should be a string pattern') 

302 else: 

303 raise PydanticCustomError('pattern_type', 'Input should be a valid pattern') 

304 

305 

306def pattern_bytes_validator(__input_value: Any, _: core_schema.ValidationInfo) -> Any: 

307 if isinstance(__input_value, typing.Pattern): 

308 if isinstance(__input_value.pattern, bytes): 

309 return __input_value 

310 else: 

311 raise PydanticCustomError('pattern_bytes_type', 'Input should be a bytes pattern') 

312 elif isinstance(__input_value, bytes): 

313 return compile_pattern(__input_value) 

314 elif isinstance(__input_value, str): 

315 raise PydanticCustomError('pattern_bytes_type', 'Input should be a bytes pattern') 

316 else: 

317 raise PydanticCustomError('pattern_type', 'Input should be a valid pattern') 

318 

319 

320PatternType = typing.TypeVar('PatternType', str, bytes) 

321 

322 

323def compile_pattern(pattern: PatternType) -> typing.Pattern[PatternType]: 

324 try: 

325 return re.compile(pattern) 

326 except re.error: 

327 raise PydanticCustomError('pattern_regex', 'Input should be a valid regular expression') 

328 

329 

330def deque_any_validator( 

331 __input_value: Any, validator: core_schema.ValidatorFunctionWrapHandler, _: core_schema.ValidationInfo 

332) -> deque[Any]: 

333 if isinstance(__input_value, deque): 

334 return __input_value 

335 else: 

336 return deque(validator(__input_value)) 

337 

338 

339def deque_typed_validator( 

340 __input_value: Any, validator: core_schema.ValidatorFunctionWrapHandler, _: core_schema.ValidationInfo 

341) -> deque[Any]: 

342 if isinstance(__input_value, deque): 

343 return deque(validator(__input_value), maxlen=__input_value.maxlen) 

344 else: 

345 return deque(validator(__input_value)) 

346 

347 

348def ordered_dict_any_validator( 

349 __input_value: Any, validator: core_schema.ValidatorFunctionWrapHandler, _: core_schema.ValidationInfo 

350) -> OrderedDict[Any, Any]: 

351 if isinstance(__input_value, OrderedDict): 

352 return __input_value 

353 else: 

354 return OrderedDict(validator(__input_value)) 

355 

356 

357def ordered_dict_typed_validator(__input_value: list[Any], _: core_schema.ValidationInfo) -> OrderedDict[Any, Any]: 

358 return OrderedDict(__input_value) 

359 

360 

361def ip_v4_address_validator(__input_value: Any, _: core_schema.ValidationInfo) -> IPv4Address: 

362 if isinstance(__input_value, IPv4Address): 

363 return __input_value 

364 

365 try: 

366 return IPv4Address(__input_value) 

367 except ValueError: 

368 raise PydanticCustomError('ip_v4_address', 'Input is not a valid IPv4 address') 

369 

370 

371def ip_v6_address_validator(__input_value: Any, _: core_schema.ValidationInfo) -> IPv6Address: 

372 if isinstance(__input_value, IPv6Address): 

373 return __input_value 

374 

375 try: 

376 return IPv6Address(__input_value) 

377 except ValueError: 

378 raise PydanticCustomError('ip_v6_address', 'Input is not a valid IPv6 address') 

379 

380 

381def ip_v4_network_validator(__input_value: Any, _: core_schema.ValidationInfo) -> IPv4Network: 

382 """ 

383 Assume IPv4Network initialised with a default `strict` argument 

384 

385 See more: 

386 https://docs.python.org/library/ipaddress.html#ipaddress.IPv4Network 

387 """ 

388 if isinstance(__input_value, IPv4Network): 

389 return __input_value 

390 

391 try: 

392 return IPv4Network(__input_value) 

393 except ValueError: 

394 raise PydanticCustomError('ip_v4_network', 'Input is not a valid IPv4 network') 

395 

396 

397def ip_v6_network_validator(__input_value: Any, _: core_schema.ValidationInfo) -> IPv6Network: 

398 """ 

399 Assume IPv6Network initialised with a default `strict` argument 

400 

401 See more: 

402 https://docs.python.org/library/ipaddress.html#ipaddress.IPv6Network 

403 """ 

404 if isinstance(__input_value, IPv6Network): 

405 return __input_value 

406 

407 try: 

408 return IPv6Network(__input_value) 

409 except ValueError: 

410 raise PydanticCustomError('ip_v6_network', 'Input is not a valid IPv6 network') 

411 

412 

413def ip_v4_interface_validator(__input_value: Any, _: core_schema.ValidationInfo) -> IPv4Interface: 

414 if isinstance(__input_value, IPv4Interface): 

415 return __input_value 

416 

417 try: 

418 return IPv4Interface(__input_value) 

419 except ValueError: 

420 raise PydanticCustomError('ip_v4_interface', 'Input is not a valid IPv4 interface') 

421 

422 

423def ip_v6_interface_validator(__input_value: Any, _: core_schema.ValidationInfo) -> IPv6Interface: 

424 if isinstance(__input_value, IPv6Interface): 

425 return __input_value 

426 

427 try: 

428 return IPv6Interface(__input_value) 

429 except ValueError: 

430 raise PydanticCustomError('ip_v6_interface', 'Input is not a valid IPv6 interface')