Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/validate.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

200 statements  

1"""User input parameter validation. 

2 

3This module handles user input parameter validation 

4against a provided input model. 

5 

6Note that the objects in this module do *not* mutate any 

7arguments. No type conversion happens here. It is up to another 

8layer to properly convert arguments to any required types. 

9 

10Validation Errors 

11----------------- 

12 

13 

14""" 

15 

16import decimal 

17import json 

18from datetime import datetime 

19 

20from botocore.exceptions import ParamValidationError 

21from botocore.utils import is_json_value_header, parse_to_aware_datetime 

22 

23 

def validate_parameters(params, shape):
    """Check ``params`` against ``shape`` and raise if anything is invalid.

    Convenience wrapper: it builds a ParamValidator, runs it once and
    converts a report that contains errors into an exception.  Instantiate
    ParamValidator directly if you want more control over the result.

    :param params: The user provided input parameters.

    :type shape: botocore.model.Shape
    :param shape: The schema which the input parameters should
        adhere to.

    :raise: ParamValidationError if the validation report has errors.
    :return: None when there are no validation errors.

    """
    report = ParamValidator().validate(params, shape)
    if not report.has_errors():
        return
    raise ParamValidationError(report=report.generate_report())

48 

49 

def type_check(valid_types):
    """Build a decorator that guards a validator method with a type check.

    The decorated method is only invoked when ``param`` is an instance of
    one of ``valid_types``.  Otherwise an ``'invalid type'`` error is
    reported on ``errors`` and the wrapper returns ``None`` without calling
    the wrapped method.

    :param valid_types: Tuple of types accepted by ``isinstance``.
    :return: A decorator for methods with the signature
        ``(self, param, shape, errors, name)``.
    """
    # Imported locally so this function does not depend on a module-level
    # import being present.
    import functools

    def _create_type_check_guard(func):
        # functools.wraps keeps the wrapped validator's __name__/__doc__ so
        # the decorated methods stay identifiable in tracebacks and tooling.
        @functools.wraps(func)
        def _on_passes_type_check(self, param, shape, errors, name):
            if _type_check(param, errors, name):
                return func(self, param, shape, errors, name)

        def _type_check(param, errors, name):
            # Report the failure (with the printable names of the accepted
            # types) and signal the caller to skip the wrapped validator.
            if not isinstance(param, valid_types):
                valid_type_names = [str(t) for t in valid_types]
                errors.report(
                    name,
                    'invalid type',
                    param=param,
                    valid_types=valid_type_names,
                )
                return False
            return True

        return _on_passes_type_check

    return _create_type_check_guard

71 

72 

def range_check(name, value, shape, error_type, errors):
    """Report ``error_type`` when ``value`` is below the shape's minimum.

    The minimum is taken from ``shape.metadata['min']`` when present.
    Otherwise, if the shape has a ``serialization`` attribute whose
    ``hostLabel`` entry is truthy, a minimum of 1 applies.  When neither
    source provides a minimum, nothing is compared or reported.

    :param name: Name of the parameter, used in the error report.
    :param value: The number (or length) to compare against the minimum.
    :param shape: The shape model describing the parameter.
    :param error_type: The reason string passed to ``errors.report``.
    :param errors: The error collector receiving any failure.
    """
    metadata = shape.metadata
    if 'min' in metadata:
        lower_bound = metadata['min']
    elif hasattr(shape, 'serialization') and shape.serialization.get(
        'hostLabel'
    ):
        # Members that can be bound to the host have an implicit min of 1
        lower_bound = 1
    else:
        # No minimum applies to this shape.
        return

    if value < lower_bound:
        errors.report(name, error_type, param=value, min_allowed=lower_bound)

88 

89 

