Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/validators/parameter.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

81 statements  

1import collections 

2import copy 

3import logging 

4 

5from jsonschema import Draft4Validator, ValidationError 

6 

7from connexion.exceptions import BadRequestProblem, ExtraParameterProblem 

8from connexion.lifecycle import ConnexionRequest 

9from connexion.utils import boolean, is_null, is_nullable 

10 

# Module-level logger; the name is spelled out explicitly rather than taken
# from __name__.
logger = logging.getLogger("connexion.validators.parameter")

# Schema type name -> callable associated with that type.
# NOTE(review): not referenced by the code visible in this file; presumably
# kept for external importers -- confirm before removing.
TYPE_MAP = {
    "integer": int,
    "number": float,
    "boolean": boolean,
    "object": dict,
}

# Prefer the format checker exposed on the validator class and fall back to
# the module-level name when that attribute does not exist.
try:
    draft4_format_checker = Draft4Validator.FORMAT_CHECKER  # type: ignore
except AttributeError:  # jsonschema < 4.5.0
    from jsonschema import draft4_format_checker

19 

20 

class ParameterValidator:
    """Validate the query, path, header and cookie parameters of a request."""

    def __init__(
        self,
        parameters,
        uri_parser,
        strict_validation=False,
        security_query_params=None,
    ):
        """
        :param parameters: List of request parameter dictionaries
        :param uri_parser: class to use for uri parsing
        :param strict_validation: Flag indicating if parameters not in spec are allowed
        :param security_query_params: List of query parameter names used for security.
            These parameters will be ignored when checking for extra parameters in case of
            strict validation.
        """
        # Group the parameter definitions by their location ("in" key) so each
        # location can be validated separately.
        self.parameters = collections.defaultdict(list)
        for p in parameters:
            self.parameters[p["in"]].append(p)

        self.uri_parser = uri_parser
        self.strict_validation = strict_validation
        self.security_query_params = set(security_query_params or [])

    @staticmethod
    def validate_parameter(parameter_type, value, param, param_name=None):
        """
        Validate a single value against its parameter definition.

        :param parameter_type: Location label ("query", "path", ...) used in
            the "missing parameter" message
        :param value: The value taken from the request, or None when absent
        :param param: Parameter definition; its "schema" entry is used for
            validation when present, otherwise the definition itself
        :param param_name: Unused; kept so existing callers keep working
        :return: An error message, or None when there is nothing to report
        :rtype: str | None
        """
        if is_nullable(param) and is_null(value):
            return None

        if value is not None:
            # Only the schema is handed to the validator, so only the schema
            # is copied.
            schema = copy.deepcopy(param.get("schema", param))
            try:
                Draft4Validator(
                    schema, format_checker=draft4_format_checker
                ).validate(value)
            except ValidationError as exception:
                return str(exception)
            return None

        if param.get("required"):
            return f"Missing {parameter_type} parameter '{param['name']}'"

        return None

    @staticmethod
    def validate_parameter_list(request_params, spec_params):
        """
        Return the set of names in ``request_params`` missing from ``spec_params``.

        :rtype: set
        """
        return set(request_params).difference(spec_params)

    def validate_query_parameter_list(self, request, security_params=None):
        """
        Return the query parameter names of ``request`` that are neither
        declared as query parameters nor listed in ``security_params``.

        :rtype: set
        """
        request_params = request.query_params.keys()
        spec_params = [x["name"] for x in self.parameters.get("query", [])]
        spec_params.extend(security_params or [])
        return self.validate_parameter_list(request_params, spec_params)

    def validate_query_parameter(self, param, request):
        """
        Validate a single query parameter (request.args in Flask)

        :type param: dict
        :rtype: str
        """
        val = request.query_params.get(param["name"])
        return self.validate_parameter("query", val, param)

    def validate_path_parameter(self, param, request):
        """
        Validate a single path parameter.

        The lookup key is the parameter name with "-" replaced by "_".
        NOTE(review): presumably path parameters are stored under sanitized
        names -- confirm against the routing code.

        :type param: dict
        :rtype: str
        """
        val = request.path_params.get(param["name"].replace("-", "_"))
        return self.validate_parameter("path", val, param)

    def validate_header_parameter(self, param, request):
        """
        Validate a single header parameter.

        :type param: dict
        :rtype: str
        """
        val = request.headers.get(param["name"])
        return self.validate_parameter("header", val, param)

    def validate_cookie_parameter(self, param, request):
        """
        Validate a single cookie parameter.

        :type param: dict
        :rtype: str
        """
        val = request.cookies.get(param["name"])
        return self.validate_parameter("cookie", val, param)

    def validate(self, scope):
        """
        Build a ConnexionRequest from ``scope`` and validate its parameters.

        :raises ExtraParameterProblem: see :meth:`validate_request`
        :raises BadRequestProblem: see :meth:`validate_request`
        """
        logger.debug("%s validating parameters...", scope.get("path"))

        request = ConnexionRequest(scope, uri_parser=self.uri_parser)
        self.validate_request(request)

    def validate_request(self, request):
        """
        Validate every declared parameter of ``request``.

        :raises ExtraParameterProblem: with strict validation enabled, when the
            request carries query parameters that are not declared
        :raises BadRequestProblem: for the first parameter that fails validation
        """
        if self.strict_validation:
            query_errors = self.validate_query_parameter_list(
                request, security_params=self.security_query_params
            )

            if query_errors:
                raise ExtraParameterProblem(
                    param_type="query", extra_params=query_errors
                )

        # Locations are checked in this fixed order; the first failure wins.
        validators = (
            ("query", self.validate_query_parameter),
            ("path", self.validate_path_parameter),
            ("header", self.validate_header_parameter),
            ("cookie", self.validate_cookie_parameter),
        )
        for location, validate_one in validators:
            for param in self.parameters.get(location, []):
                error = validate_one(param, request)
                if error:
                    raise BadRequestProblem(detail=error)