Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/security.py: 24%

237 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1""" 

2This module defines an abstract SecurityHandlerFactory which supports the creation of security 

3handlers for operations. 

4""" 

5 

6import asyncio 

7import base64 

8import http.cookies 

9import logging 

10import os 

11import textwrap 

12import typing as t 

13 

14import httpx 

15 

16from connexion.decorators.parameter import inspect_function_arguments 

17from connexion.exceptions import ( 

18 ConnexionException, 

19 OAuthProblem, 

20 OAuthResponseProblem, 

21 OAuthScopeProblem, 

22) 

23from connexion.utils import get_function_from_name 

24 

25logger = logging.getLogger("connexion.api.security") 

26 

27 

class SecurityHandlerFactory:
    """
    Factory for the callables that authenticate and authorize a request.

    get_*_func -> _get_function -> get_function_from_name (name=security function defined in spec)
        (if url defined instead of a function -> get_token_info_remote)

    std security functions: security_{passthrough,deny}

    verify_* -> returns a security wrapper around the security function
    check_* -> returns a function tasked with doing auth for use inside the verify wrapper
        check helpers (used outside wrappers): _need_to_add_context_or_scopes
        the security function

    verify helpers (used inside wrappers): get_auth_header_value, get_cookie_value
    """

    # Sentinel returned by verify/check callables when the request does not
    # carry credentials for the scheme they handle.
    no_value = object()
    # Keyword names under which extra arguments are handed to user functions.
    required_scopes_kw = "required_scopes"
    context_kw = "context_"
    # HTTP client used by get_token_info_remote; created on first use.
    client = None

    @staticmethod
    def _get_function(
        security_definition, security_definition_key, environ_key, default=None
    ):
        """
        Return function by getting its name from security_definition or environment variable

        :param security_definition: mapping describing one security scheme
        :param security_definition_key: key in ``security_definition`` holding the function name
        :param environ_key: environment variable consulted when the key is missing or empty
        :param default: value returned when no function name is configured
        """
        func = security_definition.get(security_definition_key) or os.environ.get(
            environ_key
        )
        if func:
            return get_function_from_name(func)
        return default

    def get_tokeninfo_func(self, security_definition: dict) -> t.Optional[t.Callable]:
        """
        Return the token info function for a security definition.

        A function configured through ``x-tokenInfoFunc`` / ``TOKENINFO_FUNC`` takes
        precedence; otherwise a remote lookup is built from ``x-tokenInfoUrl`` /
        ``TOKENINFO_URL``. Returns None when neither is configured.

        :type security_definition: dict

        >>> get_tokeninfo_func({'x-tokenInfoFunc': 'foo.bar'})
        '<function foo.bar>'
        """
        token_info_func = self._get_function(
            security_definition, "x-tokenInfoFunc", "TOKENINFO_FUNC"
        )
        if token_info_func:
            return token_info_func

        token_info_url = security_definition.get("x-tokenInfoUrl") or os.environ.get(
            "TOKENINFO_URL"
        )
        if token_info_url:
            return self.get_token_info_remote(token_info_url)

        return None

    @classmethod
    def get_scope_validate_func(cls, security_definition):
        """
        Return the scope validation function, defaulting to ``validate_scope``.

        :type security_definition: dict
        :rtype: function

        >>> get_scope_validate_func({'x-scopeValidateFunc': 'foo.bar'})
        '<function foo.bar>'
        """
        return cls._get_function(
            security_definition,
            "x-scopeValidateFunc",
            "SCOPEVALIDATE_FUNC",
            cls.validate_scope,
        )

    @classmethod
    def get_basicinfo_func(cls, security_definition):
        """
        Return the function configured to check basic auth credentials, if any.

        :type security_definition: dict
        :rtype: function

        >>> get_basicinfo_func({'x-basicInfoFunc': 'foo.bar'})
        '<function foo.bar>'
        """
        return cls._get_function(
            security_definition, "x-basicInfoFunc", "BASICINFO_FUNC"
        )

    @classmethod
    def get_apikeyinfo_func(cls, security_definition):
        """
        Return the function configured to check API keys, if any.

        :type security_definition: dict
        :rtype: function

        >>> get_apikeyinfo_func({'x-apikeyInfoFunc': 'foo.bar'})
        '<function foo.bar>'
        """
        return cls._get_function(
            security_definition, "x-apikeyInfoFunc", "APIKEYINFO_FUNC"
        )

    @classmethod
    def get_bearerinfo_func(cls, security_definition):
        """
        Return the function configured to check bearer tokens, if any.

        :type security_definition: dict
        :rtype: function

        >>> get_bearerinfo_func({'x-bearerInfoFunc': 'foo.bar'})
        '<function foo.bar>'
        """
        return cls._get_function(
            security_definition, "x-bearerInfoFunc", "BEARERINFO_FUNC"
        )

    @staticmethod
    async def security_passthrough(request):
        """Return the request unchanged."""
        return request

    @staticmethod
    def security_deny(function):
        """
        Return a callable that always raises ConnexionException.

        The ``function`` argument is accepted but not used.

        :type function: types.FunctionType
        :rtype: types.FunctionType
        """

        def deny(*args, **kwargs):
            raise ConnexionException("Error in security definitions")

        return deny

    @staticmethod
    def validate_scope(required_scopes, token_scopes):
        """
        Check that every required scope is present in the token scopes.

        :param required_scopes: Scopes required to access operation
        :param token_scopes: Scopes granted by authorization server, either a
            whitespace-separated string or an iterable of scope names;
            None is treated as "no scopes granted"
        :rtype: bool
        """
        required_scopes = set(required_scopes)
        if token_scopes is None:
            token_scopes = set()
        elif isinstance(token_scopes, (str, bytes)):
            token_scopes = set(token_scopes.split())
        else:
            # Any other iterable (list, tuple, set, ...) already holds scope names.
            token_scopes = set(token_scopes)
        logger.debug("... Scopes required: %s", required_scopes)
        logger.debug("... Token scopes: %s", token_scopes)
        if not required_scopes <= token_scopes:
            logger.info(
                "... Token scopes (%s) do not match the scopes necessary "
                "to call endpoint (%s). Aborting with 403.",
                token_scopes,
                required_scopes,
            )
            return False
        return True

    @staticmethod
    def get_auth_header_value(request):
        """
        Called inside security wrapper functions

        Return Authorization type and value if any.
        If not Authorization, return (None, None)
        Raise OAuthProblem for invalid Authorization header

        The returned type is lower-cased; the value is returned as received.
        """
        authorization = request.headers.get("Authorization")
        if not authorization:
            return None, None

        try:
            auth_type, value = authorization.split(None, 1)
        except ValueError as err:
            # Header has a scheme but no credentials (or vice versa).
            raise OAuthProblem(detail="Invalid authorization header") from err
        return auth_type.lower(), value

    def verify_oauth(self, token_info_func, scope_validate_func, required_scopes):
        """
        Return a wrapper that checks the bearer token and its scopes.

        The wrapper returns ``no_value`` when the request has no bearer
        Authorization header.
        """
        check_oauth_func = self.check_oauth_func(token_info_func, scope_validate_func)

        def wrapper(request):
            auth_type, token = self.get_auth_header_value(request)
            if auth_type != "bearer":
                return self.no_value

            return check_oauth_func(request, token, required_scopes=required_scopes)

        return wrapper

    def verify_basic(self, basic_info_func):
        """
        Return a wrapper that checks HTTP basic credentials.

        The wrapper returns ``no_value`` when the request has no basic
        Authorization header and raises OAuthProblem when the credentials
        cannot be decoded into ``username:password``.
        """
        check_basic_info_func = self.check_basic_auth(basic_info_func)

        def wrapper(request):
            auth_type, user_pass = self.get_auth_header_value(request)
            if auth_type != "basic":
                return self.no_value

            try:
                username, password = (
                    base64.b64decode(user_pass).decode("latin1").split(":", 1)
                )
            except (ValueError, TypeError) as err:
                # ValueError covers invalid base64 (binascii.Error is a subclass),
                # non-ASCII input and a decoded value without a ":" separator.
                raise OAuthProblem(detail="Invalid authorization header") from err

            return check_basic_info_func(request, username, password)

        return wrapper

    @staticmethod
    def get_cookie_value(cookies, name):
        """
        Called inside security wrapper functions

        Returns cookie value by its name. None if no such value.
        :param cookies: str: cookies raw data
        :param name: str: cookies key
        """
        if not cookies:
            # No Cookie header at all: nothing to parse.
            return None

        cookie_parser = http.cookies.SimpleCookie()
        try:
            cookie_parser.load(str(cookies))
        except http.cookies.CookieError:
            # NOTE(review): a malformed cookie should not become a server error;
            # whatever was parsed before the failure is still looked up below.
            pass

        morsel = cookie_parser.get(name)
        return None if morsel is None else morsel.value

    def verify_api_key(self, api_key_info_func, loc, name):
        """
        Return a wrapper that extracts an API key and checks it.

        :param api_key_info_func: function used to check the API key
        :param loc: where the key is read from: "query", "header" or "cookie"
        :param name: name of the query parameter, header or cookie

        The wrapper returns ``no_value`` when ``loc`` is not one of the supported
        locations or when no key is found. For ``loc == "query"`` the key is
        removed from ``request.query``.
        """
        check_api_key_func = self.check_api_key(api_key_info_func)

        def _immutable_pop(_dict, key):
            """
            Pops the key from an immutable dict and returns the value that was popped,
            and a new immutable dict without the popped key.
            """
            cls = type(_dict)
            try:
                _dict = _dict.to_dict(flat=False)
                return _dict.pop(key)[0], cls(_dict)
            except AttributeError:
                # No ``to_dict`` available: copy through a plain dict instead.
                _dict = dict(_dict.items())
                return _dict.pop(key), cls(_dict)

        def wrapper(request):
            if loc == "query":
                try:
                    api_key, request.query = _immutable_pop(request.query, name)
                except KeyError:
                    api_key = None
            elif loc == "header":
                api_key = request.headers.get(name)
            elif loc == "cookie":
                cookie_list = request.headers.get("Cookie")
                api_key = self.get_cookie_value(cookie_list, name)
            else:
                return self.no_value

            if api_key is None:
                return self.no_value

            return check_api_key_func(request, api_key)

        return wrapper

    def verify_bearer(self, token_info_func):
        """
        Return a wrapper that checks a bearer token.

        The wrapper returns ``no_value`` when the request has no bearer
        Authorization header.

        :param token_info_func: types.FunctionType
        :rtype: types.FunctionType
        """
        check_bearer_func = self.check_bearer_token(token_info_func)

        def wrapper(request):
            auth_type, token = self.get_auth_header_value(request)
            if auth_type != "bearer":
                return self.no_value
            return check_bearer_func(request, token)

        return wrapper

    def verify_multiple_schemes(self, schemes):
        """
        Verifies multiple authentication schemes in AND fashion.
        If any scheme fails, the entire authentication fails.

        The wrapper returns a dict mapping each scheme name to its result, or
        ``no_value`` as soon as one scheme returns ``no_value``.

        :param schemes: mapping scheme_name to auth function
        :type schemes: dict
        :rtype: types.FunctionType
        """

        async def wrapper(request):
            token_info = {}
            for scheme_name, func in schemes.items():
                result = func(request)
                # Scheme functions may be sync or async (possibly nested).
                while asyncio.iscoroutine(result):
                    result = await result
                if result is self.no_value:
                    return self.no_value
                token_info[scheme_name] = result

            return token_info

        return wrapper

    @staticmethod
    def verify_none():
        """
        Return a wrapper that accepts every request with empty token info.

        :rtype: types.FunctionType
        """

        def wrapper(request):
            return {}

        return wrapper

    def _need_to_add_context_or_scopes(self, func):
        """
        Inspect ``func`` and report which optional keyword arguments to pass.

        :return: tuple ``(need_context, need_required_scopes)``; the context is
            needed only when ``context_`` is a named argument, the required
            scopes when ``required_scopes`` is named or ``**kwargs`` is accepted
        """
        arguments, has_kwargs = inspect_function_arguments(func)
        need_context = self.context_kw in arguments
        need_required_scopes = has_kwargs or self.required_scopes_kw in arguments
        return need_context, need_required_scopes

    def _generic_check(self, func, exception_msg):
        """
        Wrap a user security function into an async check.

        :param func: sync or async security function
        :param exception_msg: detail of the OAuthResponseProblem raised when
            ``func`` returns None
        :return: async callable ``(request, *args, required_scopes=None)`` that
            returns the result of ``func`` (``no_value`` is passed through)
        """
        (
            need_to_add_context,
            need_to_add_required_scopes,
        ) = self._need_to_add_context_or_scopes(func)

        async def wrapper(request, *args, required_scopes=None):
            kwargs = {}
            if need_to_add_context:
                kwargs[self.context_kw] = request.context
            if need_to_add_required_scopes:
                kwargs[self.required_scopes_kw] = required_scopes
            token_info = func(*args, **kwargs)
            while asyncio.iscoroutine(token_info):
                token_info = await token_info
            if token_info is self.no_value:
                return self.no_value
            if token_info is None:
                raise OAuthResponseProblem(detail=exception_msg)
            return token_info

        return wrapper

    def check_bearer_token(self, token_info_func):
        """Return the check built around a bearer token info function."""
        return self._generic_check(token_info_func, "Provided token is not valid")

    def check_basic_auth(self, basic_info_func):
        """Return the check built around a basic auth info function."""
        return self._generic_check(
            basic_info_func, "Provided authorization is not valid"
        )

    def check_api_key(self, api_key_info_func):
        """Return the check built around an API key info function."""
        return self._generic_check(api_key_info_func, "Provided apikey is not valid")

    def check_oauth_func(self, token_info_func, scope_validate_func):
        """
        Return an async check that resolves the token info and validates scopes.

        The returned callable raises OAuthScopeProblem when the scope validation
        function reports a mismatch, and returns the token info otherwise.
        """
        get_token_info = self._generic_check(
            token_info_func, "Provided token is not valid"
        )
        need_to_add_context, _ = self._need_to_add_context_or_scopes(
            scope_validate_func
        )

        async def wrapper(request, token, required_scopes):
            token_info = await get_token_info(
                request, token, required_scopes=required_scopes
            )
            if token_info is self.no_value:
                # The sentinel has no scopes to inspect; hand it back untouched so
                # the caller can fall through to the next security scheme.
                return self.no_value

            # Fallback to 'scopes' for backward compatibility
            token_scopes = token_info.get("scope", token_info.get("scopes", ""))

            kwargs = {}
            if need_to_add_context:
                kwargs[self.context_kw] = request.context
            validation = scope_validate_func(required_scopes, token_scopes, **kwargs)
            while asyncio.iscoroutine(validation):
                validation = await validation
            if not validation:
                raise OAuthScopeProblem(
                    required_scopes=required_scopes,
                    token_scopes=token_scopes,
                )

            return token_info

        return wrapper

    @classmethod
    def verify_security(cls, auth_funcs):
        """
        Return an async function that authenticates a request (OR fashion).

        The auth functions are tried in order; the first one returning something
        other than ``no_value`` wins and its result is stored in
        ``request.context`` under ``token_info`` (and ``user``). When none
        succeeds, the most specific collected error is raised, or OAuthProblem
        if no auth function raised.
        """

        async def verify_fn(request):
            token_info = cls.no_value
            errors = []
            for func in auth_funcs:
                try:
                    token_info = func(request)
                    while asyncio.iscoroutine(token_info):
                        token_info = await token_info
                    if token_info is not cls.no_value:
                        break
                except Exception as err:
                    # Keep trying the remaining schemes; report the failure only
                    # if none of them authenticates the request.
                    errors.append(err)

            else:
                # Loop finished without a break: no scheme produced token info.
                if errors:
                    cls._raise_most_specific(errors)
                else:
                    logger.info("... No auth provided. Aborting with 401.")
                    raise OAuthProblem(detail="No authorization token provided")

            request.context.update(
                {
                    # Fallback to 'uid' for backward compatibility
                    "user": token_info.get("sub", token_info.get("uid")),
                    "token_info": token_info,
                }
            )

        return verify_fn

    @staticmethod
    def _raise_most_specific(exceptions: t.List[Exception]) -> None:
        """Raises the most specific error from a list of exceptions by status code.

        The status codes are expected to be either in the `status_code`
        or in the `status` attribute of the exceptions.

        The order is as follows:
          - 403: valid credentials but not enough privileges
          - 401: no or invalid credentials
          - for other status codes, the smallest one is selected

        When several exceptions share a status code, the last one is raised.
        Returns without raising when the list is empty.

        :param exceptions: List of exceptions.
        :type exceptions: t.List[Exception]
        """
        if not exceptions:
            return
        # We only use status code attributes from exceptions
        # We use 600 as default because 599 is highest valid status code
        status_to_exc = {
            getattr(exc, "status_code", getattr(exc, "status", 600)): exc
            for exc in exceptions
        }
        if 403 in status_to_exc:
            raise status_to_exc[403]
        elif 401 in status_to_exc:
            raise status_to_exc[401]
        else:
            lowest_status_code = min(status_to_exc)
            raise status_to_exc[lowest_status_code]

    def get_token_info_remote(self, token_info_url):
        """
        Return a function which will call `token_info_url` to retrieve token info.

        Returned function must accept oauth token in parameter.
        It must return a token_info dict in case of success, None otherwise.

        :param token_info_url: Url to get information about the token
        :type token_info_url: str
        :rtype: types.FunctionType
        """

        async def wrapper(token):
            if self.client is None:
                # Created on first use and kept on the factory for later calls.
                self.client = httpx.AsyncClient()
            headers = {"Authorization": f"Bearer {token}"}
            response = await self.client.get(
                token_info_url, headers=headers, timeout=5
            )
            if response.status_code != 200:
                return None
            return response.json()

        return wrapper

491 

492 return wrapper