1"""
2This module defines a SecurityHandlerFactory which supports the creation of
3SecurityHandler instances for different security schemes.
4
5It also exposes a `SECURITY_HANDLERS` dictionary which maps security scheme
6types to SecurityHandler classes. This dictionary can be used to register
7custom SecurityHandler classes for custom security schemes, or to overwrite
8existing SecurityHandler classes.
This can be done by supplying a value for the `security_handlers` argument of the
SecurityHandlerFactory.
11
12Swagger 2.0 lets you define the following authentication types for an API:
13
14- Basic authentication
15- API key (as a header or a query string parameter)
16- OAuth 2 common flows (authorization code, implicit, resource owner password credentials, client credentials)
17
18
19Changes from OpenAPI 2.0 to OpenAPI 3.0
20If you used OpenAPI 2.0 before, here is a summary of changes to help you get started with OpenAPI 3.0:
21- securityDefinitions were renamed to securitySchemes and moved inside components.
22- type: basic was replaced with type: http and scheme: basic.
- The new type: http is an umbrella type for all HTTP security schemes, including Basic, Bearer and others,
and the scheme keyword indicates the scheme type.
25- API keys can now be sent in: cookie.
26- Added support for OpenID Connect Discovery (type: openIdConnect).
27- OAuth 2 security schemes can now define multiple flows.
28- OAuth 2 flows were renamed to match the OAuth 2 Specification: accessCode is now authorizationCode,
29and application is now clientCredentials.
30
31
32OpenAPI uses the term security scheme for authentication and authorization schemes.
33OpenAPI 3.0 lets you describe APIs protected using the following security schemes:
34
35- HTTP authentication schemes (they use the Authorization header):
36 - Basic
37 - Bearer
38 - other HTTP schemes as defined by RFC 7235 and HTTP Authentication Scheme Registry
39- API keys in headers, query string or cookies
40 - Cookie authentication
41- OAuth 2
42- OpenID Connect Discovery
43
44"""
45
46import asyncio
47import base64
48import http.cookies
49import logging
50import os
51import typing as t
52
53import httpx
54
55from connexion.decorators.parameter import inspect_function_arguments
56from connexion.exceptions import OAuthProblem, OAuthResponseProblem, OAuthScopeProblem
57from connexion.lifecycle import ConnexionRequest
58from connexion.utils import get_function_from_name
59
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


