1import collections 
    2import copy 
    3import logging 
    4 
    5from jsonschema import Draft4Validator, ValidationError 
    6 
    7from connexion.exceptions import BadRequestProblem, ExtraParameterProblem 
    8from connexion.lifecycle import ConnexionRequest 
    9from connexion.utils import boolean, is_null, is_nullable 
    10 
# Module-level logger under the fixed "connexion.validators.parameter" name.
logger = logging.getLogger("connexion.validators.parameter")

# Maps schema type names to the Python type/callable associated with them.
# NOTE(review): not referenced in the visible part of this module; presumably
# kept for external importers - confirm before removing.
TYPE_MAP = {"integer": int, "number": float, "boolean": boolean, "object": dict}

# Resolve the Draft 4 format checker: prefer the attribute exposed on the
# validator class, and fall back to the module-level name when that attribute
# does not exist (older jsonschema releases, per the note below).
try:
    draft4_format_checker = Draft4Validator.FORMAT_CHECKER  # type: ignore
except AttributeError:  # jsonschema < 4.5.0
    from jsonschema import draft4_format_checker
    19 
    20 
    21class ParameterValidator: 
    22    def __init__( 
    23        self, 
    24        parameters, 
    25        uri_parser, 
    26        strict_validation=False, 
    27        security_query_params=None, 
    28    ): 
    29        """ 
    30        :param parameters: List of request parameter dictionaries 
    31        :param uri_parser: class to use for uri parsing 
    32        :param strict_validation: Flag indicating if parameters not in spec are allowed 
    33        :param security_query_params: List of query parameter names used for security. 
    34            These parameters will be ignored when checking for extra parameters in case of 
    35            strict validation. 
    36        """ 
    37        self.parameters = collections.defaultdict(list) 
    38        for p in parameters: 
    39            self.parameters[p["in"]].append(p) 
    40 
    41        self.uri_parser = uri_parser 
    42        self.strict_validation = strict_validation 
    43        self.security_query_params = set(security_query_params or []) 
    44 
    45    @staticmethod 
    46    def validate_parameter(parameter_type, value, param, param_name=None): 
    47        if is_nullable(param) and is_null(value): 
    48            return 
    49 
    50        elif value is not None: 
    51            param = copy.deepcopy(param) 
    52            param = param.get("schema", param) 
    53            try: 
    54                Draft4Validator(param, format_checker=draft4_format_checker).validate( 
    55                    value 
    56                ) 
    57            except ValidationError as exception: 
    58                return str(exception) 
    59 
    60        elif param.get("required"): 
    61            return "Missing {parameter_type} parameter '{param[name]}'".format( 
    62                **locals() 
    63            ) 
    64 
    65    @staticmethod 
    66    def validate_parameter_list(request_params, spec_params): 
    67        request_params = set(request_params) 
    68        spec_params = set(spec_params) 
    69 
    70        return request_params.difference(spec_params) 
    71 
    72    def validate_query_parameter_list(self, request, security_params=None): 
    73        request_params = request.query_params.keys() 
    74        spec_params = [x["name"] for x in self.parameters.get("query", [])] 
    75        spec_params.extend(security_params or []) 
    76        return self.validate_parameter_list(request_params, spec_params) 
    77 
    78    def validate_query_parameter(self, param, request): 
    79        """ 
    80        Validate a single query parameter (request.args in Flask) 
    81 
    82        :type param: dict 
    83        :rtype: str 
    84        """ 
    85        val = request.query_params.get(param["name"]) 
    86        return self.validate_parameter("query", val, param) 
    87 
    88    def validate_path_parameter(self, param, request): 
    89        val = request.path_params.get(param["name"].replace("-", "_")) 
    90        return self.validate_parameter("path", val, param) 
    91 
    92    def validate_header_parameter(self, param, request): 
    93        val = request.headers.get(param["name"]) 
    94        return self.validate_parameter("header", val, param) 
    95 
    96    def validate_cookie_parameter(self, param, request): 
    97        val = request.cookies.get(param["name"]) 
    98        return self.validate_parameter("cookie", val, param) 
    99 
    100    def validate(self, scope): 
    101        logger.debug("%s validating parameters...", scope.get("path")) 
    102 
    103        request = ConnexionRequest(scope, uri_parser=self.uri_parser) 
    104        self.validate_request(request) 
    105 
    106    def validate_request(self, request): 
    107        if self.strict_validation: 
    108            query_errors = self.validate_query_parameter_list( 
    109                request, security_params=self.security_query_params 
    110            ) 
    111 
    112            if query_errors: 
    113                raise ExtraParameterProblem( 
    114                    param_type="query", extra_params=query_errors 
    115                ) 
    116 
    117        for param in self.parameters.get("query", []): 
    118            error = self.validate_query_parameter(param, request) 
    119            if error: 
    120                raise BadRequestProblem(detail=error) 
    121 
    122        for param in self.parameters.get("path", []): 
    123            error = self.validate_path_parameter(param, request) 
    124            if error: 
    125                raise BadRequestProblem(detail=error) 
    126 
    127        for param in self.parameters.get("header", []): 
    128            error = self.validate_header_parameter(param, request) 
    129            if error: 
    130                raise BadRequestProblem(detail=error) 
    131 
    132        for param in self.parameters.get("cookie", []): 
    133            error = self.validate_cookie_parameter(param, request) 
    134            if error: 
    135                raise BadRequestProblem(detail=error)