Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/validators/parameter.py: 29%
82 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import collections
2import copy
3import logging
5from jsonschema import Draft4Validator, ValidationError
6from starlette.requests import Request
8from connexion.exceptions import BadRequestProblem, ExtraParameterProblem
9from connexion.utils import boolean, is_null, is_nullable
# Module logger, named explicitly after this module's dotted path.
logger = logging.getLogger("connexion.validators.parameter")

# Maps a spec type name to the Python callable associated with that type.
TYPE_MAP = {
    "integer": int,
    "number": float,
    "boolean": boolean,
    "object": dict,
}

# Prefer the format checker exposed on the validator class; older jsonschema
# releases lack that attribute, so fall back to the module-level object.
try:
    draft4_format_checker = Draft4Validator.FORMAT_CHECKER  # type: ignore
except AttributeError:  # jsonschema < 4.5.0
    from jsonschema import draft4_format_checker
class ParameterValidator:
    """Validate query, path, header and cookie parameters of a request.

    Parameter definitions are grouped by their ``in`` location at construction
    time; each value found on the request is then checked with a
    ``Draft4Validator`` built from the parameter's schema.
    """

    def __init__(self, parameters, uri_parser, strict_validation=False):
        """
        :param parameters: List of request parameter dictionaries; each one is
            grouped under the value of its ``"in"`` key
        :param uri_parser: object used to resolve raw query / path values
            (its ``resolve_query`` and ``resolve_path`` methods are called)
        :param strict_validation: When true, query parameters that are not
            declared in the spec cause the request to be rejected
        """
        grouped = collections.defaultdict(list)
        self.parameters = grouped
        for definition in parameters:
            grouped[definition["in"]].append(definition)

        self.uri_parser = uri_parser
        self.strict_validation = strict_validation

    @staticmethod
    def validate_parameter(parameter_type, value, param, param_name=None):
        """Check one value against its parameter definition.

        :param parameter_type: location label used in the error message
            (e.g. ``"query"``)
        :param value: the value taken from the request, or ``None`` if absent
        :param param: the parameter definition dictionary
        :param param_name: not read by this method; kept in the signature
        :return: ``None`` when the value is acceptable, otherwise a string
            describing the problem
        """
        # A null value on a nullable parameter is accepted without further checks.
        if is_nullable(param) and is_null(value):
            return None

        if value is None:
            # Absent value: only an error when the definition requires it.
            if param.get("required"):
                return "Missing {parameter_type} parameter '{param[name]}'".format(
                    parameter_type=parameter_type, param=param
                )
            return None

        # Validate against the nested "schema" when present, otherwise against
        # the definition itself; work on a copy so the spec is never touched.
        schema = copy.deepcopy(param)
        schema = schema.get("schema", schema)
        try:
            Draft4Validator(schema, format_checker=draft4_format_checker).validate(
                value
            )
        except ValidationError as exception:
            return str(exception)
        return None

    @staticmethod
    def validate_parameter_list(request_params, spec_params):
        """Return the set of names in *request_params* missing from *spec_params*."""
        return set(request_params) - set(spec_params)

    def validate_query_parameter_list(self, request):
        """Return the query parameter names on *request* not declared in the spec."""
        received = request.query_params.keys()
        declared = [spec["name"] for spec in self.parameters.get("query", [])]
        return self.validate_parameter_list(received, declared)

    def validate_query_parameter(self, param, request):
        """
        Validate a single query parameter of the request.

        :type param: dict
        :rtype: str
        """
        # Collect every value of every key so repeated keys become lists.
        raw_values = {}
        for key in request.query_params:
            raw_values[key] = request.query_params.getlist(key)

        resolved = self.uri_parser.resolve_query(raw_values)
        return self.validate_parameter("query", resolved.get(param["name"]), param)

    def validate_path_parameter(self, param, request):
        """Validate a single path parameter of the request."""
        resolved = self.uri_parser.resolve_path(request.path_params)
        # Path values are looked up under the name with dashes turned into
        # underscores.
        lookup_key = param["name"].replace("-", "_")
        return self.validate_parameter("path", resolved.get(lookup_key), param)

    def validate_header_parameter(self, param, request):
        """Validate a single header parameter of the request."""
        return self.validate_parameter(
            "header", request.headers.get(param["name"]), param
        )

    def validate_cookie_parameter(self, param, request):
        """Validate a single cookie parameter of the request."""
        return self.validate_parameter(
            "cookie", request.cookies.get(param["name"]), param
        )

    def validate(self, scope):
        """Wrap *scope* in a ``Request`` and validate its parameters."""
        logger.debug("%s validating parameters...", scope.get("path"))

        self.validate_request(Request(scope))

    def validate_request(self, request):
        """Validate all declared parameters of *request*.

        :raises ExtraParameterProblem: in strict mode, when the request carries
            query parameters that are not declared in the spec
        :raises BadRequestProblem: on the first parameter that fails validation
        """
        if self.strict_validation:
            unexpected = self.validate_query_parameter_list(request)
            if unexpected:
                raise ExtraParameterProblem(
                    param_type="query", extra_params=unexpected
                )

        # Locations are checked in this fixed order; the first failure aborts.
        checks = (
            ("query", self.validate_query_parameter),
            ("path", self.validate_path_parameter),
            ("header", self.validate_header_parameter),
            ("cookie", self.validate_cookie_parameter),
        )
        for location, check in checks:
            for definition in self.parameters.get(location, []):
                problem = check(definition, request)
                if problem:
                    raise BadRequestProblem(detail=problem)