# A unique object; the handlers in this module compare results against it
# with `is` to tell "no credentials present" apart from None.
NO_VALUE = object()
"""Sentinel value to indicate that no security credentials were found."""
65
66
class AbstractSecurityHandler:
    """
    Shared plumbing for the concrete security handlers.

    A handler looks up the user security function named in the security
    scheme (or in an environment variable) and wraps it in a function that
    is called with the request.
    """

    required_scopes_kw = "required_scopes"
    request_kw = "request"
    client = None
    security_definition_key: str
    """The key which contains the value for the function name to resolve."""
    environ_key: str
    """The name of the environment variable that can be used alternatively for the function name."""

    def get_fn(self, security_scheme, required_scopes):
        """
        Return the verification function for ``security_scheme``.

        ``None`` is returned, after logging a warning, when no user function
        could be resolved. ``required_scopes`` is accepted but not used here.
        """
        user_func = self._resolve_func(security_scheme)
        if user_func:
            return self._get_verify_func(user_func)

        logger.warning("... %s missing", self.security_definition_key)
        return None

    @classmethod
    def _get_function(
        cls,
        security_definition: dict,
        security_definition_key: str,
        environ_key: str,
        default: t.Optional[t.Callable] = None,
    ):
        """
        Resolve a function from its dotted name.

        The name is read from ``security_definition`` first; the environment
        variable is only consulted when that lookup yields nothing truthy.

        :param security_definition: Security Definition (scheme) from the spec.
        :param security_definition_key: Key in the definition holding the function name.
        :param environ_key: Environment variable that may hold the function name instead.
        :param default: Returned when neither source provides a name.
        """
        func_name = security_definition.get(security_definition_key)
        if not func_name:
            func_name = os.environ.get(environ_key)
        return get_function_from_name(func_name) if func_name else default

    def _generic_check(self, func, exception_msg):
        """
        Wrap ``func`` so that it is called with only the optional keyword
        arguments it accepts, and so that a ``None`` result is turned into an
        ``OAuthResponseProblem`` carrying ``exception_msg``.
        """

        async def wrapper(request, *args, required_scopes=None):
            optional_kwargs = (
                (self.required_scopes_kw, required_scopes),
                (self.request_kw, request),
            )
            call_kwargs = {
                keyword: value
                for keyword, value in optional_kwargs
                if self._accepts_kwarg(func, keyword)
            }

            outcome = func(*args, **call_kwargs)
            # The user function may be sync or async (possibly nested).
            while asyncio.iscoroutine(outcome):
                outcome = await outcome

            if outcome is None:
                raise OAuthResponseProblem(detail=exception_msg)
            # NO_VALUE and real token info are both handed back unchanged.
            return outcome

        return wrapper

    @staticmethod
    def get_auth_header_value(request):
        """
        Split the Authorization header into ``(type, value)``.

        The type is lower-cased. ``(None, None)`` is returned when the header
        is absent or empty; ``OAuthProblem`` is raised when the header does
        not consist of two whitespace-separated parts.
        """
        header = request.headers.get("Authorization")
        if not header:
            return None, None

        try:
            scheme, credentials = header.split(maxsplit=1)
        except ValueError:
            raise OAuthProblem(detail="Invalid authorization header")
        return scheme.lower(), credentials

    @staticmethod
    def _accepts_kwarg(func: t.Callable, keyword: str) -> bool:
        """Tell whether ``func`` can be called with ``keyword`` as a keyword argument."""
        named_args, takes_var_kwargs = inspect_function_arguments(func)
        if takes_var_kwargs:
            return True
        return keyword in named_args

    def _resolve_func(self, security_scheme):
        """
        Look up the user function using this handler's definition key and
        environment variable name.

        :param security_scheme: Security Definition (scheme) from the spec.
        """
        definition_key = self.security_definition_key
        env_name = self.environ_key
        return self._get_function(security_scheme, definition_key, env_name)

    def _get_verify_func(self, function):
        """
        Wrap the user security function with the generic result check.
        Subclasses override this to extract credentials from the request first.
        """
        error_detail = "Provided authorization is not valid"
        return self._generic_check(function, error_detail)
166
167
class BasicSecurityHandler(AbstractSecurityHandler):
    """
    Security Handler for HTTP Basic authentication:
    - `type: basic` (Swagger 2), and
    - `type: http` and `scheme: basic` (OpenAPI 3)
    """

    security_definition_key = "x-basicInfoFunc"
    environ_key = "BASICINFO_FUNC"

    def _get_verify_func(self, basic_info_func):
        """
        Return a function that decodes the Basic credentials from the request
        and passes username and password to the user function.
        """
        verify_credentials = self.check_basic_auth(basic_info_func)

        def wrapper(request):
            auth_type, encoded = self.get_auth_header_value(request)
            if auth_type != "basic":
                # Not a Basic header: nothing for this handler to check.
                return NO_VALUE

            try:
                decoded = base64.b64decode(encoded).decode("latin1")
                # Only the first colon separates user from password.
                username, password = decoded.split(":", 1)
            except Exception:
                raise OAuthProblem(detail="Invalid authorization header")

            return verify_credentials(request, username, password)

        return wrapper

    def check_basic_auth(self, basic_info_func):
        """Wrap the user function with the generic result check."""
        error_detail = "Provided authorization is not valid"
        return self._generic_check(basic_info_func, error_detail)
