Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/middleware/request_validation.py: 33%

64 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1""" 

2Validation Middleware. 

3""" 

4import logging 

5import typing as t 

6 

7from starlette.types import ASGIApp, Receive, Scope, Send 

8 

9from connexion import utils 

10from connexion.datastructures import MediaTypeDict 

11from connexion.exceptions import UnsupportedMediaTypeProblem 

12from connexion.middleware.abstract import RoutedAPI, RoutedMiddleware 

13from connexion.operations import AbstractOperation 

14from connexion.validators import VALIDATOR_MAP 

15 

16logger = logging.getLogger("connexion.middleware.validation") 

17 

18 

19class RequestValidationOperation: 

20 def __init__( 

21 self, 

22 next_app: ASGIApp, 

23 *, 

24 operation: AbstractOperation, 

25 strict_validation: bool = False, 

26 validator_map: t.Optional[dict] = None, 

27 ) -> None: 

28 self.next_app = next_app 

29 self._operation = operation 

30 self.strict_validation = strict_validation 

31 self._validator_map = VALIDATOR_MAP.copy() 

32 self._validator_map.update(validator_map or {}) 

33 

34 def extract_content_type( 

35 self, headers: t.List[t.Tuple[bytes, bytes]] 

36 ) -> t.Tuple[str, str]: 

37 """Extract the mime type and encoding from the content type headers. 

38 

39 :param headers: Headers from ASGI scope 

40 

41 :return: A tuple of mime type, encoding 

42 """ 

43 mime_type, encoding = utils.extract_content_type(headers) 

44 if mime_type is None: 

45 # Content-type header is not required. Take a best guess. 

46 try: 

47 mime_type = self._operation.consumes[0] 

48 except IndexError: 

49 mime_type = "application/octet-stream" 

50 if encoding is None: 

51 encoding = "utf-8" 

52 

53 return mime_type, encoding 

54 

55 def validate_mime_type(self, mime_type: str) -> None: 

56 """Validate the mime type against the spec if it defines which mime types are accepted. 

57 

58 :param mime_type: mime type from content type header 

59 """ 

60 if not self._operation.consumes: 

61 return 

62 

63 # Convert to MediaTypeDict to handle media-ranges 

64 media_type_dict = MediaTypeDict( 

65 [(c.lower(), None) for c in self._operation.consumes] 

66 ) 

67 if mime_type.lower() not in media_type_dict: 

68 raise UnsupportedMediaTypeProblem( 

69 detail=f"Invalid Content-type ({mime_type}), " 

70 f"expected {self._operation.consumes}" 

71 ) 

72 

73 async def __call__(self, scope: Scope, receive: Receive, send: Send): 

74 # Validate parameters & headers 

75 uri_parser_class = self._operation._uri_parser_class 

76 uri_parser = uri_parser_class( 

77 self._operation.parameters, self._operation.body_definition() 

78 ) 

79 parameter_validator_cls = self._validator_map["parameter"] 

80 parameter_validator = parameter_validator_cls( # type: ignore 

81 self._operation.parameters, 

82 uri_parser=uri_parser, 

83 strict_validation=self.strict_validation, 

84 ) 

85 parameter_validator.validate(scope) 

86 

87 # Extract content type 

88 headers = scope["headers"] 

89 mime_type, encoding = self.extract_content_type(headers) 

90 self.validate_mime_type(mime_type) 

91 

92 # Validate body 

93 schema = self._operation.body_schema(mime_type) 

94 if schema: 

95 try: 

96 body_validator = self._validator_map["body"][mime_type] # type: ignore 

97 except KeyError: 

98 logging.info( 

99 f"Skipping validation. No validator registered for content type: " 

100 f"{mime_type}." 

101 ) 

102 else: 

103 validator = body_validator( 

104 schema=schema, 

105 required=self._operation.request_body.get("required", False), 

106 nullable=utils.is_nullable( 

107 self._operation.body_definition(mime_type) 

108 ), 

109 encoding=encoding, 

110 strict_validation=self.strict_validation, 

111 uri_parser=self._operation.uri_parser_class( 

112 self._operation.parameters, self._operation.body_definition() 

113 ), 

114 ) 

115 receive = await validator.wrap_receive(receive, scope=scope) 

116 

117 await self.next_app(scope, receive, send) 

118 

119 

120class RequestValidationAPI(RoutedAPI[RequestValidationOperation]): 

121 """Validation API.""" 

122 

123 def __init__( 

124 self, 

125 *args, 

126 strict_validation=False, 

127 validator_map=None, 

128 uri_parser_class=None, 

129 **kwargs, 

130 ): 

131 super().__init__(*args, **kwargs) 

132 self.validator_map = validator_map 

133 

134 logger.debug("Strict Request Validation: %s", str(strict_validation)) 

135 self.strict_validation = strict_validation 

136 

137 self.uri_parser_class = uri_parser_class 

138 

139 self.add_paths() 

140 

141 def make_operation( 

142 self, operation: AbstractOperation 

143 ) -> RequestValidationOperation: 

144 return RequestValidationOperation( 

145 self.next_app, 

146 operation=operation, 

147 strict_validation=self.strict_validation, 

148 validator_map=self.validator_map, 

149 ) 

150 

151 

152class RequestValidationMiddleware(RoutedMiddleware[RequestValidationAPI]): 

153 """Middleware for validating requests according to the API contract.""" 

154 

155 api_cls = RequestValidationAPI 

156 

157 

158class MissingValidationOperation(Exception): 

159 """Missing validation operation"""