class ValidationErrors:
    """Collects validation failures and renders them as a text report.

    Every failure is stored as a ``(reason, name, kwargs)`` tuple in the
    order it was reported.
    """

    def __init__(self):
        self._errors = []

    def has_errors(self):
        """Return True if at least one error has been reported."""
        return bool(self._errors)

    def generate_report(self):
        """Return every reported error as one newline separated string."""
        return '\n'.join(self._format_error(error) for error in self._errors)

    def _format_error(self, error):
        """Render a single ``(reason, name, kwargs)`` tuple as a message.

        Falls through and returns ``None`` when the reason is not one of
        the error types handled below.
        """
        error_type, name, additional = error
        name = self._get_name(name)
        if error_type == 'missing required field':
            return (
                f"Missing required parameter in {name}: "
                f"\"{additional['required_name']}\""
            )
        elif error_type == 'unknown field':
            unknown_param = additional['unknown_param']
            valid_names = ', '.join(additional['valid_names'])
            return (
                f'Unknown parameter in {name}: "{unknown_param}", '
                f'must be one of: {valid_names}'
            )
        elif error_type == 'invalid type':
            param = additional['param']
            param_type = type(param)
            valid_types = ', '.join(additional['valid_types'])
            return (
                f'Invalid type for parameter {name}, value: {param}, '
                f'type: {param_type}, valid types: {valid_types}'
            )
        elif error_type == 'invalid range':
            param = additional['param']
            min_allowed = additional['min_allowed']
            return (
                f'Invalid value for parameter {name}, value: {param}, '
                f'valid min value: {min_allowed}'
            )
        elif error_type == 'invalid length':
            param = additional['param']
            min_allowed = additional['min_allowed']
            return (
                f'Invalid length for parameter {name}, value: {param}, '
                f'valid min length: {min_allowed}'
            )
        elif error_type == 'unable to encode to json':
            return 'Invalid parameter {} must be json serializable: {}'.format(
                name,
                additional['type_error'],
            )
        elif error_type == 'invalid type for document':
            param = additional['param']
            param_type = type(param)
            valid_types = ', '.join(additional['valid_types'])
            return (
                f'Invalid type for document parameter {name}, value: {param}, '
                f'type: {param_type}, valid types: {valid_types}'
            )
        elif error_type == 'more than one input':
            members = ', '.join(additional['members'])
            return (
                f'Invalid number of parameters set for tagged union structure '
                f'{name}. Can only set one of the following keys: '
                f'{members}.'
            )
        elif error_type == 'empty input':
            members = ', '.join(additional['members'])
            # The trailing space after "union" matters: the two literals are
            # concatenated and previously rendered as "unionstructure".
            return (
                f'Must set one of the following keys for tagged union '
                f'structure {name}: {members}.'
            )

    def _get_name(self, name):
        """Return the display name: 'input' if empty, else without a leading dot."""
        if not name:
            return 'input'
        elif name.startswith('.'):
            return name[1:]
        else:
            return name

    def report(self, name, reason, **kwargs):
        """Record an error for ``name`` with the given ``reason`` and details."""
        self._errors.append((reason, name, kwargs))

179 

180 