201
202
class BearerSecurityHandler(AbstractSecurityHandler):
    """
    Security Handler for the HTTP Bearer authentication scheme.
    """

    security_definition_key = "x-bearerInfoFunc"
    environ_key = "BEARERINFO_FUNC"

    def check_bearer_token(self, token_info_func):
        """Wrap the user function with the generic result check."""
        error_detail = "Provided token is not valid"
        return self._generic_check(token_info_func, error_detail)

    def _get_verify_func(self, token_info_func):
        """
        Return a function that takes the bearer token from the request's
        Authorization header and passes it to the user function.

        :param token_info_func: types.FunctionType
        :rtype: types.FunctionType
        """
        verify_token = self.check_bearer_token(token_info_func)

        def wrapper(request):
            auth_type, token = self.get_auth_header_value(request)
            if auth_type == "bearer":
                return verify_token(request, token)
            # Any other (or missing) Authorization type is not ours to check.
            return NO_VALUE

        return wrapper
228
229
class ApiKeySecurityHandler(AbstractSecurityHandler):
    """
    Security Handler for API Keys.
    """

    security_definition_key = "x-apikeyInfoFunc"
    environ_key = "APIKEYINFO_FUNC"

    def get_fn(self, security_scheme, required_scopes):
        """
        Build the verification function for an API key security scheme.

        :param security_scheme: Security Definition (scheme) from the spec;
            its ``in`` and ``name`` entries say where the key is read from.
        :param required_scopes: Scopes forwarded to the user function.
        :return: The verification function, or None (after logging a warning)
            when no user function could be resolved.
        """
        apikey_info_func = self._resolve_func(security_scheme)
        if not apikey_info_func:
            logger.warning("... %s missing", self.security_definition_key)
            return None

        return self._get_verify_func(
            apikey_info_func,
            security_scheme["in"],
            security_scheme["name"],
            required_scopes,
        )

    def _get_verify_func(self, api_key_info_func, loc, name, required_scopes):
        """
        Return a function that extracts the API key from the request and
        passes it to the user function.

        :param api_key_info_func: The user function validating the key.
        :param loc: Where the key lives: ``query``, ``header`` or ``cookie``.
        :param name: Name of the query parameter, header or cookie.
        :param required_scopes: Scopes forwarded to the user function.
        """
        check_api_key_func = self.check_api_key(api_key_info_func)

        def wrapper(request: ConnexionRequest):
            if loc == "query":
                api_key = request.query_params.get(name)
            elif loc == "header":
                api_key = request.headers.get(name)
            elif loc == "cookie":
                cookie_list = request.headers.get("Cookie")
                api_key = self.get_cookie_value(cookie_list, name)
            else:
                # Unknown location: this handler cannot find credentials.
                return NO_VALUE

            if api_key is None:
                return NO_VALUE

            return check_api_key_func(request, api_key, required_scopes=required_scopes)

        return wrapper

    def check_api_key(self, api_key_info_func):
        """Wrap the user function with the generic result check."""
        return self._generic_check(api_key_info_func, "Provided apikey is not valid")

    @staticmethod
    def get_cookie_value(cookies, name):
        """
        Returns cookie value by its name. `None` if no such value.

        `None` is also returned when there is no cookie data at all, or when
        the cookie data cannot be parsed. The Cookie header is client
        controlled, so an illegal cookie key must be treated as "no
        credentials" rather than surfacing as an unexpected error.

        :param cookies: str: cookies raw data
        :param name: str: cookies key
        """
        if cookies is None:
            # No Cookie header; avoid parsing the literal string "None".
            return None

        cookie_parser = http.cookies.SimpleCookie()
        try:
            cookie_parser.load(str(cookies))
        except http.cookies.CookieError:
            # Raised e.g. for keys containing characters such as "/" or "@".
            return None

        morsel = cookie_parser.get(name)
        return None if morsel is None else morsel.value
