Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/connexion/security.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

281 statements  

1""" 

2This module defines a SecurityHandlerFactory which supports the creation of 

3SecurityHandler instances for different security schemes. 

4 

5It also exposes a `SECURITY_HANDLERS` dictionary which maps security scheme 

6types to SecurityHandler classes. This dictionary can be used to register 

7custom SecurityHandler classes for custom security schemes, or to overwrite 

8existing SecurityHandler classes. 

9This can be done by supplying a value for `security_map` argument of the 

10SecurityHandlerFactory. 

11 

12Swagger 2.0 lets you define the following authentication types for an API: 

13 

14- Basic authentication 

15- API key (as a header or a query string parameter) 

16- OAuth 2 common flows (authorization code, implicit, resource owner password credentials, client credentials) 

17 

18 

19Changes from OpenAPI 2.0 to OpenAPI 3.0 

20If you used OpenAPI 2.0 before, here is a summary of changes to help you get started with OpenAPI 3.0: 

21- securityDefinitions were renamed to securitySchemes and moved inside components. 

22- type: basic was replaced with type: http and scheme: basic. 

23- The new type: http is an umbrella type for all HTTP security schemes, including Basic, Bearer and other, 

24and the scheme keyword indicates the scheme type. 

25- API keys can now be sent in: cookie. 

26- Added support for OpenID Connect Discovery (type: openIdConnect). 

27- OAuth 2 security schemes can now define multiple flows. 

28- OAuth 2 flows were renamed to match the OAuth 2 Specification: accessCode is now authorizationCode, 

29and application is now clientCredentials. 

30 

31 

32OpenAPI uses the term security scheme for authentication and authorization schemes. 

33OpenAPI 3.0 lets you describe APIs protected using the following security schemes: 

34 

35- HTTP authentication schemes (they use the Authorization header): 

36 - Basic 

37 - Bearer 

38 - other HTTP schemes as defined by RFC 7235 and HTTP Authentication Scheme Registry 

39- API keys in headers, query string or cookies 

40 - Cookie authentication 

41- OAuth 2 

42- OpenID Connect Discovery 

43 

44""" 

45 

46import asyncio 

47import base64 

48import http.cookies 

49import logging 

50import os 

51import typing as t 

52 

53import httpx 

54 

55from connexion.decorators.parameter import inspect_function_arguments 

56from connexion.exceptions import OAuthProblem, OAuthResponseProblem, OAuthScopeProblem 

57from connexion.lifecycle import ConnexionRequest 

58from connexion.utils import get_function_from_name 

59 

60logger = logging.getLogger(__name__) 

61 

62 

63NO_VALUE = object() 

64"""Sentinel value to indicate that no security credentials were found.""" 

65 

66 

67class AbstractSecurityHandler: 

68 

69 required_scopes_kw = "required_scopes" 

70 request_kw = "request" 

71 client = None 

72 security_definition_key: str 

73 """The key which contains the value for the function name to resolve.""" 

74 environ_key: str 

75 """The name of the environment variable that can be used alternatively for the function name.""" 

76 

77 def get_fn(self, security_scheme, required_scopes): 

78 """Returns the handler function""" 

79 security_func = self._resolve_func(security_scheme) 

80 if not security_func: 

81 logger.warning("... %s missing", self.security_definition_key) 

82 return None 

83 

84 return self._get_verify_func(security_func) 

85 

86 @classmethod 

87 def _get_function( 

88 cls, 

89 security_definition: dict, 

90 security_definition_key: str, 

91 environ_key: str, 

92 default: t.Optional[t.Callable] = None, 

93 ): 

94 """ 

95 Return function by getting its name from security_definition or environment variable 

96 

97 :param security_definition: Security Definition (scheme) from the spec. 

98 :param security_definition_key: The key which contains the value for the function name to resolve. 

99 :param environ_key: The name of the environment variable that can be used alternatively for the function name. 

100 :param default: The default to use in case the function cannot be found based on the security_definition_key or the environ_key 

101 """ 

102 func_name = security_definition.get(security_definition_key) or os.environ.get( 

103 environ_key 

104 ) 

105 if func_name: 

106 return get_function_from_name(func_name) 

107 return default 

108 

109 def _generic_check(self, func, exception_msg): 

110 async def wrapper(request, *args, required_scopes=None): 

111 kwargs = {} 

112 if self._accepts_kwarg(func, self.required_scopes_kw): 

113 kwargs[self.required_scopes_kw] = required_scopes 

114 if self._accepts_kwarg(func, self.request_kw): 

