1"""User input parameter validation.
2
3This module handles user input parameter validation
4against a provided input model.
5
6Note that the objects in this module do *not* mutate any
7arguments. No type version happens here. It is up to another
8layer to properly convert arguments to any required types.
9
10Validation Errors
11-----------------
12
13
14"""
15
16import decimal
17import json
18from datetime import datetime
19
20from botocore.exceptions import ParamValidationError
21from botocore.utils import is_json_value_header, parse_to_aware_datetime
22
23
24def validate_parameters(params, shape):
25 """Validates input parameters against a schema.
26
27 This is a convenience function that validates parameters against a schema.
28 You can also instantiate and use the ParamValidator class directly if you
29 want more control.
30
31 If there are any validation errors then a ParamValidationError
32 will be raised. If there are no validation errors than no exception
33 is raised and a value of None is returned.
34
35 :param params: The user provided input parameters.
36
37 :type shape: botocore.model.Shape
38 :param shape: The schema which the input parameters should
39 adhere to.
40
41 :raise: ParamValidationError
42
43 """
44 validator = ParamValidator()
45 report = validator.validate(params, shape)
46 if report.has_errors():
47 raise ParamValidationError(report=report.generate_report())
48
49
50def type_check(valid_types):
51 def _create_type_check_guard(func):
52 def _on_passes_type_check(self, param, shape, errors, name):
53 if _type_check(param, errors, name):
54 return func(self, param, shape, errors, name)
55
56 def _type_check(param, errors, name):
57 if not isinstance(param, valid_types):
58 valid_type_names = [str(t) for t in valid_types]
59 errors.report(
60 name,
61 'invalid type',
62 param=param,
63 valid_types=valid_type_names,
64 )
65 return False
66 return True
67
68 return _on_passes_type_check
69
70 return _create_type_check_guard
71
72
73def range_check(name, value, shape, error_type, errors):
74 failed = False
75 min_allowed = float('-inf')
76 if 'min' in shape.metadata:
77 min_allowed = shape.metadata['min']
78 if value < min_allowed:
79 failed = True
80 elif hasattr(shape, 'serialization'):
81 # Members that can be bound to the host have an implicit min of 1
82 if shape.serialization.get('hostLabel'):
83 min_allowed = 1
84 if value < min_allowed:
85 failed = True
86 if failed:
87 errors.report(name, error_type, param=value, min_allowed=min_allowed)
88
89
90class ValidationErrors:
91 def __init__(self):
92 self._errors = []
93
94 def has_errors(self):
95 if self._errors:
96 return True
97 return False
98
99 def generate_report(self):
100 error_messages = []
101 for error in self._errors:
102 error_messages.append(self._format_error(error))
103 return '\n'.join(error_messages)
104
105 def _format_error(self, error):
106 error_type, name, additional = error
107 name = self._get_name(name)
108 if error_type == 'missing required field':
109 return (
110 f"Missing required parameter in {name}: "
111 f"\"{additional['required_name']}\""
112 )
113 elif error_type == 'unknown field':
114 unknown_param = additional['unknown_param']
115 valid_names = ', '.join(additional['valid_names'])
116 return (
117 f'Unknown parameter in {name}: "{unknown_param}", '
118 f'must be one of: {valid_names}'
119 )
120 elif error_type == 'invalid type':
121 param = additional['param']
122 param_type = type(param)
123 valid_types = ', '.join(additional['valid_types'])
124 return (
125 f'Invalid type for parameter {name}, value: {param}, '
126 f'type: {param_type}, valid types: {valid_types}'
127 )
128 elif error_type == 'invalid range':
129 param = additional['param']
130 min_allowed = additional['min_allowed']
131 return (
132 f'Invalid value for parameter {name}, value: {param}, '
133 f'valid min value: {min_allowed}'
134 )
135 elif error_type == 'invalid length':
136 param = additional['param']
137 min_allowed = additional['min_allowed']
138 return (
139 f'Invalid length for parameter {name}, value: {param}, '
140 f'valid min length: {min_allowed}'
141 )
142 elif error_type == 'unable to encode to json':
143 return 'Invalid parameter {} must be json serializable: {}'.format(
144 name,
145 additional['type_error'],
146 )
147 elif error_type == 'invalid type for document':
148 param = additional['param']
149 param_type = type(param)
150 valid_types = ', '.join(additional['valid_types'])
151 return (
152 f'Invalid type for document parameter {name}, value: {param}, '
153 f'type: {param_type}, valid types: {valid_types}'
154 )
155 elif error_type == 'more than one input':
156 members = ', '.join(additional['members'])
157 return (
158 f'Invalid number of parameters set for tagged union structure '
159 f'{name}. Can only set one of the following keys: '
160 f'{members}.'
161 )
162 elif error_type == 'empty input':
163 members = ', '.join(additional['members'])
164 return (
165 f'Must set one of the following keys for tagged union'
166 f'structure {name}: {members}.'
167 )
168
169 def _get_name(self, name):
170 if not name:
171 return 'input'
172 elif name.startswith('.'):
173 return name[1:]
174 else:
175 return name
176
177 def report(self, name, reason, **kwargs):
178 self._errors.append((reason, name, kwargs))
179
180
181class ParamValidator:
182 """Validates parameters against a shape model."""
183
184 def validate(self, params, shape):
185 """Validate parameters against a shape model.
186
187 This method will validate the parameters against a provided shape model.
188 All errors will be collected before returning to the caller. This means
189 that this method will not stop at the first error, it will return all
190 possible errors.
191
192 :param params: User provided dict of parameters
193 :param shape: A shape model describing the expected input.
194
195 :return: A list of errors.
196
197 """
198 errors = ValidationErrors()
199 self._validate(params, shape, errors, name='')
200 return errors
201
202 def _check_special_validation_cases(self, shape):
203 if is_json_value_header(shape):
204 return self._validate_jsonvalue_string
205 if shape.type_name == 'structure' and shape.is_document_type:
206 return self._validate_document
207
208 def _validate(self, params, shape, errors, name):
209 special_validator = self._check_special_validation_cases(shape)
210 if special_validator:
211 special_validator(params, shape, errors, name)
212 else:
213 getattr(self, f'_validate_{shape.type_name}')(
214 params, shape, errors, name
215 )
216
217 def _validate_jsonvalue_string(self, params, shape, errors, name):
218 # Check to see if a value marked as a jsonvalue can be dumped to
219 # a json string.
220 try:
221 json.dumps(params)
222 except (ValueError, TypeError) as e:
223 errors.report(name, 'unable to encode to json', type_error=e)
224
225 def _validate_document(self, params, shape, errors, name):
226 if params is None:
227 return
228
229 if isinstance(params, dict):
230 for key in params:
231 self._validate_document(params[key], shape, errors, key)
232 elif isinstance(params, list):
233 for index, entity in enumerate(params):
234 self._validate_document(
235 entity, shape, errors, '%s[%d]' % (name, index)
236 )
237 elif not isinstance(params, ((str,), int, bool, float)):
238 valid_types = (str, int, bool, float, list, dict)
239 valid_type_names = [str(t) for t in valid_types]
240 errors.report(
241 name,
242 'invalid type for document',
243 param=params,
244 param_type=type(params),
245 valid_types=valid_type_names,
246 )
247
248 @type_check(valid_types=(dict,))
249 def _validate_structure(self, params, shape, errors, name):
250 if shape.is_tagged_union:
251 if len(params) == 0:
252 errors.report(name, 'empty input', members=shape.members)
253 elif len(params) > 1:
254 errors.report(
255 name, 'more than one input', members=shape.members
256 )
257
258 # Validate required fields.
259 for required_member in shape.metadata.get('required', []):
260 if required_member not in params:
261 errors.report(
262 name,
263 'missing required field',
264 required_name=required_member,
265 user_params=params,
266 )
267 members = shape.members
268 known_params = []
269 # Validate known params.
270 for param in params:
271 if param not in members:
272 errors.report(
273 name,
274 'unknown field',
275 unknown_param=param,
276 valid_names=list(members),
277 )
278 else:
279 known_params.append(param)
280 # Validate structure members.
281 for param in known_params:
282 self._validate(
283 params[param],
284 shape.members[param],
285 errors,
286 f'{name}.{param}',
287 )
288
289 @type_check(valid_types=(str,))
290 def _validate_string(self, param, shape, errors, name):
291 # Validate range. For a string, the min/max constraints
292 # are of the string length.
293 # Looks like:
294 # "WorkflowId":{
295 # "type":"string",
296 # "min":1,
297 # "max":256
298 # }
299 range_check(name, len(param), shape, 'invalid length', errors)
300
301 @type_check(valid_types=(list, tuple))
302 def _validate_list(self, param, shape, errors, name):
303 member_shape = shape.member
304 range_check(name, len(param), shape, 'invalid length', errors)
305 for i, item in enumerate(param):
306 self._validate(item, member_shape, errors, f'{name}[{i}]')
307
308 @type_check(valid_types=(dict,))
309 def _validate_map(self, param, shape, errors, name):
310 key_shape = shape.key
311 value_shape = shape.value
312 for key, value in param.items():
313 self._validate(key, key_shape, errors, f"{name} (key: {key})")
314 self._validate(value, value_shape, errors, f'{name}.{key}')
315
316 @type_check(valid_types=(int,))
317 def _validate_integer(self, param, shape, errors, name):
318 range_check(name, param, shape, 'invalid range', errors)
319
320 def _validate_blob(self, param, shape, errors, name):
321 if isinstance(param, (bytes, bytearray, str)):
322 return
323 elif hasattr(param, 'read'):
324 # File like objects are also allowed for blob types.
325 return
326 else:
327 errors.report(
328 name,
329 'invalid type',
330 param=param,
331 valid_types=[str(bytes), str(bytearray), 'file-like object'],
332 )
333
334 @type_check(valid_types=(bool,))
335 def _validate_boolean(self, param, shape, errors, name):
336 pass
337
338 @type_check(valid_types=(float, decimal.Decimal) + (int,))
339 def _validate_double(self, param, shape, errors, name):
340 range_check(name, param, shape, 'invalid range', errors)
341
342 _validate_float = _validate_double
343
344 @type_check(valid_types=(int,))
345 def _validate_long(self, param, shape, errors, name):
346 range_check(name, param, shape, 'invalid range', errors)
347
348 def _validate_timestamp(self, param, shape, errors, name):
349 # We don't use @type_check because datetimes are a bit
350 # more flexible. You can either provide a datetime
351 # object, or a string that parses to a datetime.
352 is_valid_type = self._type_check_datetime(param)
353 if not is_valid_type:
354 valid_type_names = [str(datetime), 'timestamp-string']
355 errors.report(
356 name, 'invalid type', param=param, valid_types=valid_type_names
357 )
358
359 def _type_check_datetime(self, value):
360 try:
361 parse_to_aware_datetime(value)
362 return True
363 except (TypeError, ValueError, AttributeError):
364 # Yes, dateutil can sometimes raise an AttributeError
365 # when parsing timestamps.
366 return False
367
368
369class ParamValidationDecorator:
370 def __init__(self, param_validator, serializer):
371 self._param_validator = param_validator
372 self._serializer = serializer
373
374 def serialize_to_request(self, parameters, operation_model):
375 input_shape = operation_model.input_shape
376 if input_shape is not None:
377 report = self._param_validator.validate(
378 parameters, operation_model.input_shape
379 )
380 if report.has_errors():
381 raise ParamValidationError(report=report.generate_report())
382 return self._serializer.serialize_to_request(
383 parameters, operation_model
384 )