Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication.py: 30%

104 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-07 06:33 +0000

1# ------------------------------------------------------------------------- 

2# Copyright (c) Microsoft Corporation. All rights reserved. 

3# Licensed under the MIT License. See LICENSE.txt in the project root for 

4# license information. 

5# ------------------------------------------------------------------------- 

6import time 

7from typing import TYPE_CHECKING, Optional, TypeVar, MutableMapping, Any 

8from azure.core.pipeline import PipelineRequest, PipelineResponse 

9from azure.core.pipeline.transport import HttpResponse as LegacyHttpResponse, HttpRequest as LegacyHttpRequest 

10from azure.core.rest import HttpResponse, HttpRequest 

11from . import HTTPPolicy, SansIOHTTPPolicy 

12from ...exceptions import ServiceRequestError 

13 

14if TYPE_CHECKING: 

15 # pylint:disable=unused-import 

16 from azure.core.credentials import ( 

17 AccessToken, 

18 TokenCredential, 

19 AzureKeyCredential, 

20 AzureSasCredential, 

21 ) 

22 

# Constrained type variables used to parameterize the policies below: each one is restricted to
# either the ``azure.core.rest`` type or its legacy ``azure.core.pipeline.transport`` counterpart.
HTTPResponseType = TypeVar("HTTPResponseType", HttpResponse, LegacyHttpResponse)

HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest)

25 

26 

27# pylint:disable=too-few-public-methods 

class _BearerTokenCredentialPolicyBase:
    """Common state and helpers shared by bearer token credential policies.

    Keeps the credential, the requested scopes and the most recently acquired token, and offers
    helpers for rejecting non-HTTPS requests, writing the Authorization header and deciding when
    the cached token must be replaced.

    :param credential: The credential.
    :type credential: ~azure.core.credentials.TokenCredential
    :param str scopes: Lets you specify the type of access needed.
    :keyword bool enable_cae: Indicates whether to enable Continuous Access Evaluation (CAE) on all requested
        tokens. Defaults to False.
    """

    def __init__(self, credential: "TokenCredential", *scopes: str, **kwargs: Any) -> None:
        super().__init__()
        self._credential = credential
        self._scopes = scopes
        # No token is held until the first one is requested from the credential.
        self._token: Optional["AccessToken"] = None
        self._enable_cae: bool = kwargs.get("enable_cae", False)

    @staticmethod
    def _enforce_https(request: PipelineRequest[HTTPRequestType]) -> None:
        """Reject requests to non-https URLs unless enforcement was explicitly switched off.

        :param ~azure.core.pipeline.PipelineRequest request: the request to check
        :raises: :class:`~azure.core.exceptions.ServiceRequestError` when HTTPS is enforced and the
            request URL does not start with "https"
        """
        # 'enforce_https' is moved from options to context so that it persists across retries
        # without being passed to a transport implementation.
        opt_out = request.context.options.pop("enforce_https", None)

        # Only an explicit False has to be remembered; True is already the default behavior.
        if opt_out is False:
            request.context["enforce_https"] = opt_out

        if not request.context.get("enforce_https", True):
            return

        # NOTE(review): this is a plain prefix test, so any URL whose text begins with "https"
        # passes, not only URLs using the "https://" scheme.
        if request.http_request.url.lower().startswith("https"):
            return

        raise ServiceRequestError(
            "Bearer token authentication is not permitted for non-TLS protected (non-https) URLs."
        )

    @staticmethod
    def _update_headers(headers: MutableMapping[str, str], token: str) -> None:
        """Set the Authorization header to carry the given bearer token.

        :param MutableMapping[str, str] headers: The HTTP Request headers
        :param str token: The OAuth token.
        """
        headers["Authorization"] = f"Bearer {token}"

    @property
    def _need_new_token(self) -> bool:
        """Whether a token must be requested before the next request is authorized.

        True when no token is cached, or when the cached token's ``expires_on`` is less than
        300 ahead of ``time.time()``.
        """
        if not self._token:
            return True
        remaining = self._token.expires_on - time.time()
        return remaining < 300

73 

74 