115 kwargs[self.request_kw] = request 

116 token_info = func(*args, **kwargs) 

117 while asyncio.iscoroutine(token_info): 

118 token_info = await token_info 

119 if token_info is NO_VALUE: 

120 return NO_VALUE 

121 if token_info is None: 

122 raise OAuthResponseProblem(detail=exception_msg) 

123 return token_info 

124 

125 return wrapper 

126 

127 @staticmethod 

128 def get_auth_header_value(request): 

129 """ 

130 Return Authorization type and value if any. 

131 If not Authorization, return (None, None) 

132 Raise OAuthProblem for invalid Authorization header 

133 """ 

134 authorization = request.headers.get("Authorization") 

135 if not authorization: 

136 return None, None 

137 

138 try: 

139 auth_type, value = authorization.split(maxsplit=1) 

140 except ValueError: 

141 raise OAuthProblem(detail="Invalid authorization header") 

142 return auth_type.lower(), value 

143 

144 @staticmethod 

145 def _accepts_kwarg(func: t.Callable, keyword: str) -> bool: 

146 """Check if the function accepts the provided keyword argument.""" 

147 arguments, has_kwargs = inspect_function_arguments(func) 

148 return has_kwargs or keyword in arguments 

149 

150 def _resolve_func(self, security_scheme): 

151 """ 

152 Get the user function object based on the security scheme or the environment variable. 

153 

154 :param security_scheme: Security Definition (scheme) from the spec. 

155 """ 

156 return self._get_function( 

157 security_scheme, self.security_definition_key, self.environ_key 

158 ) 

159 

160 def _get_verify_func(self, function): 

161 """ 

162 Wraps the user security function in a function that checks the request for the correct 

163 security credentials and calls the user function with the correct arguments. 

164 """ 

165 return self._generic_check(function, "Provided authorization is not valid") 

166 

167 

168class BasicSecurityHandler(AbstractSecurityHandler): 

169 """ 

170 Security Handler for 

171 - `type: basic` (Swagger 2), and 

172 - `type: http` and `scheme: basic` (OpenAPI 3) 

173 """ 

174 

175 security_definition_key = "x-basicInfoFunc" 

176 environ_key = "BASICINFO_FUNC" 

177 

178 def _get_verify_func(self, basic_info_func): 

179 check_basic_info_func = self.check_basic_auth(basic_info_func) 

180 

181 def wrapper(request): 

182 auth_type, user_pass = self.get_auth_header_value(request) 

183 if auth_type != "basic": 

184 return NO_VALUE 

185 

186 try: 

187 username, password = ( 

188 base64.b64decode(user_pass).decode("latin1").split(":", 1) 

189 ) 

190 except Exception: 

191 raise OAuthProblem(detail="Invalid authorization header") 

192 

193 return check_basic_info_func(request, username, password) 

194 

195 return wrapper 

196 

197 def check_basic_auth(self, basic_info_func): 

198 return self._generic_check( 

199 basic_info_func, "Provided authorization is not valid" 

200 ) 

201 

202 

203class BearerSecurityHandler(AbstractSecurityHandler): 

204 """ 

205 Security Handler for HTTP Bearer authentication. 

206 """ 

207 

208 security_definition_key = "x-bearerInfoFunc" 

209 environ_key = "BEARERINFO_FUNC" 

210 

211 def check_bearer_token(self, token_info_func): 

212 return self._generic_check(token_info_func, "Provided token is not valid") 

213 

214 def _get_verify_func(self, token_info_func): 

215 """ 

216 :param token_info_func: types.FunctionType 

217 :rtype: types.FunctionType 

218 """ 

219 check_bearer_func = self.check_bearer_token(token_info_func) 

220 

221 def wrapper(request): 

222 auth_type, token = self.get_auth_header_value(request) 

223 if auth_type != "bearer": 

224 return NO_VALUE 

225 return check_bearer_func(request, token) 

226 

227 return wrapper 

228 

229 

230class ApiKeySecurityHandler(AbstractSecurityHandler): 

231 """ 

232 Security Handler for API Keys. 

233 """ 

234 

235 security_definition_key = "x-apikeyInfoFunc" 

236 environ_key = "APIKEYINFO_FUNC" 

237 

238 def get_fn(self, security_scheme, required_scopes): 

239 apikey_info_func = self._resolve_func(security_scheme) 

240 if not apikey_info_func: 

241 logger.warning("... %s missing", self.security_definition_key) 

242 return None 

243 

