Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/validators/parameter.py: 29%

82 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import collections 

2import copy 

3import logging 

4 

5from jsonschema import Draft4Validator, ValidationError 

6from starlette.requests import Request 

7 

8from connexion.exceptions import BadRequestProblem, ExtraParameterProblem 

9from connexion.utils import boolean, is_null, is_nullable 

10 

11logger = logging.getLogger("connexion.validators.parameter") 

12 

13TYPE_MAP = {"integer": int, "number": float, "boolean": boolean, "object": dict} 

14 

15try: 

16 draft4_format_checker = Draft4Validator.FORMAT_CHECKER # type: ignore 

17except AttributeError: # jsonschema < 4.5.0 

18 from jsonschema import draft4_format_checker 

19 

20 

21class ParameterValidator: 

22 def __init__(self, parameters, uri_parser, strict_validation=False): 

23 """ 

24 :param parameters: List of request parameter dictionaries 

25 :param uri_parser: class to use for uri parsing 

26 :param strict_validation: Flag indicating if parameters not in spec are allowed 

27 """ 

28 self.parameters = collections.defaultdict(list) 

29 for p in parameters: 

30 self.parameters[p["in"]].append(p) 

31 

32 self.uri_parser = uri_parser 

33 self.strict_validation = strict_validation 

34 

35 @staticmethod 

36 def validate_parameter(parameter_type, value, param, param_name=None): 

37 if is_nullable(param) and is_null(value): 

38 return 

39 

40 elif value is not None: 

41 param = copy.deepcopy(param) 

42 param = param.get("schema", param) 

43 try: 

44 Draft4Validator(param, format_checker=draft4_format_checker).validate( 

45 value 

46 ) 

47 except ValidationError as exception: 

48 return str(exception) 

49 

50 elif param.get("required"): 

51 return "Missing {parameter_type} parameter '{param[name]}'".format( 

52 **locals() 

53 ) 

54 

55 @staticmethod 

56 def validate_parameter_list(request_params, spec_params): 

57 request_params = set(request_params) 

58 spec_params = set(spec_params) 

59 

60 return request_params.difference(spec_params) 

61 

62 def validate_query_parameter_list(self, request): 

63 request_params = request.query_params.keys() 

64 spec_params = [x["name"] for x in self.parameters.get("query", [])] 

65 return self.validate_parameter_list(request_params, spec_params) 

66 

67 def validate_query_parameter(self, param, request): 

68 """ 

69 Validate a single query parameter (request.args in Flask) 

70 

71 :type param: dict 

72 :rtype: str 

73 """ 

74 # Convert to dict of lists 

75 query_params = { 

76 k: request.query_params.getlist(k) for k in request.query_params 

77 } 

78 query_params = self.uri_parser.resolve_query(query_params) 

79 val = query_params.get(param["name"]) 

80 return self.validate_parameter("query", val, param) 

81 

82 def validate_path_parameter(self, param, request): 

83 path_params = self.uri_parser.resolve_path(request.path_params) 

84 val = path_params.get(param["name"].replace("-", "_")) 

85 return self.validate_parameter("path", val, param) 

86 

87 def validate_header_parameter(self, param, request): 

88 val = request.headers.get(param["name"]) 

89 return self.validate_parameter("header", val, param) 

90 

91 def validate_cookie_parameter(self, param, request): 

92 val = request.cookies.get(param["name"]) 

93 return self.validate_parameter("cookie", val, param) 

94 

95 def validate(self, scope): 

96 logger.debug("%s validating parameters...", scope.get("path")) 

97 

98 request = Request(scope) 

99 self.validate_request(request) 

100 

101 def validate_request(self, request): 

102 

103 if self.strict_validation: 

104 query_errors = self.validate_query_parameter_list(request) 

105 

106 if query_errors: 

107 raise ExtraParameterProblem( 

108 param_type="query", extra_params=query_errors 

109 ) 

110 

111 for param in self.parameters.get("query", []): 

112 error = self.validate_query_parameter(param, request) 

113 if error: 

114 raise BadRequestProblem(detail=error) 

115 

116 for param in self.parameters.get("path", []): 

117 error = self.validate_path_parameter(param, request) 

118 if error: 

119 raise BadRequestProblem(detail=error) 

120 

121 for param in self.parameters.get("header", []): 

122 error = self.validate_header_parameter(param, request) 

123 if error: 

124 raise BadRequestProblem(detail=error) 

125 

126 for param in self.parameters.get("cookie", []): 

127 error = self.validate_cookie_parameter(param, request) 

128 if error: 

129 raise BadRequestProblem(detail=error)