289
290
class OAuthSecurityHandler(AbstractSecurityHandler):
    """
    Security Handler for the OAuth security scheme.
    """

    def get_fn(self, security_scheme, required_scopes):
        """
        Build the verification function for an OAuth security scheme.

        :param security_scheme: Security Definition (scheme) from the spec.
        :param required_scopes: Scopes required to access the operation.
        :return: The verification function, or None (after logging a warning)
            when neither a token info function nor a token info URL is set.
        """
        token_info_func = self.get_tokeninfo_func(security_scheme)
        scope_validate_func = self.get_scope_validate_func(security_scheme)
        if not token_info_func:
            logger.warning("... x-tokenInfoFunc missing")
            return None

        return self._get_verify_func(
            token_info_func, scope_validate_func, required_scopes
        )

    def get_tokeninfo_func(self, security_definition: dict) -> t.Optional[t.Callable]:
        """
        Gets the function for retrieving the token info.

        It is possible to specify a function or a URL. The function variant
        (`x-tokenInfoFunc` / `TOKENINFO_FUNC`) is preferred. If it is not
        found, the URL variant (`x-tokenInfoUrl` / `TOKENINFO_URL`) is used
        with the `get_token_info_remote` function.

        :param security_definition: Security Definition (scheme) from the spec.
        :return: The token info function, or None when neither is configured.
        """
        token_info_func = self._get_function(
            security_definition, "x-tokenInfoFunc", "TOKENINFO_FUNC"
        )
        if token_info_func:
            return token_info_func

        token_info_url = security_definition.get("x-tokenInfoUrl") or os.environ.get(
            "TOKENINFO_URL"
        )
        if token_info_url:
            return self.get_token_info_remote(token_info_url)

        return None

    @classmethod
    def get_scope_validate_func(cls, security_definition):
        """
        Gets the function for validating the token scopes.

        The function name is read from `x-scopeValidateFunc` or the
        `SCOPEVALIDATE_FUNC` environment variable. If it is not found, the
        default `validate_scope` function is used.

        :param security_definition: Security Definition (scheme) from the spec.
        """
        return cls._get_function(
            security_definition,
            "x-scopeValidateFunc",
            "SCOPEVALIDATE_FUNC",
            cls.validate_scope,
        )

    @staticmethod
    def validate_scope(required_scopes, token_scopes):
        """
        Check that every required scope is among the token's scopes.

        :param required_scopes: Scopes required to access operation
        :param token_scopes: Scopes granted by authorization server, either
            as a whitespace-delimited string or as a list, tuple, set or
            frozenset of scope names. `None` is treated as "no scopes".
        :rtype: bool
        """
        required_scopes = set(required_scopes)
        if token_scopes is None:
            token_scopes = set()
        elif isinstance(token_scopes, (list, tuple, set, frozenset)):
            token_scopes = set(token_scopes)
        else:
            # Anything else is expected to be a whitespace-delimited string.
            token_scopes = set(token_scopes.split())
        logger.debug("... Scopes required: %s", required_scopes)
        logger.debug("... Token scopes: %s", token_scopes)
        if not required_scopes <= token_scopes:
            logger.info(
                "... Token scopes (%s) do not match the scopes necessary to call endpoint (%s)."
                " Aborting with 403.",
                token_scopes,
                required_scopes,
            )
            return False
        return True

    def get_token_info_remote(self, token_info_url: str) -> t.Callable:
        """
        Return a function which will call `token_info_url` to retrieve token info.

        Returned function must accept oauth token in parameter.
        It must return a token_info dict in case of success, None otherwise.

        :param token_info_url: URL to get information about the token
        """

        async def wrapper(token):
            # Create the HTTP client on first use and keep it on the handler.
            if self.client is None:
                self.client = httpx.AsyncClient()
            headers = {"Authorization": f"Bearer {token}"}
            token_request = await self.client.get(
                token_info_url, headers=headers, timeout=5
            )
            if token_request.status_code != 200:
                return None
            return token_request.json()

        return wrapper

    def _get_verify_func(self, token_info_func, scope_validate_func, required_scopes):
        """
        Return a function that takes the bearer token from the request's
        Authorization header, retrieves its token info and validates scopes.
        """
        check_oauth_func = self.check_oauth_func(token_info_func, scope_validate_func)

        def wrapper(request):
            auth_type, token = self.get_auth_header_value(request)
            if auth_type != "bearer":
                return NO_VALUE

            return check_oauth_func(request, token, required_scopes=required_scopes)

        return wrapper

    def check_oauth_func(self, token_info_func, scope_validate_func):
        """
        Combine token info retrieval and scope validation.

        The returned coroutine function yields the token info on success,
        yields NO_VALUE when the token info function reports that no
        credentials were found, and raises OAuthScopeProblem when the scope
        validation fails.
        """
        get_token_info = self._generic_check(
            token_info_func, "Provided token is not valid"
        )

        async def wrapper(request, token, required_scopes):
            token_info = await get_token_info(
                request, token, required_scopes=required_scopes
            )
            if token_info is NO_VALUE:
                # The sentinel is a bare object without `.get`; pass it on so
                # the caller can treat it as "no credentials".
                return NO_VALUE

            # Fallback to 'scopes' for backward compatibility
            token_scopes = token_info.get("scope", token_info.get("scopes", ""))

            validation = scope_validate_func(required_scopes, token_scopes)
            # The validation function may be sync or async.
            while asyncio.iscoroutine(validation):
                validation = await validation
            if not validation:
                raise OAuthScopeProblem(
                    required_scopes=required_scopes,
                    token_scopes=token_scopes,
                )

            return token_info

        return wrapper