244 return self._get_verify_func( 

245 apikey_info_func, 

246 security_scheme["in"], 

247 security_scheme["name"], 

248 required_scopes, 

249 ) 

250 

251 def _get_verify_func(self, api_key_info_func, loc, name, required_scopes): 

252 check_api_key_func = self.check_api_key(api_key_info_func) 

253 

254 def wrapper(request: ConnexionRequest): 

255 if loc == "query": 

256 api_key = request.query_params.get(name) 

257 elif loc == "header": 

258 api_key = request.headers.get(name) 

259 elif loc == "cookie": 

260 cookie_list = request.headers.get("Cookie") 

261 api_key = self.get_cookie_value(cookie_list, name) 

262 else: 

263 return NO_VALUE 

264 

265 if api_key is None: 

266 return NO_VALUE 

267 

268 return check_api_key_func(request, api_key, required_scopes=required_scopes) 

269 

270 return wrapper 

271 

272 def check_api_key(self, api_key_info_func): 

273 return self._generic_check(api_key_info_func, "Provided apikey is not valid") 

274 

275 @staticmethod 

276 def get_cookie_value(cookies, name): 

277 """ 

278 Returns cookie value by its name. `None` if no such value. 

279 

280 :param cookies: str: cookies raw data 

281 :param name: str: cookies key 

282 """ 

283 cookie_parser = http.cookies.SimpleCookie() 

284 cookie_parser.load(str(cookies)) 

285 try: 

286 return cookie_parser[name].value 

287 except KeyError: 

288 return None 

289 

290 

291class OAuthSecurityHandler(AbstractSecurityHandler): 

292 """ 

293 Security Handler for the OAuth security scheme. 

294 """ 

295 

296 def get_fn(self, security_scheme, required_scopes): 

297 token_info_func = self.get_tokeninfo_func(security_scheme) 

298 scope_validate_func = self.get_scope_validate_func(security_scheme) 

299 if not token_info_func: 

300 logger.warning("... x-tokenInfoFunc missing") 

301 return None 

302 

303 return self._get_verify_func( 

304 token_info_func, scope_validate_func, required_scopes 

305 ) 

306 

307 def get_tokeninfo_func(self, security_definition: dict) -> t.Optional[t.Callable]: 

308 """ 

309 Gets the function for retrieving the token info. 

310 It is possible to specify a function or a URL. The function variant is 

311 preferred. If it is not found, the URL variant is used with the 

312 `get_token_info_remote` function. 

313 

314 >>> get_tokeninfo_func({'x-tokenInfoFunc': 'foo.bar'}) 

315 '<function foo.bar>' 

316 """ 

317 token_info_func = self._get_function( 

318 security_definition, "x-tokenInfoFunc", "TOKENINFO_FUNC" 

319 ) 

320 if token_info_func: 

321 return token_info_func 

322 

323 token_info_url = security_definition.get("x-tokenInfoUrl") or os.environ.get( 

324 "TOKENINFO_URL" 

325 ) 

326 if token_info_url: 

327 return self.get_token_info_remote(token_info_url) 

328 

329 return None 

330 

331 @classmethod 

332 def get_scope_validate_func(cls, security_definition): 

333 """ 

334 Gets the function for validating the token scopes. 

335 If it is not found, the default `validate_scope` function is used. 

336 

337 >>> get_scope_validate_func({'x-scopeValidateFunc': 'foo.bar'}) 

338 '<function foo.bar>' 

339 """ 

340 return cls._get_function( 

341 security_definition, 

342 "x-scopeValidateFunc", 

343 "SCOPEVALIDATE_FUNC", 

344 cls.validate_scope, 

345 ) 

346 

347 @staticmethod 

348 def validate_scope(required_scopes, token_scopes): 

349 """ 

350 :param required_scopes: Scopes required to access operation 

351 :param token_scopes: Scopes granted by authorization server 

352 :rtype: bool 

353 """ 

354 required_scopes = set(required_scopes) 

355 if isinstance(token_scopes, list): 

356 token_scopes = set(token_scopes) 

357 else: 

358 token_scopes = set(token_scopes.split()) 

359 logger.debug("... Scopes required: %s", required_scopes) 

360 logger.debug("... Token scopes: %s", token_scopes) 

361 if not required_scopes <= token_scopes: 

362 logger.info( 

363 "... Token scopes (%s) do not match the scopes necessary to call endpoint (%s)." 

364 " Aborting with 403.", 

365 token_scopes, 

366 required_scopes, 

367 ) 

368 return False 

