1import collections
2import copy
3import logging
4
5from jsonschema import Draft4Validator, ValidationError
6
7from connexion.exceptions import BadRequestProblem, ExtraParameterProblem
8from connexion.lifecycle import ConnexionRequest
9from connexion.utils import boolean, is_null, is_nullable
10
# Module-level logger; the name is fixed explicitly rather than derived from
# ``__name__``.
logger = logging.getLogger("connexion.validators.parameter")

# Schema type name -> Python callable of the matching kind.
# NOTE(review): not referenced elsewhere in the visible part of this file;
# presumably imported by other modules -- confirm before removing.
TYPE_MAP = {
    "integer": int,
    "number": float,
    "boolean": boolean,
    "object": dict,
}

# Pick the Draft 4 format checker from wherever the installed jsonschema
# exposes it: a validator class attribute when available, else the legacy
# module-level name.
if hasattr(Draft4Validator, "FORMAT_CHECKER"):
    draft4_format_checker = Draft4Validator.FORMAT_CHECKER  # type: ignore
else:  # jsonschema < 4.5.0
    from jsonschema import draft4_format_checker
19
20
class ParameterValidator:
    """Validate the query, path, header and cookie parameters of a request.

    The parameter definitions handed to the constructor are grouped by their
    ``"in"`` location; :meth:`validate_request` then checks a request against
    every group in turn.
    """

    def __init__(
        self,
        parameters,
        uri_parser,
        strict_validation=False,
        security_query_params=None,
    ):
        """
        :param parameters: List of request parameter dictionaries. Each entry
          is grouped under the value of its ``"in"`` key. ``None`` is treated
          like an empty list.
        :param uri_parser: class to use for uri parsing
        :param strict_validation: Flag indicating if parameters not in spec are allowed
        :param security_query_params: List of query parameter names used for security.
          These parameters will be ignored when checking for extra parameters in case of
          strict validation.
        """
        self.parameters = collections.defaultdict(list)
        # Tolerate ``None`` the same way ``security_query_params`` does below.
        for definition in parameters or []:
            self.parameters[definition["in"]].append(definition)

        self.uri_parser = uri_parser
        self.strict_validation = strict_validation
        self.security_query_params = set(security_query_params or [])

    @staticmethod
    def validate_parameter(parameter_type, value, param, param_name=None):
        """Validate one parameter value against its definition.

        :param parameter_type: Location label used in the "missing parameter"
          message (the callers in this class pass ``"query"``, ``"path"``,
          ``"header"`` or ``"cookie"``).
        :param value: The value taken from the request, or ``None`` if absent.
        :param param: The parameter definition. Its ``"schema"`` entry is used
          for validation when present, otherwise the definition itself.
        :param param_name: Accepted for backward compatibility; it is not read.
        :return: ``None`` when there is nothing to report, otherwise an error
          message string.
        """
        if is_nullable(param) and is_null(value):
            return None

        if value is not None:
            # Validate against a private copy so the validator never holds a
            # reference to the shared parameter definition.
            schema = copy.deepcopy(param.get("schema", param))
            try:
                validator = Draft4Validator(
                    schema, format_checker=draft4_format_checker
                )
                validator.validate(value)
            except ValidationError as exception:
                return str(exception)
            return None

        if param.get("required"):
            # Explicit arguments instead of ``**locals()`` so the template can
            # only ever see the two values it is meant to use.
            return "Missing {parameter_type} parameter '{name}'".format(
                parameter_type=parameter_type, name=param["name"]
            )

        return None

    @staticmethod
    def validate_parameter_list(request_params, spec_params):
        """Return the set of names in ``request_params`` missing from ``spec_params``.

        :param request_params: Iterable of parameter names found in the request.
        :param spec_params: Iterable of parameter names that are allowed.
        :rtype: set
        """
        return set(request_params) - set(spec_params)

    def validate_query_parameter_list(self, request, security_params=None):
        """Return the query parameter names of ``request`` that are not allowed.

        Allowed names are those of the ``"query"`` parameter definitions plus
        any names in ``security_params``.

        :param request: Object exposing a ``query_params`` mapping.
        :param security_params: Optional iterable of extra allowed names.
        :rtype: set
        """
        allowed = [spec["name"] for spec in self.parameters.get("query", [])]
        allowed.extend(security_params or [])
        return self.validate_parameter_list(request.query_params.keys(), allowed)

    def validate_query_parameter(self, param, request):
        """
        Validate a single query parameter, looked up by name in
        ``request.query_params``.

        :type param: dict
        :return: ``None`` or an error message string.
        """
        value = request.query_params.get(param["name"])
        return self.validate_parameter("query", value, param)

    def validate_path_parameter(self, param, request):
        """
        Validate a single path parameter.

        The value is looked up in ``request.path_params`` under the parameter
        name with every ``-`` replaced by ``_``.

        :type param: dict
        :return: ``None`` or an error message string.
        """
        lookup_key = param["name"].replace("-", "_")
        value = request.path_params.get(lookup_key)
        return self.validate_parameter("path", value, param)

    def validate_header_parameter(self, param, request):
        """
        Validate a single header parameter, looked up by name in
        ``request.headers``.

        :type param: dict
        :return: ``None`` or an error message string.
        """
        value = request.headers.get(param["name"])
        return self.validate_parameter("header", value, param)

    def validate_cookie_parameter(self, param, request):
        """
        Validate a single cookie parameter, looked up by name in
        ``request.cookies``.

        :type param: dict
        :return: ``None`` or an error message string.
        """
        value = request.cookies.get(param["name"])
        return self.validate_parameter("cookie", value, param)

    def validate(self, scope):
        """Build a ``ConnexionRequest`` from ``scope`` and validate it.

        :param scope: Mapping passed to ``ConnexionRequest``; its ``"path"``
          entry (if any) is included in the debug log line.
        :raises: whatever :meth:`validate_request` raises.
        """
        logger.debug("%s validating parameters...", scope.get("path"))

        request = ConnexionRequest(scope, uri_parser=self.uri_parser)
        self.validate_request(request)

    def validate_request(self, request):
        """Validate every known parameter of ``request``.

        With strict validation enabled, query parameters that are neither in
        the spec nor security parameters are rejected first. Afterwards the
        query, path, header and cookie parameters are validated in that order;
        the first error aborts validation.

        :raises ExtraParameterProblem: on unexpected query parameters while
          strict validation is enabled.
        :raises BadRequestProblem: when a parameter fails validation.
        """
        if self.strict_validation:
            unexpected = self.validate_query_parameter_list(
                request, security_params=self.security_query_params
            )
            if unexpected:
                raise ExtraParameterProblem(
                    param_type="query", extra_params=unexpected
                )

        # Order matters: it decides which error is reported first.
        checks = (
            ("query", self.validate_query_parameter),
            ("path", self.validate_path_parameter),
            ("header", self.validate_header_parameter),
            ("cookie", self.validate_cookie_parameter),
        )
        for location, check in checks:
            for param in self.parameters.get(location, []):
                error = check(param, request)
                if error:
                    raise BadRequestProblem(detail=error)