Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/validate.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

214 statements  

"""User input parameter validation.

This module handles user input parameter validation
against a provided input model.

Note that the objects in this module do *not* mutate any
arguments.  No type conversion happens here.  It is up to another
layer to properly convert arguments to any required types.

Validation Errors
-----------------


"""

15 

import decimal
import functools
import json
from datetime import datetime

from botocore.exceptions import ParamValidationError
from botocore.utils import is_json_value_header, parse_to_aware_datetime

22 

23 

def validate_parameters(params, shape):
    """Check user supplied parameters against an input shape.

    Convenience wrapper that runs a default ``ParamValidator`` over
    ``params``.  Instantiate and use ``ParamValidator`` directly if you
    want more control.

    If there are any validation errors then a ParamValidationError
    is raised.  If there are no validation errors then no exception
    is raised and ``None`` is returned.

    :param params: The user provided input parameters.

    :type shape: botocore.model.Shape
    :param shape: The schema which the input parameters should
        adhere to.

    :raise: ParamValidationError

    """
    outcome = ParamValidator().validate(params, shape)
    if not outcome.has_errors():
        return None
    raise ParamValidationError(report=outcome.generate_report())

48 

49 

def type_check(valid_types):
    """Build a decorator that type-checks ``param`` before validating it.

    The decorated callable must accept
    ``(self, param, shape, errors, name)``.  When ``param`` is not an
    instance of ``valid_types``, an ``'invalid type'`` error is reported
    on ``errors`` and the wrapped callable is skipped (the guard returns
    ``None``).  Otherwise the wrapped callable's result is returned.

    :param valid_types: Tuple of types accepted by ``isinstance``.
    :return: A decorator for validator methods.
    """

    def _create_type_check_guard(func):
        # functools.wraps keeps the wrapped validator's __name__ and
        # __doc__, so tracebacks and introspection show the real method
        # rather than the generic guard.
        @functools.wraps(func)
        def _on_passes_type_check(self, param, shape, errors, name):
            if isinstance(param, valid_types):
                return func(self, param, shape, errors, name)
            errors.report(
                name,
                'invalid type',
                param=param,
                valid_types=[str(t) for t in valid_types],
            )
            return None

        return _on_passes_type_check

    return _create_type_check_guard

71 

72 

def range_check(name, value, shape, error_type, errors):
    """Report ``error_type`` when ``value`` is below the shape's minimum.

    The minimum comes from ``shape.metadata['min']`` when present.
    Otherwise, a shape whose ``serialization`` marks it as a ``hostLabel``
    gets an implicit minimum of 1.  Shapes with neither are not checked.

    :param name: Name of the parameter, used in the error report.
    :param value: The number (or length) to compare against the minimum.
    :param shape: Shape model providing ``metadata`` and, optionally,
        ``serialization``.
    :param error_type: Error type string passed to ``errors.report``.
    :param errors: Collector whose ``report`` method records the failure.
    """
    metadata = shape.metadata
    if 'min' in metadata:
        lower_bound = metadata['min']
    elif (
        hasattr(shape, 'serialization')
        and shape.serialization.get('hostLabel')
    ):
        # Members that can be bound to the host have an implicit min of 1
        lower_bound = 1
    else:
        # No lower bound applies, so there is nothing to compare against.
        return
    if value < lower_bound:
        errors.report(name, error_type, param=value, min_allowed=lower_bound)

88 

89 