369 return True 

370 

371 def get_token_info_remote(self, token_info_url: str) -> t.Callable: 

372 """ 

373 Return a function which will call `token_info_url` to retrieve token info. 

374 

375 Returned function must accept oauth token in parameter. 

376 It must return a token_info dict in case of success, None otherwise. 

377 

378 :param token_info_url: URL to get information about the token 

379 """ 

380 

381 async def wrapper(token): 

382 if self.client is None: 

383 self.client = httpx.AsyncClient() 

384 headers = {"Authorization": f"Bearer {token}"} 

385 token_request = await self.client.get( 

386 token_info_url, headers=headers, timeout=5 

387 ) 

388 if token_request.status_code != 200: 

389 return 

390 return token_request.json() 

391 

392 return wrapper 

393 

394 def _get_verify_func(self, token_info_func, scope_validate_func, required_scopes): 

395 check_oauth_func = self.check_oauth_func(token_info_func, scope_validate_func) 

396 

397 def wrapper(request): 

398 auth_type, token = self.get_auth_header_value(request) 

399 if auth_type != "bearer": 

400 return NO_VALUE 

401 

402 return check_oauth_func(request, token, required_scopes=required_scopes) 

403 

404 return wrapper 

405 

406 def check_oauth_func(self, token_info_func, scope_validate_func): 

407 get_token_info = self._generic_check( 

408 token_info_func, "Provided token is not valid" 

409 ) 

410 

411 async def wrapper(request, token, required_scopes): 

412 token_info = await get_token_info( 

413 request, token, required_scopes=required_scopes 

414 ) 

415 

416 # Fallback to 'scopes' for backward compatibility 

417 token_scopes = token_info.get("scope", token_info.get("scopes", "")) 

418 

419 validation = scope_validate_func(required_scopes, token_scopes) 

420 while asyncio.iscoroutine(validation): 

421 validation = await validation 

422 if not validation: 

423 raise OAuthScopeProblem( 

424 required_scopes=required_scopes, 

425 token_scopes=token_scopes, 

426 ) 

427 

428 return token_info 

429 

430 return wrapper 

431 

432 

433SECURITY_HANDLERS = { 

434 # Swagger 2: `type: basic` 

435 # OpenAPI 3: `type: http` and `scheme: basic` 

436 "basic": BasicSecurityHandler, 

437 # Swagger 2 and OpenAPI 3 

438 "apiKey": ApiKeySecurityHandler, 

439 "oauth2": OAuthSecurityHandler, 

440 # OpenAPI 3: http schemes 

441 "bearer": BearerSecurityHandler, 

442} 

443 

444 

445class SecurityHandlerFactory: 

446 """ 

447 A factory class for parsing security schemes and returning the appropriate 

448 security handler. 

449 

450 By default, it will use the built-in security handlers specified in the 

451 SECURITY_HANDLERS dict, but you can also pass in your own security handlers 

452 to override the built-in ones. 

453 """ 

454 

455 def __init__( 

456 self, 

457 security_handlers: t.Optional[dict] = None, 

458 ) -> None: 

459 self.security_handlers = SECURITY_HANDLERS.copy() 

460 if security_handlers is not None: 

461 self.security_handlers.update(security_handlers) 

462 

463 def parse_security_scheme( 

464 self, 

465 security_scheme: dict, 

466 required_scopes: t.List[str], 

467 ) -> t.Optional[t.Callable]: 

468 """Parses the security scheme and returns the function for verifying it. 

469 

470 :param security_scheme: The security scheme from the spec. 

471 :param required_scopes: List of scopes for this security scheme. 

472 """ 

473 security_type = security_scheme["type"] 

474 if security_type in ("basic", "oauth2"): 

475 security_handler = self.security_handlers[security_type] 

476 return security_handler().get_fn(security_scheme, required_scopes) 

477 

478 # OpenAPI 3.0.0 

479 elif security_type == "http": 

480 scheme = security_scheme["scheme"].lower() 

481 if scheme in self.security_handlers: 

482 security_handler = self.security_handlers[scheme] 

483 return security_handler().get_fn(security_scheme, required_scopes) 

484 else: 

485 logger.warning("... Unsupported http authorization scheme %s", scheme) 

486 return None 

487 

488 elif security_type == "apiKey": 

489 scheme = security_scheme.get("x-authentication-scheme", "").lower() 

490 if scheme == "bearer": 

491 return BearerSecurityHandler().get_fn(security_scheme, required_scopes) 

492 else: 

