Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/security.py: 24%
237 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1"""
2This module defines an abstract SecurityHandlerFactory which supports the creation of security
3handlers for operations.
4"""
6import asyncio
7import base64
8import http.cookies
9import logging
10import os
11import textwrap
12import typing as t
14import httpx
16from connexion.decorators.parameter import inspect_function_arguments
17from connexion.exceptions import (
18 ConnexionException,
19 OAuthProblem,
20 OAuthResponseProblem,
21 OAuthScopeProblem,
22)
23from connexion.utils import get_function_from_name
25logger = logging.getLogger("connexion.api.security")
28class SecurityHandlerFactory:
29 """
30 get_*_func -> _get_function -> get_function_from_name (name=security function defined in spec)
31 (if url defined instead of a function -> get_token_info_remote)
33 std security functions: security_{passthrough,deny}
35 verify_* -> returns a security wrapper around the security function
36 check_* -> returns a function tasked with doing auth for use inside the verify wrapper
37 check helpers (used outside wrappers): _need_to_add_context_or_scopes
38 the security function
40 verify helpers (used inside wrappers): get_auth_header_value, get_cookie_value
41 """
43 no_value = object()
44 required_scopes_kw = "required_scopes"
45 context_kw = "context_"
46 client = None
48 @staticmethod
49 def _get_function(
50 security_definition, security_definition_key, environ_key, default=None
51 ):
52 """
53 Return function by getting its name from security_definition or environment variable
54 """
55 func = security_definition.get(security_definition_key) or os.environ.get(
56 environ_key
57 )
58 if func:
59 return get_function_from_name(func)
60 return default
62 def get_tokeninfo_func(self, security_definition: dict) -> t.Optional[t.Callable]:
63 """
64 :type security_definition: dict
66 >>> get_tokeninfo_url({'x-tokenInfoFunc': 'foo.bar'})
67 '<function foo.bar>'
68 """
69 token_info_func = self._get_function(
70 security_definition, "x-tokenInfoFunc", "TOKENINFO_FUNC"
71 )
72 if token_info_func:
73 return token_info_func
75 token_info_url = security_definition.get("x-tokenInfoUrl") or os.environ.get(
76 "TOKENINFO_URL"
77 )
78 if token_info_url:
79 return self.get_token_info_remote(token_info_url)
81 return None
83 @classmethod
84 def get_scope_validate_func(cls, security_definition):
85 """
86 :type security_definition: dict
87 :rtype: function
89 >>> get_scope_validate_func({'x-scopeValidateFunc': 'foo.bar'})
90 '<function foo.bar>'
91 """
92 return cls._get_function(
93 security_definition,
94 "x-scopeValidateFunc",
95 "SCOPEVALIDATE_FUNC",
96 cls.validate_scope,
97 )
99 @classmethod
100 def get_basicinfo_func(cls, security_definition):
101 """
102 :type security_definition: dict
103 :rtype: function
105 >>> get_basicinfo_func({'x-basicInfoFunc': 'foo.bar'})
106 '<function foo.bar>'
107 """
108 return cls._get_function(
109 security_definition, "x-basicInfoFunc", "BASICINFO_FUNC"
110 )
112 @classmethod
113 def get_apikeyinfo_func(cls, security_definition):
114 """
115 :type security_definition: dict
116 :rtype: function
118 >>> get_apikeyinfo_func({'x-apikeyInfoFunc': 'foo.bar'})
119 '<function foo.bar>'
120 """
121 return cls._get_function(
122 security_definition, "x-apikeyInfoFunc", "APIKEYINFO_FUNC"
123 )
125 @classmethod
126 def get_bearerinfo_func(cls, security_definition):
127 """
128 :type security_definition: dict
129 :rtype: function
131 >>> get_bearerinfo_func({'x-bearerInfoFunc': 'foo.bar'})
132 '<function foo.bar>'
133 """
134 return cls._get_function(
135 security_definition, "x-bearerInfoFunc", "BEARERINFO_FUNC"
136 )
138 @staticmethod
139 async def security_passthrough(request):
140 return request
142 @staticmethod
143 def security_deny(function):
144 """
145 :type function: types.FunctionType
146 :rtype: types.FunctionType
147 """
149 def deny(*args, **kwargs):
150 raise ConnexionException("Error in security definitions")
152 return deny
154 @staticmethod
155 def validate_scope(required_scopes, token_scopes):
156 """
157 :param required_scopes: Scopes required to access operation
158 :param token_scopes: Scopes granted by authorization server
159 :rtype: bool
160 """
161 required_scopes = set(required_scopes)
162 if isinstance(token_scopes, list):
163 token_scopes = set(token_scopes)
164 else:
165 token_scopes = set(token_scopes.split())
166 logger.debug("... Scopes required: %s", required_scopes)
167 logger.debug("... Token scopes: %s", token_scopes)
168 if not required_scopes <= token_scopes:
169 logger.info(
170 textwrap.dedent(
171 """
172 ... Token scopes (%s) do not match the scopes necessary to call endpoint (%s).
173 Aborting with 403."""
174 ).replace("\n", ""),
175 token_scopes,
176 required_scopes,
177 )
178 return False
179 return True
181 @staticmethod
182 def get_auth_header_value(request):
183 """
184 Called inside security wrapper functions
186 Return Authorization type and value if any.
187 If not Authorization, return (None, None)
188 Raise OAuthProblem for invalid Authorization header
189 """
190 authorization = request.headers.get("Authorization")
191 if not authorization:
192 return None, None
194 try:
195 auth_type, value = authorization.split(None, 1)
196 except ValueError:
197 raise OAuthProblem(detail="Invalid authorization header")
198 return auth_type.lower(), value
200 def verify_oauth(self, token_info_func, scope_validate_func, required_scopes):
201 check_oauth_func = self.check_oauth_func(token_info_func, scope_validate_func)
203 def wrapper(request):
204 auth_type, token = self.get_auth_header_value(request)
205 if auth_type != "bearer":
206 return self.no_value
208 return check_oauth_func(request, token, required_scopes=required_scopes)
210 return wrapper
212 def verify_basic(self, basic_info_func):
213 check_basic_info_func = self.check_basic_auth(basic_info_func)
215 def wrapper(request):
216 auth_type, user_pass = self.get_auth_header_value(request)
217 if auth_type != "basic":
218 return self.no_value
220 try:
221 username, password = (
222 base64.b64decode(user_pass).decode("latin1").split(":", 1)
223 )
224 except Exception:
225 raise OAuthProblem(detail="Invalid authorization header")
227 return check_basic_info_func(request, username, password)
229 return wrapper
231 @staticmethod
232 def get_cookie_value(cookies, name):
233 """
234 Called inside security wrapper functions
236 Returns cookie value by its name. None if no such value.
237 :param cookies: str: cookies raw data
238 :param name: str: cookies key
239 """
240 cookie_parser = http.cookies.SimpleCookie()
241 cookie_parser.load(str(cookies))
242 try:
243 return cookie_parser[name].value
244 except KeyError:
245 return None
247 def verify_api_key(self, api_key_info_func, loc, name):
248 check_api_key_func = self.check_api_key(api_key_info_func)
250 def wrapper(request):
251 def _immutable_pop(_dict, key):
252 """
253 Pops the key from an immutable dict and returns the value that was popped,
254 and a new immutable dict without the popped key.
255 """
256 cls = type(_dict)
257 try:
258 _dict = _dict.to_dict(flat=False)
259 return _dict.pop(key)[0], cls(_dict)
260 except AttributeError:
261 _dict = dict(_dict.items())
262 return _dict.pop(key), cls(_dict)
264 if loc == "query":
265 try:
266 api_key, request.query = _immutable_pop(request.query, name)
267 except KeyError:
268 api_key = None
269 elif loc == "header":
270 api_key = request.headers.get(name)
271 elif loc == "cookie":
272 cookie_list = request.headers.get("Cookie")
273 api_key = self.get_cookie_value(cookie_list, name)
274 else:
275 return self.no_value
277 if api_key is None:
278 return self.no_value
280 return check_api_key_func(request, api_key)
282 return wrapper
284 def verify_bearer(self, token_info_func):
285 """
286 :param token_info_func: types.FunctionType
287 :rtype: types.FunctionType
288 """
289 check_bearer_func = self.check_bearer_token(token_info_func)
291 def wrapper(request):
292 auth_type, token = self.get_auth_header_value(request)
293 if auth_type != "bearer":
294 return self.no_value
295 return check_bearer_func(request, token)
297 return wrapper
299 def verify_multiple_schemes(self, schemes):
300 """
301 Verifies multiple authentication schemes in AND fashion.
302 If any scheme fails, the entire authentication fails.
304 :param schemes: mapping scheme_name to auth function
305 :type schemes: dict
306 :rtype: types.FunctionType
307 """
309 async def wrapper(request):
310 token_info = {}
311 for scheme_name, func in schemes.items():
312 result = func(request)
313 while asyncio.iscoroutine(result):
314 result = await result
315 if result is self.no_value:
316 return self.no_value
317 token_info[scheme_name] = result
319 return token_info
321 return wrapper
323 @staticmethod
324 def verify_none():
325 """
326 :rtype: types.FunctionType
327 """
329 def wrapper(request):
330 return {}
332 return wrapper
334 def _need_to_add_context_or_scopes(self, func):
335 arguments, has_kwargs = inspect_function_arguments(func)
336 need_context = self.context_kw in arguments
337 need_required_scopes = has_kwargs or self.required_scopes_kw in arguments
338 return need_context, need_required_scopes
340 def _generic_check(self, func, exception_msg):
341 (
342 need_to_add_context,
343 need_to_add_required_scopes,
344 ) = self._need_to_add_context_or_scopes(func)
346 async def wrapper(request, *args, required_scopes=None):
347 kwargs = {}
348 if need_to_add_context:
349 kwargs[self.context_kw] = request.context
350 if need_to_add_required_scopes:
351 kwargs[self.required_scopes_kw] = required_scopes
352 token_info = func(*args, **kwargs)
353 while asyncio.iscoroutine(token_info):
354 token_info = await token_info
355 if token_info is self.no_value:
356 return self.no_value
357 if token_info is None:
358 raise OAuthResponseProblem(detail=exception_msg)
359 return token_info
361 return wrapper
363 def check_bearer_token(self, token_info_func):
364 return self._generic_check(token_info_func, "Provided token is not valid")
366 def check_basic_auth(self, basic_info_func):
367 return self._generic_check(
368 basic_info_func, "Provided authorization is not valid"
369 )
371 def check_api_key(self, api_key_info_func):
372 return self._generic_check(api_key_info_func, "Provided apikey is not valid")
374 def check_oauth_func(self, token_info_func, scope_validate_func):
375 get_token_info = self._generic_check(
376 token_info_func, "Provided token is not valid"
377 )
378 need_to_add_context, _ = self._need_to_add_context_or_scopes(
379 scope_validate_func
380 )
382 async def wrapper(request, token, required_scopes):
383 token_info = await get_token_info(
384 request, token, required_scopes=required_scopes
385 )
387 # Fallback to 'scopes' for backward compatibility
388 token_scopes = token_info.get("scope", token_info.get("scopes", ""))
390 kwargs = {}
391 if need_to_add_context:
392 kwargs[self.context_kw] = request.context
393 validation = scope_validate_func(required_scopes, token_scopes, **kwargs)
394 while asyncio.iscoroutine(validation):
395 validation = await validation
396 if not validation:
397 raise OAuthScopeProblem(
398 required_scopes=required_scopes,
399 token_scopes=token_scopes,
400 )
402 return token_info
404 return wrapper
406 @classmethod
407 def verify_security(cls, auth_funcs):
408 async def verify_fn(request):
409 token_info = cls.no_value
410 errors = []
411 for func in auth_funcs:
412 try:
413 token_info = func(request)
414 while asyncio.iscoroutine(token_info):
415 token_info = await token_info
416 if token_info is not cls.no_value:
417 break
418 except Exception as err:
419 errors.append(err)
421 else:
422 if errors != []:
423 cls._raise_most_specific(errors)
424 else:
425 logger.info("... No auth provided. Aborting with 401.")
426 raise OAuthProblem(detail="No authorization token provided")
428 request.context.update(
429 {
430 # Fallback to 'uid' for backward compatibility
431 "user": token_info.get("sub", token_info.get("uid")),
432 "token_info": token_info,
433 }
434 )
436 return verify_fn
438 @staticmethod
439 def _raise_most_specific(exceptions: t.List[Exception]) -> None:
440 """Raises the most specific error from a list of exceptions by status code.
442 The status codes are expected to be either in the `code`
443 or in the `status` attribute of the exceptions.
445 The order is as follows:
446 - 403: valid credentials but not enough privileges
447 - 401: no or invalid credentials
448 - for other status codes, the smallest one is selected
450 :param errors: List of exceptions.
451 :type errors: t.List[Exception]
452 """
453 if not exceptions:
454 return
455 # We only use status code attributes from exceptions
456 # We use 600 as default because 599 is highest valid status code
457 status_to_exc = {
458 getattr(exc, "status_code", getattr(exc, "status", 600)): exc
459 for exc in exceptions
460 }
461 if 403 in status_to_exc:
462 raise status_to_exc[403]
463 elif 401 in status_to_exc:
464 raise status_to_exc[401]
465 else:
466 lowest_status_code = min(status_to_exc)
467 raise status_to_exc[lowest_status_code]
469 def get_token_info_remote(self, token_info_url):
470 """
471 Return a function which will call `token_info_url` to retrieve token info.
473 Returned function must accept oauth token in parameter.
474 It must return a token_info dict in case of success, None otherwise.
476 :param token_info_url: Url to get information about the token
477 :type token_info_url: str
478 :rtype: types.FunctionType
479 """
481 async def wrapper(token):
482 if self.client is None:
483 self.client = httpx.AsyncClient()
484 headers = {"Authorization": f"Bearer {token}"}
485 token_request = await self.client.get(
486 token_info_url, headers=headers, timeout=5
487 )
488 if token_request.status_code != 200:
489 return
490 return token_request.json()
492 return wrapper