431
432
# Default registry of built-in handlers. For `type: http` schemes the lookup
# key is the lower-cased `scheme` value; otherwise it is the scheme `type`.
SECURITY_HANDLERS = {
    # Swagger 2: `type: basic`
    # OpenAPI 3: `type: http` and `scheme: basic`
    "basic": BasicSecurityHandler,
    # Swagger 2 and OpenAPI 3
    "apiKey": ApiKeySecurityHandler,
    "oauth2": OAuthSecurityHandler,
    # OpenAPI 3: http schemes
    "bearer": BearerSecurityHandler,
}
443
444
class SecurityHandlerFactory:
    """
    A factory class for parsing security schemes and returning the appropriate
    security handler.

    By default, it will use the built-in security handlers specified in the
    SECURITY_HANDLERS dict, but you can also pass in your own security handlers
    to override the built-in ones.
    """

    def __init__(
        self,
        security_handlers: t.Optional[dict] = None,
    ) -> None:
        """
        :param security_handlers: Optional mapping of scheme key to handler
            class; its entries are layered on top of a copy of the defaults.
        """
        self.security_handlers = SECURITY_HANDLERS.copy()
        if security_handlers is not None:
            self.security_handlers.update(security_handlers)

    def parse_security_scheme(
        self,
        security_scheme: dict,
        required_scopes: t.List[str],
    ) -> t.Optional[t.Callable]:
        """Parses the security scheme and returns the function for verifying it.

        Every handler is looked up in ``self.security_handlers``, so handlers
        registered by the user take precedence over the built-in ones. This
        includes the bearer handler used for ``apiKey`` schemes that declare
        ``x-authentication-scheme: bearer``.

        :param security_scheme: The security scheme from the spec.
        :param required_scopes: List of scopes for this security scheme.
        :return: The verification function, or None (after logging a warning)
            when the scheme is not supported.
        """
        handlers = self.security_handlers
        security_type = security_scheme["type"]

        if security_type in ("basic", "oauth2"):
            handler_key = security_type

        # OpenAPI 3.0.0
        elif security_type == "http":
            handler_key = security_scheme["scheme"].lower()
            if handler_key not in handlers:
                logger.warning(
                    "... Unsupported http authorization scheme %s", handler_key
                )
                return None

        elif security_type == "apiKey":
            scheme = security_scheme.get("x-authentication-scheme", "").lower()
            handler_key = "bearer" if scheme == "bearer" else "apiKey"

        elif security_type == "openIdConnect":
            if security_type not in handlers:
                logger.warning("... No default implementation for openIdConnect")
                return None
            handler_key = security_type

        # Custom security scheme handler
        elif (
            "scheme" in security_scheme
            and (scheme := security_scheme["scheme"].lower()) in handlers
        ):
            handler_key = scheme

        # Custom security type handler
        elif security_type in handlers:
            handler_key = security_type

        else:
            logger.warning(
                "... Unsupported security scheme type %s",
                security_type,
            )
            return None

        security_handler = handlers[handler_key]
        return security_handler().get_fn(security_scheme, required_scopes)

    @staticmethod
    async def security_passthrough(request):
        """Used when no security is required for the operation.

        Equivalent OpenAPI snippet:

        .. code-block:: yaml

            /helloworld
              get:
                security: []   # No security
                ...
        """
        return request

    @staticmethod
    def verify_none(request):
        """Used for optional security.

        Equivalent OpenAPI snippet:

        .. code-block:: yaml

            security:
              - {}  # <--
              - myapikey: []
        """
        return {}

    def verify_multiple_schemes(self, schemes):
        """
        Verifies multiple authentication schemes in AND fashion.
        If any scheme fails, the entire authentication fails.

        :param schemes: mapping scheme_name to auth function
        :type schemes: dict
        :rtype: types.FunctionType
        """

        async def wrapper(request):
            token_info = {}
            for scheme_name, func in schemes.items():
                result = func(request)
                # Auth functions may be sync or async.
                while asyncio.iscoroutine(result):
                    result = await result
                if result is NO_VALUE:
                    # One scheme without credentials voids the combination.
                    return NO_VALUE
                token_info[scheme_name] = result

            return token_info

        return wrapper

    @classmethod
    def verify_security(cls, auth_funcs):
        """
        Verifies the given auth functions in OR fashion.

        The returned coroutine function tries each auth function in order and
        stops at the first one that yields something other than NO_VALUE. The
        result is stored in the request context under ``token_info``, along
        with ``user``. If none succeeds, the most specific collected error is
        raised, or an OAuthProblem when no credentials were provided at all.

        :param auth_funcs: Iterable of auth functions taking the request.
        """

        async def verify_fn(request):
            token_info = NO_VALUE
            errors = []
            for func in auth_funcs:
                try:
                    token_info = func(request)
                    while asyncio.iscoroutine(token_info):
                        token_info = await token_info
                    if token_info is not NO_VALUE:
                        break
                except Exception as err:
                    # Remember the failure, but still give the remaining
                    # alternatives a chance to succeed.
                    errors.append(err)

            else:
                # Only reached when no auth function produced a result.
                if errors:
                    cls._raise_most_specific(errors)
                else:
                    logger.info("... No auth provided. Aborting with 401.")
                    raise OAuthProblem(detail="No authorization token provided")

            request.context.update(
                {
                    # Fallback to 'uid' for backward compatibility
                    "user": token_info.get("sub", token_info.get("uid")),
                    "token_info": token_info,
                }
            )

        return verify_fn

    @staticmethod
    def _raise_most_specific(exceptions: t.List[Exception]) -> None:
        """Raises the most specific error from a list of exceptions by status code.

        The status codes are expected to be either in the `status_code`
        or in the `status` attribute of the exceptions.

        The order is as follows:
          - 403: valid credentials but not enough privileges
          - 401: no or invalid credentials
          - for other status codes, the smallest one is selected

        When several exceptions share a status code, the last one in the list
        is the one raised. Nothing is raised for an empty list.

        :param exceptions: List of exceptions.
        :type exceptions: t.List[Exception]
        """
        if not exceptions:
            return
        # We only use status code attributes from exceptions
        # We use 600 as default because 599 is highest valid status code
        status_to_exc = {
            getattr(exc, "status_code", getattr(exc, "status", 600)): exc
            for exc in exceptions
        }
        if 403 in status_to_exc:
            raise status_to_exc[403]
        elif 401 in status_to_exc:
            raise status_to_exc[401]
        else:
            lowest_status_code = min(status_to_exc)
            raise status_to_exc[lowest_status_code]