class ValidationErrors:
    """Collects validation errors and renders them as a text report.

    Errors are kept in the order they were reported, each stored as an
    ``(error_type, name, details)`` tuple where ``details`` is the dict of
    keyword arguments given to :meth:`report`.
    """

    def __init__(self):
        self._errors = []

    def has_errors(self):
        """Return ``True`` if at least one error has been reported."""
        return bool(self._errors)

    def generate_report(self):
        """Return every reported error as newline separated messages."""
        return '\n'.join(self._format_error(error) for error in self._errors)

    def _format_error(self, error):
        """Render one ``(error_type, name, details)`` tuple as a message."""
        error_type, name, additional = error
        name = self._get_name(name)
        if error_type == 'missing required field':
            required_name = additional['required_name']
            return f'Missing required parameter in {name}: "{required_name}"'
        elif error_type == 'unknown field':
            unknown_param = additional['unknown_param']
            valid_names = ', '.join(additional['valid_names'])
            return (
                f'Unknown parameter in {name}: "{unknown_param}", '
                f'must be one of: {valid_names}'
            )
        elif error_type == 'invalid type':
            param = additional['param']
            param_type = type(param)
            valid_types = ', '.join(additional['valid_types'])
            return (
                f'Invalid type for parameter {name}, value: {param}, '
                f'type: {param_type}, valid types: {valid_types}'
            )
        elif error_type == 'invalid range':
            param = additional['param']
            min_allowed = additional['min_allowed']
            return (
                f'Invalid value for parameter {name}, value: {param}, '
                f'valid min value: {min_allowed}'
            )
        elif error_type == 'invalid length':
            param = additional['param']
            min_allowed = additional['min_allowed']
            return (
                f'Invalid length for parameter {name}, value: {param}, '
                f'valid min length: {min_allowed}'
            )
        elif error_type == 'unable to encode to json':
            type_error = additional['type_error']
            return (
                f'Invalid parameter {name} must be json serializable: '
                f'{type_error}'
            )
        elif error_type == 'invalid type for document':
            param = additional['param']
            param_type = type(param)
            valid_types = ', '.join(additional['valid_types'])
            return (
                f'Invalid type for document parameter {name}, value: {param}, '
                f'type: {param_type}, valid types: {valid_types}'
            )
        elif error_type == 'more than one input':
            members = ', '.join(additional['members'])
            return (
                f'Invalid number of parameters set for tagged union structure '
                f'{name}. Can only set one of the following keys: '
                f'{members}.'
            )
        elif error_type == 'empty input':
            members = ', '.join(additional['members'])
            # The trailing space matters: without it the two fragments
            # join into "tagged unionstructure".
            return (
                f'Must set one of the following keys for tagged union '
                f'structure {name}: {members}.'
            )
        # An unrecognised error type still yields a readable line; returning
        # None here would make generate_report() fail inside str.join().
        return f'Invalid parameter {name}: {error_type}'

    def _get_name(self, name):
        """Return the display name for a parameter path.

        An empty path means the top-level input.  A leading ``.`` (left by
        joining an empty parent path with a member name) is stripped.
        """
        if name is None or name == '':
            return 'input'
        # Names are normally dotted-path strings; coerce defensively so a
        # non-string name (e.g. a raw dict key) cannot break formatting.
        name = str(name)
        if name.startswith('.'):
            return name[1:]
        return name

    def report(self, name, reason, **kwargs):
        """Record an error of type ``reason`` for parameter ``name``.

        :param name: Path of the offending parameter.
        :param reason: Error type string understood by ``_format_error``.
        :param kwargs: Extra details used when formatting the message.
        """
        self._errors.append((reason, name, kwargs))

179 

180 

