Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/middleware/security.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

72 statements  

1import logging 

2import typing as t 

3from collections import defaultdict 

4 

5from starlette.types import ASGIApp, Receive, Scope, Send 

6 

7from connexion.exceptions import ProblemException 

8from connexion.lifecycle import ConnexionRequest 

9from connexion.middleware.abstract import RoutedAPI, RoutedMiddleware 

10from connexion.operations import AbstractOperation 

11from connexion.security import SecurityHandlerFactory 

12from connexion.spec import Specification 

13 

14logger = logging.getLogger("connexion.middleware.security") 

15 

16 

class SecurityOperation:
    """ASGI callable that runs a security check before delegating to the next app.

    The verification function is derived once, at construction time, from the
    ``security`` requirements and the ``security_schemes`` definitions. When
    ``security`` is truthy, it is awaited for each call before ``next_app`` is
    invoked; otherwise the call is forwarded directly.
    """

    def __init__(
        self,
        next_app: ASGIApp,
        *,
        security_handler_factory: SecurityHandlerFactory,
        security: list,
        security_schemes: dict,
    ):
        """Store the collaborators and precompute the verification function.

        :param next_app: The next ASGI app to call.
        :param security_handler_factory: Factory supplying the verification callables.
        :param security: Security requirements; every non-empty item is iterated as a
            mapping of scheme name to required scopes.
        :param security_schemes: Security scheme definitions, indexed by scheme name.
        """
        self.next_app = next_app
        self.security_handler_factory = security_handler_factory
        self.security = security
        self.security_schemes = security_schemes
        # Built last on purpose: the builder reads the attributes assigned above.
        self.verification_fn = self._get_verification_fn()

    @classmethod
    def from_operation(
        cls,
        operation: t.Union[AbstractOperation, Specification],
        *,
        next_app: ASGIApp,
        security_handler_factory: SecurityHandlerFactory,
    ) -> "SecurityOperation":
        """Build a SecurityOperation from an Operation or a Specification.

        :param operation: Either an Operation or a Specification instance; only its
            ``security`` and ``security_schemes`` attributes are read. Passing a
            Specification allows creating a SecurityOperation for routes that are
            not explicitly defined in the specification.
        :param next_app: The next ASGI app to call.
        :param security_handler_factory: The factory used to generate security
            handlers for the different security schemes.
        """
        requirements = operation.security
        schemes = operation.security_schemes
        return cls(
            next_app,
            security_handler_factory=security_handler_factory,
            security=requirements,
            security_schemes=schemes,
        )

    def _collect_scheme_fns(self, requirement: dict) -> t.Optional[dict]:
        """Resolve one security requirement into its per-scheme functions.

        :param requirement: Mapping of scheme name to required scopes.
        :return: Mapping of scheme name to the function returned by the factory's
            ``parse_security_scheme``, or ``None`` when the requirement has to be
            dropped (a second ``oauth2`` scheme was found, or the factory returned
            ``None`` for one of the schemes).
        """
        scheme_fns = {}
        seen_oauth = False
        for scheme_name, required_scopes in requirement.items():
            scheme = self.security_schemes[scheme_name]

            if scheme["type"] == "oauth2":
                if seen_oauth:
                    logger.warning(
                        "... multiple OAuth2 security schemes in AND fashion not supported",
                        extra=vars(self),
                    )
                    return None
                seen_oauth = True

            scheme_fn = self.security_handler_factory.parse_security_scheme(
                scheme, required_scopes
            )
            if scheme_fn is None:
                return None

            scheme_fns[scheme_name] = scheme_fn

        return scheme_fns

    def _get_verification_fn(self):
        """Assemble the verification function for this operation.

        Without any security requirements the factory's ``security_passthrough``
        is returned. Otherwise one function is collected per requirement (the
        factory's ``verify_none`` for an empty requirement) and the collection is
        handed to the factory's ``verify_security``.
        """
        logger.debug("... Security: %s", self.security, extra=vars(self))
        factory = self.security_handler_factory
        if not self.security:
            return factory.security_passthrough

        verifiers = []
        for requirement in self.security:
            if not requirement:
                verifiers.append(factory.verify_none)
                continue

            scheme_fns = self._collect_scheme_fns(requirement)
            if scheme_fns is None:
                # Requirement could not be fully resolved: leave it out.
                continue

            if len(scheme_fns) == 1:
                # A single scheme needs no combining wrapper.
                verifiers.append(next(iter(scheme_fns.values())))
            else:
                verifiers.append(factory.verify_multiple_schemes(scheme_fns))

        return factory.verify_security(verifiers)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Run the verification function when security is set, then call ``next_app``."""
        if self.security:
            await self.verification_fn(ConnexionRequest(scope))
        await self.next_app(scope, receive, send)

112 

113 

class SecurityAPI(RoutedAPI[SecurityOperation]):
    """Routed API whose operations are :class:`SecurityOperation` instances."""

    def __init__(
        self,
        *args,
        auth_all_paths: bool = False,
        security_map: t.Optional[dict] = None,
        **kwargs,
    ):
        """Set up the security handler factory and register the operations.

        :param args: Positional arguments forwarded to the parent initializer.
        :param auth_all_paths: When true, a default SecurityOperation built from the
            specification is used for every operation key that is not registered.
        :param security_map: Passed as-is to ``SecurityHandlerFactory``; may be ``None``.
        :param kwargs: Keyword arguments forwarded to the parent initializer.
        """
        super().__init__(*args, **kwargs)

        self.security_handler_factory = SecurityHandlerFactory(security_map)

        if auth_all_paths:
            self.add_auth_on_not_found()
        else:
            self.operations: t.MutableMapping[t.Optional[str], SecurityOperation] = {}

        # NOTE(review): add_paths is not defined in this module; presumably inherited
        # from RoutedAPI and responsible for filling ``self.operations`` - confirm.
        self.add_paths()

    def add_auth_on_not_found(self) -> None:
        """Register a default SecurityOperation for routes that are not found."""
        default_operation = self.make_operation(self.specification)
        # Every missing key resolves to the same, pre-built operation.
        self.operations = defaultdict(lambda: default_operation)

    def make_operation(
        self, operation: t.Union[AbstractOperation, Specification]
    ) -> SecurityOperation:
        """Create a SecurityOperation from an Operation or Specification instance

        :param operation: The operation can be both an Operation or Specification instance here
            since security is defined at both levels in the OpenAPI spec. Creating a
            SecurityOperation based on a Specification can be used to create a SecurityOperation
            for routes not explicitly defined in the specification.
        """
        return SecurityOperation.from_operation(
            operation,
            next_app=self.next_app,
            security_handler_factory=self.security_handler_factory,
        )

149 

150 

class SecurityMiddleware(RoutedMiddleware[SecurityAPI]):
    """Routed middleware that checks whether an operation is accessible on a scope."""

    # API class used by this middleware.
    api_cls = SecurityAPI

155 

156 

class MissingSecurityOperation(ProblemException):
    """ProblemException subclass with no additional behaviour.

    NOTE(review): not raised anywhere in the visible part of this module;
    presumably signals that no security operation is available - confirm.
    """