class BearerTokenCredentialPolicy(_BearerTokenCredentialPolicyBase, HTTPPolicy[HTTPRequestType, HTTPResponseType]):
    """Pipeline policy that authorizes requests with a bearer token in the Authorization header.

    :param credential: The credential.
    :type credential: ~azure.core.TokenCredential
    :param str scopes: Lets you specify the type of access needed.
    :keyword bool enable_cae: Indicates whether to enable Continuous Access Evaluation (CAE) on all requested
        tokens. Defaults to False.
    :raises: :class:`~azure.core.exceptions.ServiceRequestError`
    """

    def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
        """Authorize the request before it is sent.

        Checks the HTTPS requirement, obtains a token from the credential when none is cached or
        the cached one needs replacing, then writes the Authorization header.

        :param ~azure.core.pipeline.PipelineRequest request: the request
        """
        self._enforce_https(request)

        if self._token is None or self._need_new_token:
            # 'enable_cae' is only forwarded to the credential when CAE was enabled on this policy.
            token_kwargs = {"enable_cae": self._enable_cae} if self._enable_cae else {}
            self._token = self._credential.get_token(*self._scopes, **token_kwargs)
        self._update_headers(request.http_request.headers, self._token.token)

    def authorize_request(self, request: PipelineRequest[HTTPRequestType], *scopes: str, **kwargs: Any) -> None:
        """Get a token for the given scopes and use it to authorize the request.

        Keyword arguments are forwarded to the credential's get_token method. The acquired token
        replaces the cached one and is used to authorize future requests.

        :param ~azure.core.pipeline.PipelineRequest request: the request
        :param str scopes: required scopes of authentication
        """
        # A caller-supplied 'enable_cae' wins; otherwise the policy-level setting is applied.
        if self._enable_cae and "enable_cae" not in kwargs:
            kwargs["enable_cae"] = self._enable_cae
        fresh_token = self._credential.get_token(*scopes, **kwargs)
        self._token = fresh_token
        self._update_headers(request.http_request.headers, fresh_token.token)

    def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]:
        """Authorize the request, pass it to the next policy and handle a 401 challenge once.

        If the response is a 401, the cached token is discarded. If that response also carries a
        WWW-Authenticate header and :meth:`on_challenge` returns True, the request is sent one
        more time.

        :param request: The pipeline request object
        :type request: ~azure.core.pipeline.PipelineRequest
        :return: The pipeline response object
        :rtype: ~azure.core.pipeline.PipelineResponse
        """
        self.on_request(request)
        try:
            response = self.next.send(request)
            self.on_response(request, response)
        except Exception:  # pylint:disable=broad-except
            self.on_exception(request)
            raise

        if response.http_response.status_code != 401:
            return response

        # A 401 means whatever token was cached can no longer be trusted.
        self._token = None
        if "WWW-Authenticate" not in response.http_response.headers:
            return response
        if not self.on_challenge(request, response):
            return response

        # The challenge handler authorized the request with a token matching the new target,
        # so the token must stay on the request: clear the 'insecure_domain_change' tag.
        request.context.options.pop("insecure_domain_change", False)
        try:
            response = self.next.send(request)
            self.on_response(request, response)
        except Exception:  # pylint:disable=broad-except
            self.on_exception(request)
            raise

        return response

    def on_challenge(
        self, request: PipelineRequest[HTTPRequestType], response: PipelineResponse[HTTPRequestType, HTTPResponseType]
    ) -> bool:
        """Handle an authentication challenge.

        Called by :meth:`send` when the response is a 401 that includes a WWW-Authenticate header.
        This implementation does not handle the challenge and always returns False.

        :param ~azure.core.pipeline.PipelineRequest request: the request which elicited an authentication challenge
        :param ~azure.core.pipeline.PipelineResponse response: the resource provider's response
        :returns: a bool indicating whether the policy should send the request
        :rtype: bool
        """
        # pylint:disable=unused-argument
        return False

    def on_response(
        self, request: PipelineRequest[HTTPRequestType], response: PipelineResponse[HTTPRequestType, HTTPResponseType]
    ) -> None:
        """Hook invoked once the next policy has returned a response; does nothing here.

        :param request: Request to be modified after returning from the policy.
        :type request: ~azure.core.pipeline.PipelineRequest
        :param response: Pipeline response object
        :type response: ~azure.core.pipeline.PipelineResponse
        """

    def on_exception(self, request: PipelineRequest[HTTPRequestType]) -> None:
        """Hook invoked when sending through the next policy raised; does nothing here.

        :meth:`send` calls this from inside its exception handler and re-raises afterwards.

        :param request: The Pipeline request object
        :type request: ~azure.core.pipeline.PipelineRequest
        """
        # pylint: disable=unused-argument
        return None

186 

187 

class AzureKeyCredentialPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]):
    """Policy that sends the credential's key in a named request header.

    :param credential: The credential used to authenticate requests.
    :type credential: ~azure.core.credentials.AzureKeyCredential
    :param str name: The name of the key header used for the credential.
    :keyword str prefix: The name of the prefix for the header value if any.
    :raises: ValueError or TypeError
    """

    def __init__(  # pylint: disable=unused-argument
        self,
        credential: "AzureKeyCredential",
        name: str,
        *,
        prefix: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__()

        # Validate the arguments before storing anything on the instance.
        credential_has_key = hasattr(credential, "key")
        if not credential_has_key:
            raise TypeError("String is not a supported credential input type. Use an instance of AzureKeyCredential.")
        if not name:
            raise ValueError("name can not be None or empty")
        if not isinstance(name, str):
            raise TypeError("name must be a string.")

        self._credential = credential
        self._name = name
        # A non-empty prefix is separated from the key by a single space.
        if prefix:
            self._prefix = prefix + " "
        else:
            self._prefix = ""

    def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
        """Set the configured header to the (optionally prefixed) credential key.

        The key is read from the credential on every call.

        :param ~azure.core.pipeline.PipelineRequest request: the request to authenticate
        """
        header_value = f"{self._prefix}{self._credential.key}"
        request.http_request.headers[self._name] = header_value

219 

220 

class AzureSasCredentialPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]):
    """Policy that appends the credential's shared access signature to the request URL query.

    :param credential: The credential used to authenticate requests.
    :type credential: ~azure.core.credentials.AzureSasCredential
    :raises: ValueError or TypeError
    """

    def __init__(self, credential: "AzureSasCredential", **kwargs: Any) -> None:  # pylint: disable=unused-argument
        super().__init__()
        if not credential:
            raise ValueError("credential can not be None")
        self._credential = credential

    def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
        """Append the shared access signature to the request URL.

        A leading "?" on the signature is dropped. When the request already has a query, the
        signature is joined with "&" unless the URL already contains it; otherwise it is
        attached after a "?" (reusing one that already ends the URL).

        :param ~azure.core.pipeline.PipelineRequest request: the request to authenticate
        """
        http_request = request.http_request
        url = http_request.url
        has_query = bool(http_request.query)

        sas = self._credential.signature
        if sas.startswith("?"):
            sas = sas[1:]

        if has_query:
            # Do not add the signature a second time if the URL already carries it.
            if sas not in url:
                url = f"{url}&{sas}"
        elif url.endswith("?"):
            url = f"{url}{sas}"
        else:
            url = f"{url}?{sas}"

        http_request.url = url