class ParamValidator:
    """Checks user supplied parameters against a shape model."""

    def validate(self, params, shape):
        """Validate parameters against a shape model.

        Validation does not stop at the first problem: every error found
        while walking ``params`` is collected before returning.

        :param params: User provided dict of parameters
        :param shape: A shape model describing the expected input.

        :return: A ValidationErrors instance holding every error found.

        """
        collected = ValidationErrors()
        self._validate(params, shape, collected, name='')
        return collected

    def _check_special_validation_cases(self, shape):
        """Return a dedicated validator for special shapes, else None."""
        if is_json_value_header(shape):
            return self._validate_jsonvalue_string
        if shape.type_name == 'structure' and shape.is_document_type:
            return self._validate_document
        return None

    def _validate(self, params, shape, errors, name):
        """Dispatch to the special validator or ``_validate_<type_name>``."""
        handler = self._check_special_validation_cases(shape)
        if not handler:
            handler = getattr(self, f'_validate_{shape.type_name}')
        handler(params, shape, errors, name)

    def _validate_jsonvalue_string(self, params, shape, errors, name):
        """Report an error if a jsonvalue parameter cannot be JSON encoded."""
        # The dumped string itself is discarded; only the ability to
        # serialize the value matters here.
        try:
            json.dumps(params)
        except (ValueError, TypeError) as e:
            errors.report(name, 'unable to encode to json', type_error=e)

    def _validate_document(self, params, shape, errors, name):
        """Recursively check that a document only holds supported types."""
        if params is None:
            return

        if isinstance(params, dict):
            # Nested values are reported under their own key.
            for key in params:
                self._validate_document(params[key], shape, errors, key)
            return

        if isinstance(params, list):
            for index, entity in enumerate(params):
                self._validate_document(
                    entity, shape, errors, '%s[%d]' % (name, index)
                )
            return

        if isinstance(params, (str, int, bool, float)):
            return

        valid_types = (str, int, bool, float, list, dict)
        errors.report(
            name,
            'invalid type for document',
            param=params,
            param_type=type(params),
            valid_types=[str(t) for t in valid_types],
        )

    @type_check(valid_types=(dict,))
    def _validate_structure(self, params, shape, errors, name):
        """Validate a structure: union rules, required, unknown, members."""
        if shape.is_tagged_union:
            supplied = len(params)
            if supplied == 0:
                errors.report(name, 'empty input', members=shape.members)
            elif supplied > 1:
                errors.report(
                    name, 'more than one input', members=shape.members
                )

        # Validate required fields.
        for required_member in shape.metadata.get('required', []):
            if required_member in params:
                continue
            errors.report(
                name,
                'missing required field',
                required_name=required_member,
                user_params=params,
            )

        members = shape.members
        recognized = []
        # Validate known params.
        for param in params:
            if param in members:
                recognized.append(param)
                continue
            errors.report(
                name,
                'unknown field',
                unknown_param=param,
                valid_names=list(members),
            )

        # Validate structure members.
        for param in recognized:
            self._validate(
                params[param],
                members[param],
                errors,
                f'{name}.{param}',
            )

    @type_check(valid_types=(str,))
    def _validate_string(self, param, shape, errors, name):
        """Validate a string; its min constraint applies to its length."""
        # A string shape looks like:
        # "WorkflowId":{
        #   "type":"string",
        #   "min":1,
        #   "max":256
        #  }
        range_check(name, len(param), shape, 'invalid length', errors)

    @type_check(valid_types=(list, tuple))
    def _validate_list(self, param, shape, errors, name):
        """Validate the list length, then each item against the member shape."""
        item_shape = shape.member
        range_check(name, len(param), shape, 'invalid length', errors)
        for position, item in enumerate(param):
            self._validate(item, item_shape, errors, f'{name}[{position}]')

    @type_check(valid_types=(dict,))
    def _validate_map(self, param, shape, errors, name):
        """Validate every key and value of a map against their shapes."""
        key_shape = shape.key
        value_shape = shape.value
        for key, value in param.items():
            self._validate(key, key_shape, errors, f"{name} (key: {key})")
            self._validate(value, value_shape, errors, f'{name}.{key}')

    @type_check(valid_types=(int,))
    def _validate_integer(self, param, shape, errors, name):
        """Validate the range of an integer parameter."""
        range_check(name, param, shape, 'invalid range', errors)

    def _validate_blob(self, param, shape, errors, name):
        """Accept bytes, bytearray, str or any object with a ``read`` attribute."""
        # File like objects are also allowed for blob types.
        if isinstance(param, (bytes, bytearray, str)) or hasattr(
            param, 'read'
        ):
            return
        errors.report(
            name,
            'invalid type',
            param=param,
            valid_types=[str(bytes), str(bytearray), 'file-like object'],
        )

    @type_check(valid_types=(bool,))
    def _validate_boolean(self, param, shape, errors, name):
        """Nothing beyond the decorator's type check is validated."""
        pass

    @type_check(valid_types=(float, decimal.Decimal, int))
    def _validate_double(self, param, shape, errors, name):
        """Validate the range of a floating point parameter."""
        range_check(name, param, shape, 'invalid range', errors)

    _validate_float = _validate_double

    @type_check(valid_types=(int,))
    def _validate_long(self, param, shape, errors, name):
        """Validate the range of a long parameter."""
        range_check(name, param, shape, 'invalid range', errors)

    def _validate_timestamp(self, param, shape, errors, name):
        """Report an error unless ``param`` can be parsed as a datetime."""
        # We don't use @type_check because datetimes are a bit
        # more flexible.  You can either provide a datetime
        # object, or a string that parses to a datetime.
        if self._type_check_datetime(param):
            return
        errors.report(
            name,
            'invalid type',
            param=param,
            valid_types=[str(datetime), 'timestamp-string'],
        )

    def _type_check_datetime(self, value):
        """Return True if ``parse_to_aware_datetime`` accepts ``value``."""
        try:
            parse_to_aware_datetime(value)
        except (TypeError, ValueError, AttributeError):
            # Yes, dateutil can sometimes raise an AttributeError
            # when parsing timestamps.
            return False
        else:
            return True

367 

368 

369class ParamValidationDecorator: 

370 def __init__(self, param_validator, serializer): 

371 self._param_validator = param_validator 

372 self._serializer = serializer 

373 

374 def serialize_to_request(self, parameters, operation_model): 

375 input_shape = operation_model.input_shape 

376 if input_shape is not None: 

377 report = self._param_validator.validate( 

378 parameters, operation_model.input_shape 

379 ) 

380 if report.has_errors(): 

381 raise ParamValidationError(report=report.generate_report()) 

382 return self._serializer.serialize_to_request( 

383 parameters, operation_model 

384 )