Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/middleware/security.py: 22%

107 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import logging 

2import typing as t 

3from collections import defaultdict 

4 

5from starlette.types import ASGIApp, Receive, Scope, Send 

6 

7from connexion.exceptions import ProblemException 

8from connexion.lifecycle import ASGIRequest 

9from connexion.middleware.abstract import RoutedAPI, RoutedMiddleware 

10from connexion.operations import AbstractOperation 

11from connexion.security import SecurityHandlerFactory 

12 

13logger = logging.getLogger("connexion.middleware.security") 

14 

15 

class SecurityOperation:
    """ASGI callable that verifies security before delegating to the next app.

    The verification function is built once, at construction time, from the
    ``security`` requirements and the ``security_schemes`` they refer to.
    ``__call__`` awaits that function for every request and only then hands
    the request over to ``next_app``.
    """

    # Sentinel returned by the scheme helpers when a scheme cannot be enforced
    # (unsupported type/scheme or missing info function). A dedicated object
    # is used instead of ``None`` so that whatever the factory's ``verify_*``
    # methods return is always stored untouched.
    _UNSUPPORTED = object()

    def __init__(
        self,
        next_app: ASGIApp,
        *,
        security_handler_factory: SecurityHandlerFactory,
        security: list,
        security_schemes: dict,
    ):
        """Store the collaborators and build the verification function.

        :param next_app: ASGI application awaited after verification.
        :param security_handler_factory: Factory providing the ``get_*`` and
            ``verify_*`` functions used to build the verification function.
        :param security: List of security requirements; each item maps a
            scheme name to its required scopes. A falsy list means no
            verification is needed.
        :param security_schemes: Mapping of scheme name to scheme definition.
        """
        self.next_app = next_app
        self.security_handler_factory = security_handler_factory
        self.security = security
        self.security_schemes = security_schemes
        # Must come last: building the function reads the attributes above.
        self.verification_fn = self._get_verification_fn()

    @classmethod
    def from_operation(
        cls,
        operation: AbstractOperation,
        *,
        next_app: ASGIApp,
        security_handler_factory: SecurityHandlerFactory,
    ) -> "SecurityOperation":
        """Build an instance from ``operation.security`` and
        ``operation.security_schemes``."""
        return cls(
            next_app=next_app,
            security_handler_factory=security_handler_factory,
            security=operation.security,
            security_schemes=operation.security_schemes,
        )

    def _get_verification_fn(self):
        """Build the function that verifies a request against ``self.security``.

        Returns the factory's ``security_passthrough`` when no security is
        configured. Otherwise one auth function is collected per security
        requirement and the list is handed to the factory's
        ``verify_security``. Requirements containing a scheme that cannot be
        enforced are skipped after a warning has been logged.
        """
        logger.debug("... Security: %s", self.security, extra=vars(self))
        factory = self.security_handler_factory
        if not self.security:
            return factory.security_passthrough

        auth_funcs = []
        for security_req in self.security:
            if not security_req:
                # An empty requirement is handled by the factory's verify_none.
                auth_funcs.append(factory.verify_none())
                continue

            sec_req_funcs = self._collect_scheme_funcs(security_req)
            if sec_req_funcs is None:
                # At least one scheme was unusable: skip the whole requirement.
                continue

            if len(sec_req_funcs) == 1:
                (func,) = sec_req_funcs.values()
                auth_funcs.append(func)
            else:
                auth_funcs.append(factory.verify_multiple_schemes(sec_req_funcs))

        return factory.verify_security(auth_funcs)

    def _collect_scheme_funcs(self, security_req: dict) -> t.Optional[dict]:
        """Map every scheme name in one requirement to its verify function.

        Returns ``None`` as soon as one scheme cannot be enforced, so the
        caller can drop the requirement as a whole.
        """
        sec_req_funcs = {}
        oauth_seen = False
        for scheme_name, required_scopes in security_req.items():
            security_scheme = self.security_schemes[scheme_name]

            if security_scheme["type"] == "oauth2":
                if oauth_seen:
                    logger.warning(
                        "... multiple OAuth2 security schemes in AND fashion not supported",
                        extra=vars(self),
                    )
                    return None
                oauth_seen = True

            func = self._get_scheme_func(security_scheme, required_scopes)
            if func is self._UNSUPPORTED:
                return None
            sec_req_funcs[scheme_name] = func

        return sec_req_funcs

    def _get_scheme_func(self, security_scheme: dict, required_scopes):
        """Dispatch on the scheme type; return ``_UNSUPPORTED`` if unusable."""
        scheme_type = security_scheme["type"]

        if scheme_type == "oauth2":
            return self._get_oauth_func(security_scheme, required_scopes)

        # Swagger 2.0
        if scheme_type == "basic":
            return self._get_basic_func(security_scheme)

        # OpenAPI 3.0.0
        if scheme_type == "http":
            scheme = security_scheme["scheme"].lower()
            if scheme == "basic":
                return self._get_basic_func(security_scheme)
            if scheme == "bearer":
                return self._get_bearer_func(security_scheme)
            logger.warning(
                "... Unsupported http authorization scheme %s",
                scheme,
                extra=vars(self),
            )
            return self._UNSUPPORTED

        if scheme_type == "apiKey":
            scheme = security_scheme.get("x-authentication-scheme", "").lower()
            if scheme == "bearer":
                return self._get_bearer_func(security_scheme)
            return self._get_api_key_func(security_scheme)

        logger.warning(
            "... Unsupported security scheme type %s",
            scheme_type,
            extra=vars(self),
        )
        return self._UNSUPPORTED

    def _get_oauth_func(self, security_scheme: dict, required_scopes):
        """Return the OAuth2 verify function, or ``_UNSUPPORTED`` if no token info function."""
        factory = self.security_handler_factory
        token_info_func = factory.get_tokeninfo_func(security_scheme)
        # Resolved before the token-info check on purpose, so both factory
        # lookups always run in the same order.
        scope_validate_func = factory.get_scope_validate_func(security_scheme)
        if not token_info_func:
            logger.warning("... x-tokenInfoFunc missing", extra=vars(self))
            return self._UNSUPPORTED
        return factory.verify_oauth(
            token_info_func, scope_validate_func, required_scopes
        )

    def _get_basic_func(self, security_scheme: dict):
        """Return the basic-auth verify function, or ``_UNSUPPORTED`` if no info function."""
        factory = self.security_handler_factory
        basic_info_func = factory.get_basicinfo_func(security_scheme)
        if not basic_info_func:
            logger.warning("... x-basicInfoFunc missing", extra=vars(self))
            return self._UNSUPPORTED
        return factory.verify_basic(basic_info_func)

    def _get_bearer_func(self, security_scheme: dict):
        """Return the bearer verify function, or ``_UNSUPPORTED`` if no info function."""
        factory = self.security_handler_factory
        bearer_info_func = factory.get_bearerinfo_func(security_scheme)
        if not bearer_info_func:
            logger.warning("... x-bearerInfoFunc missing", extra=vars(self))
            return self._UNSUPPORTED
        return factory.verify_bearer(bearer_info_func)

    def _get_api_key_func(self, security_scheme: dict):
        """Return the API-key verify function, or ``_UNSUPPORTED`` if no info function."""
        factory = self.security_handler_factory
        apikey_info_func = factory.get_apikeyinfo_func(security_scheme)
        if not apikey_info_func:
            logger.warning("... x-apikeyInfoFunc missing", extra=vars(self))
            return self._UNSUPPORTED
        return factory.verify_api_key(
            apikey_info_func,
            security_scheme["in"],
            security_scheme["name"],
        )

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Verify the request built from ``scope``, then await ``next_app``.

        ``next_app`` is only reached if the verification function returns
        without raising.
        """
        request = ASGIRequest(scope)
        await self.verification_fn(request)
        await self.next_app(scope, receive, send)

205 

206 

class SecurityAPI(RoutedAPI[SecurityOperation]):
    """Routed API whose operations are ``SecurityOperation`` instances."""

    def __init__(self, *args, auth_all_paths: bool = False, **kwargs):
        """Initialise the base API, then register the security operations.

        :param auth_all_paths: When true, a default operation is installed so
            that lookups of unknown operations also get security applied;
            otherwise ``operations`` starts as an empty dict.

        All other arguments are forwarded unchanged to the base initializer.
        """
        super().__init__(*args, **kwargs)

        self.security_handler_factory = SecurityHandlerFactory()

        if not auth_all_paths:
            self.operations: t.MutableMapping[str, SecurityOperation] = {}
        else:
            self.add_auth_on_not_found()

        # NOTE(review): add_paths is not defined in this class; presumably
        # inherited from RoutedAPI - confirm.
        self.add_paths()

    def add_auth_on_not_found(self) -> None:
        """Register a default SecurityOperation for routes that are not found."""
        # Built from the specification itself rather than a single operation.
        fallback = self.make_operation(self.specification)

        def _fallback_factory() -> SecurityOperation:
            return fallback

        # Every missing key resolves to the same shared fallback operation.
        self.operations = defaultdict(_fallback_factory)

    def make_operation(self, operation: AbstractOperation) -> SecurityOperation:
        """Wrap ``operation`` in a SecurityOperation bound to this API's
        ``next_app`` and security handler factory."""
        downstream = self.next_app
        factory = self.security_handler_factory
        return SecurityOperation.from_operation(
            operation,
            next_app=downstream,
            security_handler_factory=factory,
        )

231 

232 

class SecurityMiddleware(RoutedMiddleware[SecurityAPI]):
    """Routed middleware that checks whether an operation is accessible on a scope.

    The per-API work is delegated to ``SecurityAPI`` through ``api_cls``.
    """

    # NOTE(review): presumably read by RoutedMiddleware to instantiate APIs -
    # confirm against the base class.
    api_cls = SecurityAPI

237 

238 

class MissingSecurityOperation(ProblemException):
    """ProblemException subtype for a missing security operation.

    Adds no behaviour of its own; it is not raised anywhere in the code
    visible in this module.
    """