class ParamValidator:
    """Validates parameters against a shape model."""

    # Valid Python types for scalar c2j types
    SCALAR_TYPES = {
        'float': (float, decimal.Decimal, int),
        'double': (float, decimal.Decimal, int),
        'integer': (int,),
        'long': (int,),
        'boolean': (bool,),
        'string': (str,),
    }

    # Valid Python types for container c2j types
    CONTAINER_TYPES = {
        'structure': (dict,),
        'map': (dict,),
        'list': (list, tuple),
    }

    # Metadata attributes that we validate beyond type checking
    VALIDATED_METADATA_ATTRS = {'required', 'min', 'document', 'union'}

    def _shape_has_constraints(self, shape):
        """Whether the shape has validated constraints beyond type checking."""
        return not self.VALIDATED_METADATA_ATTRS.isdisjoint(shape.metadata)

    def validate(self, params, shape):
        """Validate parameters against a shape model.

        This method will validate the parameters against a provided shape
        model.  All errors are collected before returning to the caller, so
        validation does not stop at the first error found.

        :param params: User provided dict of parameters
        :param shape: A shape model describing the expected input.

        :return: A ``ValidationErrors`` object holding every error found;
            inspect it with ``has_errors()`` and ``generate_report()``.

        """
        errors = ValidationErrors()
        self._validate(params, shape, errors, name='')
        return errors

    def _check_special_validation_cases(self, shape):
        """Return a dedicated validator for special shapes, else ``None``."""
        if is_json_value_header(shape):
            return self._validate_jsonvalue_string
        if shape.type_name == 'structure' and shape.is_document_type:
            return self._validate_document
        return None

    def _validate(self, params, shape, errors, name):
        """Dispatch to a special-case or ``_validate_<type_name>`` method."""
        special_validator = self._check_special_validation_cases(shape)
        if special_validator:
            special_validator(params, shape, errors, name)
        else:
            getattr(self, f'_validate_{shape.type_name}')(
                params, shape, errors, name
            )

    def _validate_jsonvalue_string(self, params, shape, errors, name):
        # Check to see if a value marked as a jsonvalue can be dumped to
        # a json string.
        try:
            json.dumps(params)
        except (ValueError, TypeError) as e:
            errors.report(name, 'unable to encode to json', type_error=e)

    def _validate_document(self, params, shape, errors, name):
        """Recursively check that ``params`` only holds document types.

        ``None`` is accepted anywhere.  Dicts and lists are walked
        recursively; any other value must be a str, int, bool or float.
        """
        if params is None:
            return

        if isinstance(params, dict):
            for key in params:
                # Extend the parent path, as the list branch and
                # _validate_structure do, so nested errors stay locatable
                # and the reported name is always a string.
                self._validate_document(
                    params[key], shape, errors, f'{name}.{key}'
                )
        elif isinstance(params, list):
            for index, entity in enumerate(params):
                self._validate_document(
                    entity, shape, errors, f'{name}[{index}]'
                )
        elif not isinstance(params, (str, int, bool, float)):
            valid_types = (str, int, bool, float, list, dict)
            valid_type_names = [str(t) for t in valid_types]
            errors.report(
                name,
                'invalid type for document',
                param=params,
                param_type=type(params),
                valid_types=valid_type_names,
            )

    @type_check(valid_types=CONTAINER_TYPES['structure'])
    def _validate_structure(self, params, shape, errors, name):
        """Validate a dict against a structure shape and its members."""
        if shape.is_tagged_union:
            # A tagged union must have exactly one member set.
            if len(params) == 0:
                errors.report(name, 'empty input', members=shape.members)
            elif len(params) > 1:
                errors.report(
                    name, 'more than one input', members=shape.members
                )

        # Validate required fields.
        for required_member in shape.metadata.get('required', []):
            if required_member not in params:
                errors.report(
                    name,
                    'missing required field',
                    required_name=required_member,
                    user_params=params,
                )
        members = shape.members
        known_params = []
        # Validate known params.
        for param in params:
            if param not in members:
                errors.report(
                    name,
                    'unknown field',
                    unknown_param=param,
                    valid_names=list(members),
                )
            else:
                known_params.append(param)
        # Validate structure members.
        for param in known_params:
            self._validate(
                params[param],
                shape.members[param],
                errors,
                f'{name}.{param}',
            )

    @type_check(valid_types=SCALAR_TYPES['string'])
    def _validate_string(self, param, shape, errors, name):
        # Validate range.  For a string, the min constraint applies to
        # the string length (only the minimum is checked by range_check).
        # Looks like:
        # "WorkflowId":{
        #   "type":"string",
        #   "min":1,
        #   "max":256
        #  }
        range_check(name, len(param), shape, 'invalid length', errors)

    @type_check(valid_types=CONTAINER_TYPES['list'])
    def _validate_list(self, param, shape, errors, name):
        """Validate the list length and then each of its items."""
        member_shape = shape.member
        range_check(name, len(param), shape, 'invalid length', errors)

        # If a list member does not have validation constraints, we will
        # only check the type
        member_type = member_shape.type_name
        if (
            member_type in self.SCALAR_TYPES
            and not self._shape_has_constraints(member_shape)
        ):
            valid_types = self.SCALAR_TYPES[member_type]
            for i, item in enumerate(param):
                if not isinstance(item, valid_types):
                    valid_type_names = [str(t) for t in valid_types]
                    errors.report(
                        f'{name}[{i}]',
                        'invalid type',
                        param=item,
                        valid_types=valid_type_names,
                    )
            return

        for i, item in enumerate(param):
            self._validate(item, member_shape, errors, f'{name}[{i}]')

    @type_check(valid_types=CONTAINER_TYPES['map'])
    def _validate_map(self, param, shape, errors, name):
        """Validate every key and value of a map against their shapes."""
        key_shape = shape.key
        value_shape = shape.value
        for key, value in param.items():
            self._validate(key, key_shape, errors, f"{name} (key: {key})")
            self._validate(value, value_shape, errors, f'{name}.{key}')

    @type_check(valid_types=SCALAR_TYPES['integer'])
    def _validate_integer(self, param, shape, errors, name):
        range_check(name, param, shape, 'invalid range', errors)

    def _validate_blob(self, param, shape, errors, name):
        """Accept bytes, bytearray, str or any object with ``read``."""
        if isinstance(param, (bytes, bytearray, str)):
            return
        elif hasattr(param, 'read'):
            # File like objects are also allowed for blob types.
            return
        else:
            errors.report(
                name,
                'invalid type',
                param=param,
                valid_types=[str(bytes), str(bytearray), 'file-like object'],
            )

    @type_check(valid_types=SCALAR_TYPES['boolean'])
    def _validate_boolean(self, param, shape, errors, name):
        # Nothing beyond the type check is validated for booleans.
        pass

    @type_check(valid_types=SCALAR_TYPES['double'])
    def _validate_double(self, param, shape, errors, name):
        range_check(name, param, shape, 'invalid range', errors)

    _validate_float = _validate_double

    @type_check(valid_types=SCALAR_TYPES['long'])
    def _validate_long(self, param, shape, errors, name):
        range_check(name, param, shape, 'invalid range', errors)

    def _validate_timestamp(self, param, shape, errors, name):
        # We don't use @type_check because datetimes are a bit
        # more flexible.  You can either provide a datetime
        # object, or a string that parses to a datetime.
        is_valid_type = self._type_check_datetime(param)
        if not is_valid_type:
            valid_type_names = [str(datetime), 'timestamp-string']
            errors.report(
                name, 'invalid type', param=param, valid_types=valid_type_names
            )

    def _type_check_datetime(self, value):
        """Return ``True`` if ``parse_to_aware_datetime`` accepts ``value``."""
        try:
            parse_to_aware_datetime(value)
            return True
        except (TypeError, ValueError, AttributeError):
            # Yes, dateutil can sometimes raise an AttributeError
            # when parsing timestamps.
            return False

410 

411 

class ParamValidationDecorator:
    """Serializer wrapper that validates parameters before serializing.

    :param param_validator: Object whose ``validate(params, shape)`` returns
        a report offering ``has_errors()`` and ``generate_report()``.
    :param serializer: Object whose ``serialize_to_request`` does the
        actual serialization once validation has passed.
    """

    def __init__(self, param_validator, serializer):
        self._param_validator = param_validator
        self._serializer = serializer

    def serialize_to_request(self, parameters, operation_model):
        """Validate ``parameters`` and delegate to the wrapped serializer.

        Validation is skipped when the operation has no input shape.

        :param parameters: The user provided input parameters.
        :param operation_model: Model providing ``input_shape``.
        :raise: ParamValidationError if validation reports any error.
        :return: Whatever the wrapped serializer returns.
        """
        # Read the attribute once so the shape that was None-checked is
        # the same object that gets validated.
        input_shape = operation_model.input_shape
        if input_shape is not None:
            report = self._param_validator.validate(parameters, input_shape)
            if report.has_errors():
                raise ParamValidationError(report=report.generate_report())
        return self._serializer.serialize_to_request(
            parameters, operation_model
        )