Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/validate.py: 25%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""User input parameter validation.
3This module handles user input parameter validation
4against a provided input model.
6Note that the objects in this module do *not* mutate any
7arguments. No type version happens here. It is up to another
8layer to properly convert arguments to any required types.
10Validation Errors
11-----------------
14"""
16import decimal
17import json
18from datetime import datetime
20from botocore.exceptions import ParamValidationError
21from botocore.utils import is_json_value_header, parse_to_aware_datetime
24def validate_parameters(params, shape):
25 """Validates input parameters against a schema.
27 This is a convenience function that validates parameters against a schema.
28 You can also instantiate and use the ParamValidator class directly if you
29 want more control.
31 If there are any validation errors then a ParamValidationError
32 will be raised. If there are no validation errors than no exception
33 is raised and a value of None is returned.
35 :param params: The user provided input parameters.
37 :type shape: botocore.model.Shape
38 :param shape: The schema which the input parameters should
39 adhere to.
41 :raise: ParamValidationError
43 """
44 validator = ParamValidator()
45 report = validator.validate(params, shape)
46 if report.has_errors():
47 raise ParamValidationError(report=report.generate_report())
50def type_check(valid_types):
51 def _create_type_check_guard(func):
52 def _on_passes_type_check(self, param, shape, errors, name):
53 if _type_check(param, errors, name):
54 return func(self, param, shape, errors, name)
56 def _type_check(param, errors, name):
57 if not isinstance(param, valid_types):
58 valid_type_names = [str(t) for t in valid_types]
59 errors.report(
60 name,
61 'invalid type',
62 param=param,
63 valid_types=valid_type_names,
64 )
65 return False
66 return True
68 return _on_passes_type_check
70 return _create_type_check_guard
73def range_check(name, value, shape, error_type, errors):
74 failed = False
75 min_allowed = float('-inf')
76 if 'min' in shape.metadata:
77 min_allowed = shape.metadata['min']
78 if value < min_allowed:
79 failed = True
80 elif hasattr(shape, 'serialization'):
81 # Members that can be bound to the host have an implicit min of 1
82 if shape.serialization.get('hostLabel'):
83 min_allowed = 1
84 if value < min_allowed:
85 failed = True
86 if failed:
87 errors.report(name, error_type, param=value, min_allowed=min_allowed)
90class ValidationErrors:
91 def __init__(self):
92 self._errors = []
94 def has_errors(self):
95 if self._errors:
96 return True
97 return False
99 def generate_report(self):
100 error_messages = []
101 for error in self._errors:
102 error_messages.append(self._format_error(error))
103 return '\n'.join(error_messages)
105 def _format_error(self, error):
106 error_type, name, additional = error
107 name = self._get_name(name)
108 if error_type == 'missing required field':
109 return (
110 f"Missing required parameter in {name}: "
111 f"\"{additional['required_name']}\""
112 )
113 elif error_type == 'unknown field':
114 unknown_param = additional['unknown_param']
115 valid_names = ', '.join(additional['valid_names'])
116 return (
117 f'Unknown parameter in {name}: "{unknown_param}", '
118 f'must be one of: {valid_names}'
119 )
120 elif error_type == 'invalid type':
121 param = additional['param']
122 param_type = type(param)
123 valid_types = ', '.join(additional['valid_types'])
124 return (
125 f'Invalid type for parameter {name}, value: {param}, '
126 f'type: {param_type}, valid types: {valid_types}'
127 )
128 elif error_type == 'invalid range':
129 param = additional['param']
130 min_allowed = additional['min_allowed']
131 return (
132 f'Invalid value for parameter {name}, value: {param}, '
133 f'valid min value: {min_allowed}'
134 )
135 elif error_type == 'invalid length':
136 param = additional['param']
137 min_allowed = additional['min_allowed']
138 return (
139 f'Invalid length for parameter {name}, value: {param}, '
140 f'valid min length: {min_allowed}'
141 )
142 elif error_type == 'unable to encode to json':
143 return 'Invalid parameter {} must be json serializable: {}'.format(
144 name,
145 additional['type_error'],
146 )
147 elif error_type == 'invalid type for document':
148 param = additional['param']
149 param_type = type(param)
150 valid_types = ', '.join(additional['valid_types'])
151 return (
152 f'Invalid type for document parameter {name}, value: {param}, '
153 f'type: {param_type}, valid types: {valid_types}'
154 )
155 elif error_type == 'more than one input':
156 members = ', '.join(additional['members'])
157 return (
158 f'Invalid number of parameters set for tagged union structure '
159 f'{name}. Can only set one of the following keys: '
160 f'{members}.'
161 )
162 elif error_type == 'empty input':
163 members = ', '.join(additional['members'])
164 return (
165 f'Must set one of the following keys for tagged union'
166 f'structure {name}: {members}.'
167 )
169 def _get_name(self, name):
170 if not name:
171 return 'input'
172 elif name.startswith('.'):
173 return name[1:]
174 else:
175 return name
177 def report(self, name, reason, **kwargs):
178 self._errors.append((reason, name, kwargs))
181class ParamValidator:
182 """Validates parameters against a shape model."""
184 # Valid Python types for scalar c2j types
185 SCALAR_TYPES = {
186 'float': (float, decimal.Decimal, int),
187 'double': (float, decimal.Decimal, int),
188 'integer': (int,),
189 'long': (int,),
190 'boolean': (bool,),
191 'string': (str,),
192 }
194 # Valid Python types for container c2j types
195 CONTAINER_TYPES = {
196 'structure': (dict,),
197 'map': (dict,),
198 'list': (list, tuple),
199 }
201 # Metadata attributes that we validate beyond type checking
202 VALIDATED_METADATA_ATTRS = {'required', 'min', 'document', 'union'}
204 def _shape_has_constraints(self, shape):
205 """Whether the shape has validated constraints beyond type checking."""
206 return bool(self.VALIDATED_METADATA_ATTRS & set(shape.metadata.keys()))
208 def validate(self, params, shape):
209 """Validate parameters against a shape model.
211 This method will validate the parameters against a provided shape model.
212 All errors will be collected before returning to the caller. This means
213 that this method will not stop at the first error, it will return all
214 possible errors.
216 :param params: User provided dict of parameters
217 :param shape: A shape model describing the expected input.
219 :return: A list of errors.
221 """
222 errors = ValidationErrors()
223 self._validate(params, shape, errors, name='')
224 return errors
226 def _check_special_validation_cases(self, shape):
227 if is_json_value_header(shape):
228 return self._validate_jsonvalue_string
229 if shape.type_name == 'structure' and shape.is_document_type:
230 return self._validate_document
232 def _validate(self, params, shape, errors, name):
233 special_validator = self._check_special_validation_cases(shape)
234 if special_validator:
235 special_validator(params, shape, errors, name)
236 else:
237 getattr(self, f'_validate_{shape.type_name}')(
238 params, shape, errors, name
239 )
241 def _validate_jsonvalue_string(self, params, shape, errors, name):
242 # Check to see if a value marked as a jsonvalue can be dumped to
243 # a json string.
244 try:
245 json.dumps(params)
246 except (ValueError, TypeError) as e:
247 errors.report(name, 'unable to encode to json', type_error=e)
249 def _validate_document(self, params, shape, errors, name):
250 if params is None:
251 return
253 if isinstance(params, dict):
254 for key in params:
255 self._validate_document(params[key], shape, errors, key)
256 elif isinstance(params, list):
257 for index, entity in enumerate(params):
258 self._validate_document(
259 entity, shape, errors, f'{name}[{index}]'
260 )
261 elif not isinstance(params, ((str,), int, bool, float)):
262 valid_types = (str, int, bool, float, list, dict)
263 valid_type_names = [str(t) for t in valid_types]
264 errors.report(
265 name,
266 'invalid type for document',
267 param=params,
268 param_type=type(params),
269 valid_types=valid_type_names,
270 )
272 @type_check(valid_types=CONTAINER_TYPES['structure'])
273 def _validate_structure(self, params, shape, errors, name):
274 if shape.is_tagged_union:
275 if len(params) == 0:
276 errors.report(name, 'empty input', members=shape.members)
277 elif len(params) > 1:
278 errors.report(
279 name, 'more than one input', members=shape.members
280 )
282 # Validate required fields.
283 for required_member in shape.metadata.get('required', []):
284 if required_member not in params:
285 errors.report(
286 name,
287 'missing required field',
288 required_name=required_member,
289 user_params=params,
290 )
291 members = shape.members
292 known_params = []
293 # Validate known params.
294 for param in params:
295 if param not in members:
296 errors.report(
297 name,
298 'unknown field',
299 unknown_param=param,
300 valid_names=list(members),
301 )
302 else:
303 known_params.append(param)
304 # Validate structure members.
305 for param in known_params:
306 self._validate(
307 params[param],
308 shape.members[param],
309 errors,
310 f'{name}.{param}',
311 )
313 @type_check(valid_types=SCALAR_TYPES['string'])
314 def _validate_string(self, param, shape, errors, name):
315 # Validate range. For a string, the min/max constraints
316 # are of the string length.
317 # Looks like:
318 # "WorkflowId":{
319 # "type":"string",
320 # "min":1,
321 # "max":256
322 # }
323 range_check(name, len(param), shape, 'invalid length', errors)
325 @type_check(valid_types=CONTAINER_TYPES['list'])
326 def _validate_list(self, param, shape, errors, name):
327 member_shape = shape.member
328 range_check(name, len(param), shape, 'invalid length', errors)
330 # If a list member does not have validation constraints, we will only check the type
331 member_type = member_shape.type_name
332 if (
333 member_type in self.SCALAR_TYPES
334 and not self._shape_has_constraints(member_shape)
335 ):
336 valid_types = self.SCALAR_TYPES[member_type]
337 for i, item in enumerate(param):
338 if not isinstance(item, valid_types):
339 valid_type_names = [str(t) for t in valid_types]
340 errors.report(
341 f'{name}[{i}]',
342 'invalid type',
343 param=item,
344 valid_types=valid_type_names,
345 )
346 return
348 for i, item in enumerate(param):
349 self._validate(item, member_shape, errors, f'{name}[{i}]')
351 @type_check(valid_types=CONTAINER_TYPES['map'])
352 def _validate_map(self, param, shape, errors, name):
353 key_shape = shape.key
354 value_shape = shape.value
355 for key, value in param.items():
356 self._validate(key, key_shape, errors, f"{name} (key: {key})")
357 self._validate(value, value_shape, errors, f'{name}.{key}')
359 @type_check(valid_types=SCALAR_TYPES['integer'])
360 def _validate_integer(self, param, shape, errors, name):
361 range_check(name, param, shape, 'invalid range', errors)
363 def _validate_blob(self, param, shape, errors, name):
364 if isinstance(param, (bytes, bytearray, str)):
365 return
366 elif hasattr(param, 'read'):
367 # File like objects are also allowed for blob types.
368 return
369 else:
370 errors.report(
371 name,
372 'invalid type',
373 param=param,
374 valid_types=[str(bytes), str(bytearray), 'file-like object'],
375 )
377 @type_check(valid_types=SCALAR_TYPES['boolean'])
378 def _validate_boolean(self, param, shape, errors, name):
379 pass
381 @type_check(valid_types=SCALAR_TYPES['double'])
382 def _validate_double(self, param, shape, errors, name):
383 range_check(name, param, shape, 'invalid range', errors)
385 _validate_float = _validate_double
387 @type_check(valid_types=SCALAR_TYPES['long'])
388 def _validate_long(self, param, shape, errors, name):
389 range_check(name, param, shape, 'invalid range', errors)
391 def _validate_timestamp(self, param, shape, errors, name):
392 # We don't use @type_check because datetimes are a bit
393 # more flexible. You can either provide a datetime
394 # object, or a string that parses to a datetime.
395 is_valid_type = self._type_check_datetime(param)
396 if not is_valid_type:
397 valid_type_names = [str(datetime), 'timestamp-string']
398 errors.report(
399 name, 'invalid type', param=param, valid_types=valid_type_names
400 )
402 def _type_check_datetime(self, value):
403 try:
404 parse_to_aware_datetime(value)
405 return True
406 except (TypeError, ValueError, AttributeError):
407 # Yes, dateutil can sometimes raise an AttributeError
408 # when parsing timestamps.
409 return False
412class ParamValidationDecorator:
413 def __init__(self, param_validator, serializer):
414 self._param_validator = param_validator
415 self._serializer = serializer
417 def serialize_to_request(self, parameters, operation_model):
418 input_shape = operation_model.input_shape
419 if input_shape is not None:
420 report = self._param_validator.validate(
421 parameters, operation_model.input_shape
422 )
423 if report.has_errors():
424 raise ParamValidationError(report=report.generate_report())
425 return self._serializer.serialize_to_request(
426 parameters, operation_model
427 )