Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/connexion/middleware/security.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

72 statements  

1import logging 

2import typing as t 

3from collections import defaultdict 

4 

5from starlette.types import ASGIApp, Receive, Scope, Send 

6 

7from connexion.exceptions import ProblemException 

8from connexion.lifecycle import ConnexionRequest 

9from connexion.middleware.abstract import RoutedAPI, RoutedMiddleware 

10from connexion.operations import AbstractOperation 

11from connexion.security import SecurityHandlerFactory 

12from connexion.spec import Specification 

13 

14logger = logging.getLogger("connexion.middleware.security") 

15 

16 

class SecurityOperation:
    """ASGI callable that runs an operation's security checks before delegating.

    The verification function is assembled once, at construction time, from the
    ``security`` requirements and the ``security_schemes`` definitions, using the
    supplied ``SecurityHandlerFactory``.
    """

    def __init__(
        self,
        next_app: ASGIApp,
        *,
        security_handler_factory: SecurityHandlerFactory,
        security: list,
        security_schemes: dict,
    ):
        """Store the collaborators and build the verification function.

        :param next_app: The next ASGI app to call.
        :param security_handler_factory: The factory used to generate security
            handlers for the different security schemes.
        :param security: Iterable of security requirements; each requirement maps
            a scheme name to its required scopes.
        :param security_schemes: Mapping of scheme name to scheme definition.
        """
        self.next_app = next_app
        self.security_handler_factory = security_handler_factory
        self.security = security
        self.security_schemes = security_schemes
        # Built last: it reads the attributes assigned above.
        self.verification_fn = self._get_verification_fn()

    @classmethod
    def from_operation(
        cls,
        operation: t.Union[AbstractOperation, Specification],
        *,
        next_app: ASGIApp,
        security_handler_factory: SecurityHandlerFactory,
    ) -> "SecurityOperation":
        """Build a SecurityOperation from an Operation or a Specification.

        :param operation: Either an Operation or a Specification instance, since
            security is defined at both levels in the OpenAPI spec. Passing a
            Specification yields a SecurityOperation usable for routes that are
            not explicitly defined in the specification.
        :param next_app: The next ASGI app to call.
        :param security_handler_factory: The factory used to generate security
            handlers for the different security schemes.
        """
        requirements = operation.security
        scheme_definitions = operation.security_schemes
        return cls(
            next_app=next_app,
            security_handler_factory=security_handler_factory,
            security=requirements,
            security_schemes=scheme_definitions,
        )

    def _get_verification_fn(self):
        """Return the callable that verifies a request against ``self.security``.

        With no (or empty) security requirements the factory's
        ``security_passthrough`` is returned. Otherwise the checks collected for
        each requirement are handed to the factory's ``verify_security``.
        """
        factory = self.security_handler_factory
        logger.debug("... Security: %s", self.security, extra=vars(self))
        if not self.security:
            return factory.security_passthrough

        auth_funcs = []
        for requirement in self.security:
            auth_funcs.extend(self._requirement_checks(requirement))
        return factory.verify_security(auth_funcs)

    def _requirement_checks(self, security_req) -> list:
        """Return the checks contributed by one security requirement.

        The result holds zero or one callable:

        * an empty requirement contributes the factory's ``verify_none``;
        * a requirement naming a second ``oauth2`` scheme, or a scheme for which
          the factory returns no handler, contributes nothing;
        * otherwise it contributes the single scheme's handler, or the factory's
          ``verify_multiple_schemes`` wrapper when several schemes are named.
        """
        factory = self.security_handler_factory
        if not security_req:
            return [factory.verify_none]

        scheme_checks = {}
        seen_oauth2 = False
        for scheme_name, required_scopes in security_req.items():
            scheme = self.security_schemes[scheme_name]

            is_oauth2 = scheme["type"] == "oauth2"
            if is_oauth2 and seen_oauth2:
                logger.warning(
                    "... multiple OAuth2 security schemes in AND fashion not supported",
                    extra=vars(self),
                )
                return []
            seen_oauth2 = seen_oauth2 or is_oauth2

            check = factory.parse_security_scheme(scheme, required_scopes)
            if check is None:
                # No handler for this scheme: the whole requirement is dropped.
                return []
            scheme_checks[scheme_name] = check

        if len(scheme_checks) == 1:
            (only_check,) = scheme_checks.values()
            return [only_check]
        return [factory.verify_multiple_schemes(scheme_checks)]

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Verify the request when security is configured, then call ``next_app``."""
        if self.security:
            await self.verification_fn(ConnexionRequest(scope))
        await self.next_app(scope, receive, send)

112 

113 

class SecurityAPI(RoutedAPI[SecurityOperation]):
    """Routed API whose operations are ``SecurityOperation`` instances."""

    def __init__(
        self,
        *args,
        auth_all_paths: bool = False,
        security_map: t.Optional[dict] = None,
        **kwargs,
    ):
        """Set up the security handler factory and register the paths.

        :param auth_all_paths: When true, a default SecurityOperation is registered
            for routes that are not found; otherwise ``operations`` starts out as
            an empty dict.
        :param security_map: Passed straight to ``SecurityHandlerFactory``.
        :param args: Forwarded to the parent initializer.
        :param kwargs: Forwarded to the parent initializer.
        """
        super().__init__(*args, **kwargs)

        self.security_handler_factory = SecurityHandlerFactory(security_map)

        if not auth_all_paths:
            self.operations: t.MutableMapping[t.Optional[str], SecurityOperation] = {}
        else:
            self.add_auth_on_not_found()

        self.add_paths()

    def add_auth_on_not_found(self) -> None:
        """Register a default SecurityOperation for routes that are not found."""
        fallback = self.make_operation(self.specification)

        def _fallback_operation() -> SecurityOperation:
            # Every missing key resolves to the same specification-level operation.
            return fallback

        self.operations = defaultdict(_fallback_operation)

    def make_operation(
        self, operation: t.Union[AbstractOperation, Specification]
    ) -> SecurityOperation:
        """Build a SecurityOperation from an Operation or a Specification.

        :param operation: Either an Operation or a Specification instance, since
            security is defined at both levels in the OpenAPI spec. Passing a
            Specification yields a SecurityOperation usable for routes that are
            not explicitly defined in the specification.
        """
        collaborators = {
            "next_app": self.next_app,
            "security_handler_factory": self.security_handler_factory,
        }
        return SecurityOperation.from_operation(operation, **collaborators)

153 

154 

class SecurityMiddleware(RoutedMiddleware[SecurityAPI]):
    """Middleware to check if operation is accessible on scope."""

    # API class bound to this middleware. Presumably the RoutedMiddleware base
    # instantiates it for each registered API -- confirm in RoutedMiddleware.
    api_cls = SecurityAPI

159 

160 

class MissingSecurityOperation(ProblemException):
    """ProblemException subclass that adds no behavior of its own.

    NOTE(review): it is not raised anywhere in the code visible here; presumably
    it signals that no security operation could be resolved -- confirm at the
    call sites.
    """

    pass