493 security_handler = self.security_handlers["apiKey"] 

494 return security_handler().get_fn(security_scheme, required_scopes) 

495 

496 elif security_type == "openIdConnect": 

497 if security_type in self.security_handlers: 

498 security_handler = self.security_handlers[security_type] 

499 return security_handler().get_fn(security_scheme, required_scopes) 

500 logger.warning("... No default implementation for openIdConnect") 

501 return None 

502 

503 # Custom security scheme handler 

504 elif ( 

505 "scheme" in security_scheme 

506 and (scheme := security_scheme["scheme"].lower()) in self.security_handlers 

507 ): 

508 security_handler = self.security_handlers[scheme] 

509 return security_handler().get_fn(security_scheme, required_scopes) 

510 

511 # Custom security type handler 

512 elif security_type in self.security_handlers: 

513 security_handler = self.security_handlers[security_type] 

514 return security_handler().get_fn(security_scheme, required_scopes) 

515 

516 else: 

517 logger.warning( 

518 "... Unsupported security scheme type %s", 

519 security_type, 

520 ) 

521 return None 

522 

523 @staticmethod 

524 async def security_passthrough(request): 

525 """Used when no security is required for the operation. 

526 

527 Equivalent OpenAPI snippet: 

528 

529 .. code-block:: yaml 

530 

531 /helloworld 

532 get: 

533 security: [] # No security 

534 ... 

535 """ 

536 return request 

537 

538 @staticmethod 

539 def verify_none(request): 

540 """Used for optional security. 

541 

542 Equivalent OpenAPI snippet: 

543 

544 .. code-block:: yaml 

545 

546 security: 

547 - {} # <-- 

548 - myapikey: [] 

549 """ 

550 return {} 

551 

552 def verify_multiple_schemes(self, schemes): 

553 """ 

554 Verifies multiple authentication schemes in AND fashion. 

555 If any scheme fails, the entire authentication fails. 

556 

557 :param schemes: mapping scheme_name to auth function 

558 :type schemes: dict 

559 :rtype: types.FunctionType 

560 """ 

561 

562 async def wrapper(request): 

563 token_info = {} 

564 for scheme_name, func in schemes.items(): 

565 result = func(request) 

566 while asyncio.iscoroutine(result): 

567 result = await result 

568 if result is NO_VALUE: 

569 return NO_VALUE 

570 token_info[scheme_name] = result 

571 

572 return token_info 

573 

574 return wrapper 

575 

576 @classmethod 

577 def verify_security(cls, auth_funcs): 

578 async def verify_fn(request): 

579 token_info = NO_VALUE 

580 errors = [] 

581 for func in auth_funcs: 

582 try: 

583 token_info = func(request) 

584 while asyncio.iscoroutine(token_info): 

585 token_info = await token_info 

586 if token_info is not NO_VALUE: 

587 break 

588 except Exception as err: 

589 errors.append(err) 

590 

591 else: 

592 if errors != []: 

593 cls._raise_most_specific(errors) 

594 else: 

595 logger.info("... No auth provided. Aborting with 401.") 

596 raise OAuthProblem(detail="No authorization token provided") 

597 

598 request.context.update( 

599 { 

600 # Fallback to 'uid' for backward compatibility 

601 "user": token_info.get("sub", token_info.get("uid")), 

602 "token_info": token_info, 

603 } 

604 ) 

605 

606 return verify_fn 

607 

608 @staticmethod 

609 def _raise_most_specific(exceptions: t.List[Exception]) -> None: 

610 """Raises the most specific error from a list of exceptions by status code. 

611 

612 The status codes are expected to be either in the `code` 

613 or in the `status` attribute of the exceptions. 

614 

615 The order is as follows: 

616 - 403: valid credentials but not enough privileges 

617 - 401: no or invalid credentials 

618 - for other status codes, the smallest one is selected 

619 

620 :param errors: List of exceptions. 

621 :type errors: t.List[Exception] 

622 """ 

623 if not exceptions: 

624 return 

625 # We only use status code attributes from exceptions 

626 # We use 600 as default because 599 is highest valid status code 

627 status_to_exc = { 

628 getattr(exc, "status_code", getattr(exc, "status", 600)): exc 

629 for exc in exceptions 

630 } 

631 if 403 in status_to_exc: 

632 raise status_to_exc[403] 

633 elif 401 in status_to_exc: 

634 raise status_to_exc[401] 

635 else: 

636 lowest_status_code = min(status_to_exc) 

637 raise status_to_exc[lowest_status_code]