Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/middleware/security.py: 22%
107 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import logging
2import typing as t
3from collections import defaultdict
5from starlette.types import ASGIApp, Receive, Scope, Send
7from connexion.exceptions import ProblemException
8from connexion.lifecycle import ASGIRequest
9from connexion.middleware.abstract import RoutedAPI, RoutedMiddleware
10from connexion.operations import AbstractOperation
11from connexion.security import SecurityHandlerFactory
13logger = logging.getLogger("connexion.middleware.security")
16class SecurityOperation:
17 def __init__(
18 self,
19 next_app: ASGIApp,
20 *,
21 security_handler_factory: SecurityHandlerFactory,
22 security: list,
23 security_schemes: dict,
24 ):
25 self.next_app = next_app
26 self.security_handler_factory = security_handler_factory
27 self.security = security
28 self.security_schemes = security_schemes
29 self.verification_fn = self._get_verification_fn()
31 @classmethod
32 def from_operation(
33 cls,
34 operation: AbstractOperation,
35 *,
36 next_app: ASGIApp,
37 security_handler_factory: SecurityHandlerFactory,
38 ) -> "SecurityOperation":
39 return cls(
40 next_app=next_app,
41 security_handler_factory=security_handler_factory,
42 security=operation.security,
43 security_schemes=operation.security_schemes,
44 )
46 def _get_verification_fn(self):
47 logger.debug("... Security: %s", self.security, extra=vars(self))
48 if not self.security:
49 return self.security_handler_factory.security_passthrough
51 auth_funcs = []
52 for security_req in self.security:
53 if not security_req:
54 auth_funcs.append(self.security_handler_factory.verify_none())
55 continue
57 sec_req_funcs = {}
58 oauth = False
59 for scheme_name, required_scopes in security_req.items():
60 security_scheme = self.security_schemes[scheme_name]
62 if security_scheme["type"] == "oauth2":
63 if oauth:
64 logger.warning(
65 "... multiple OAuth2 security schemes in AND fashion not supported",
66 extra=vars(self),
67 )
68 break
69 oauth = True
70 token_info_func = self.security_handler_factory.get_tokeninfo_func(
71 security_scheme
72 )
73 scope_validate_func = (
74 self.security_handler_factory.get_scope_validate_func(
75 security_scheme
76 )
77 )
78 if not token_info_func:
79 logger.warning("... x-tokenInfoFunc missing", extra=vars(self))
80 break
82 sec_req_funcs[
83 scheme_name
84 ] = self.security_handler_factory.verify_oauth(
85 token_info_func, scope_validate_func, required_scopes
86 )
88 # Swagger 2.0
89 elif security_scheme["type"] == "basic":
90 basic_info_func = self.security_handler_factory.get_basicinfo_func(
91 security_scheme
92 )
93 if not basic_info_func:
94 logger.warning("... x-basicInfoFunc missing", extra=vars(self))
95 break
97 sec_req_funcs[
98 scheme_name
99 ] = self.security_handler_factory.verify_basic(basic_info_func)
101 # OpenAPI 3.0.0
102 elif security_scheme["type"] == "http":
103 scheme = security_scheme["scheme"].lower()
104 if scheme == "basic":
105 basic_info_func = (
106 self.security_handler_factory.get_basicinfo_func(
107 security_scheme
108 )
109 )
110 if not basic_info_func:
111 logger.warning(
112 "... x-basicInfoFunc missing", extra=vars(self)
113 )
114 break
116 sec_req_funcs[
117 scheme_name
118 ] = self.security_handler_factory.verify_basic(basic_info_func)
119 elif scheme == "bearer":
120 bearer_info_func = (
121 self.security_handler_factory.get_bearerinfo_func(
122 security_scheme
123 )
124 )
125 if not bearer_info_func:
126 logger.warning(
127 "... x-bearerInfoFunc missing", extra=vars(self)
128 )
129 break
130 sec_req_funcs[
131 scheme_name
132 ] = self.security_handler_factory.verify_bearer(
133 bearer_info_func
134 )
135 else:
136 logger.warning(
137 "... Unsupported http authorization scheme %s" % scheme,
138 extra=vars(self),
139 )
140 break
142 elif security_scheme["type"] == "apiKey":
143 scheme = security_scheme.get("x-authentication-scheme", "").lower()
144 if scheme == "bearer":
145 bearer_info_func = (
146 self.security_handler_factory.get_bearerinfo_func(
147 security_scheme
148 )
149 )
150 if not bearer_info_func:
151 logger.warning(
152 "... x-bearerInfoFunc missing", extra=vars(self)
153 )
154 break
155 sec_req_funcs[
156 scheme_name
157 ] = self.security_handler_factory.verify_bearer(
158 bearer_info_func
159 )
160 else:
161 apikey_info_func = (
162 self.security_handler_factory.get_apikeyinfo_func(
163 security_scheme
164 )
165 )
166 if not apikey_info_func:
167 logger.warning(
168 "... x-apikeyInfoFunc missing", extra=vars(self)
169 )
170 break
172 sec_req_funcs[
173 scheme_name
174 ] = self.security_handler_factory.verify_api_key(
175 apikey_info_func,
176 security_scheme["in"],
177 security_scheme["name"],
178 )
180 else:
181 logger.warning(
182 "... Unsupported security scheme type %s"
183 % security_scheme["type"],
184 extra=vars(self),
185 )
186 break
187 else:
188 # No break encountered: no missing funcs
189 if len(sec_req_funcs) == 1:
190 (func,) = sec_req_funcs.values()
191 auth_funcs.append(func)
192 else:
193 auth_funcs.append(
194 self.security_handler_factory.verify_multiple_schemes(
195 sec_req_funcs
196 )
197 )
199 return self.security_handler_factory.verify_security(auth_funcs)
201 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
202 request = ASGIRequest(scope)
203 await self.verification_fn(request)
204 await self.next_app(scope, receive, send)
207class SecurityAPI(RoutedAPI[SecurityOperation]):
208 def __init__(self, *args, auth_all_paths: bool = False, **kwargs):
209 super().__init__(*args, **kwargs)
211 self.security_handler_factory = SecurityHandlerFactory()
213 if auth_all_paths:
214 self.add_auth_on_not_found()
215 else:
216 self.operations: t.MutableMapping[str, SecurityOperation] = {}
218 self.add_paths()
220 def add_auth_on_not_found(self) -> None:
221 """Register a default SecurityOperation for routes that are not found."""
222 default_operation = self.make_operation(self.specification)
223 self.operations = defaultdict(lambda: default_operation)
225 def make_operation(self, operation: AbstractOperation) -> SecurityOperation:
226 return SecurityOperation.from_operation(
227 operation,
228 next_app=self.next_app,
229 security_handler_factory=self.security_handler_factory,
230 )
233class SecurityMiddleware(RoutedMiddleware[SecurityAPI]):
234 """Middleware to check if operation is accessible on scope."""
236 api_cls = SecurityAPI
239class MissingSecurityOperation(ProblemException):
240 pass