1import logging
2import typing as t
3from collections import defaultdict
4
5from starlette.types import ASGIApp, Receive, Scope, Send
6
7from connexion.exceptions import ProblemException
8from connexion.lifecycle import ConnexionRequest
9from connexion.middleware.abstract import RoutedAPI, RoutedMiddleware
10from connexion.operations import AbstractOperation
11from connexion.security import SecurityHandlerFactory
12from connexion.spec import Specification
13
# Module-level logger used by the security middleware classes in this file.
logger = logging.getLogger("connexion.middleware.security")
15
16
class SecurityOperation:
    """ASGI callable that runs a verification function before calling the next app.

    The verification function is resolved once, at construction time, from the
    ``security`` requirements and the ``security_schemes`` definitions.
    """

    def __init__(
        self,
        next_app: ASGIApp,
        *,
        security_handler_factory: SecurityHandlerFactory,
        security: list,
        security_schemes: dict,
    ):
        """Store the collaborators and resolve the verification function.

        :param next_app: The next ASGI app to call.
        :param security_handler_factory: Factory providing the verification callables.
        :param security: List of security requirements; each entry maps a scheme name
            to its required scopes.
        :param security_schemes: Scheme definitions, looked up by scheme name.
        """
        self.next_app = next_app
        self.security_handler_factory = security_handler_factory
        self.security = security
        self.security_schemes = security_schemes
        # Resolved last on purpose: it reads the attributes assigned above.
        self.verification_fn = self._get_verification_fn()

    @classmethod
    def from_operation(
        cls,
        operation: t.Union[AbstractOperation, Specification],
        *,
        next_app: ASGIApp,
        security_handler_factory: SecurityHandlerFactory,
    ) -> "SecurityOperation":
        """Build a SecurityOperation from an Operation or a Specification instance.

        :param operation: Either an Operation or a Specification; only its ``security``
            and ``security_schemes`` attributes are read. Passing a Specification allows
            building a SecurityOperation for routes that are not explicitly defined in
            the specification.
        :param next_app: The next ASGI app to call.
        :param security_handler_factory: The factory used to generate security handlers
            for the different security schemes.
        """
        requirements = operation.security
        schemes = operation.security_schemes
        return cls(
            next_app=next_app,
            security_handler_factory=security_handler_factory,
            security=requirements,
            security_schemes=schemes,
        )

    def _collect_scheme_funcs(self, security_req):
        """Map every scheme name in ``security_req`` to its verification callable.

        :return: A dict of scheme name to callable, or ``None`` when the requirement
            has to be dropped: either the factory returned ``None`` for one of the
            schemes, or a second ``oauth2`` scheme shows up in the same requirement.
        """
        factory = self.security_handler_factory
        scheme_funcs = {}
        seen_oauth2 = False
        for name, scopes in security_req.items():
            scheme = self.security_schemes[name]

            if scheme["type"] == "oauth2":
                if seen_oauth2:
                    logger.warning(
                        "... multiple OAuth2 security schemes in AND fashion not supported",
                        extra=vars(self),
                    )
                    return None
                seen_oauth2 = True

            scheme_func = factory.parse_security_scheme(scheme, scopes)
            if scheme_func is None:
                return None

            scheme_funcs[name] = scheme_func
        return scheme_funcs

    def _get_verification_fn(self):
        """Resolve the callable used to verify a request for this operation.

        Without security requirements the factory's ``security_passthrough`` is
        returned. Otherwise one callable is gathered per usable requirement and the
        list is handed to the factory's ``verify_security``.
        """
        logger.debug("... Security: %s", self.security, extra=vars(self))
        factory = self.security_handler_factory
        if not self.security:
            return factory.security_passthrough

        requirement_funcs = []
        for requirement in self.security:
            if not requirement:
                # An empty requirement object is handled by verify_none.
                requirement_funcs.append(factory.verify_none)
                continue

            scheme_funcs = self._collect_scheme_funcs(requirement)
            if scheme_funcs is None:
                # Requirement could not be fully resolved: leave it out.
                continue

            if len(scheme_funcs) == 1:
                (single_func,) = scheme_funcs.values()
                requirement_funcs.append(single_func)
            else:
                requirement_funcs.append(factory.verify_multiple_schemes(scheme_funcs))

        return factory.verify_security(requirement_funcs)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Verify the request when security is configured, then call the next app."""
        if self.security:
            await self.verification_fn(ConnexionRequest(scope))
        await self.next_app(scope, receive, send)
112
113
class SecurityAPI(RoutedAPI[SecurityOperation]):
    """Routed API whose operations are :class:`SecurityOperation` instances."""

    def __init__(
        self,
        *args,
        auth_all_paths: bool = False,
        security_map: t.Optional[dict] = None,
        **kwargs,
    ):
        """Set up the security handler factory and register the paths.

        :param args: Positional arguments forwarded to the parent constructor.
        :param auth_all_paths: When true, a default SecurityOperation built from the
            specification is used for every key missing from ``operations``.
        :param security_map: Passed as-is to ``SecurityHandlerFactory``; may be ``None``.
        :param kwargs: Keyword arguments forwarded to the parent constructor.
        """
        super().__init__(*args, **kwargs)

        self.security_handler_factory = SecurityHandlerFactory(security_map)

        if auth_all_paths:
            self.add_auth_on_not_found()
        else:
            self.operations: t.MutableMapping[t.Optional[str], SecurityOperation] = {}

        # Inherited from the parent class; called only after ``operations`` is in place.
        self.add_paths()

    def add_auth_on_not_found(self) -> None:
        """Register a default SecurityOperation for routes that are not found."""
        default_operation = self.make_operation(self.specification)
        # Every missing key resolves to the same, specification-level operation.
        self.operations = defaultdict(lambda: default_operation)

    def make_operation(
        self, operation: t.Union[AbstractOperation, Specification]
    ) -> SecurityOperation:
        """Create a SecurityOperation from an Operation or Specification instance.

        :param operation: The operation can be both an Operation or Specification instance here
            since security is defined at both levels in the OpenAPI spec. Creating a
            SecurityOperation based on a Specification can be used to create a SecurityOperation
            for routes not explicitly defined in the specification.
        :return: A SecurityOperation wired to this API's ``next_app`` and
            ``security_handler_factory``.
        """
        return SecurityOperation.from_operation(
            operation,
            next_app=self.next_app,
            security_handler_factory=self.security_handler_factory,
        )
149
150
class SecurityMiddleware(RoutedMiddleware[SecurityAPI]):
    """Routed middleware that uses :class:`SecurityAPI` as its API class."""

    api_cls = SecurityAPI
155
156
class MissingSecurityOperation(ProblemException):
    """ProblemException subclass; it adds no behavior of its own.

    NOTE(review): nothing in this file's visible code raises it - confirm usage
    against callers.
    """