Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication.py: 30%
104 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-07 06:33 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-07 06:33 +0000
1# -------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See LICENSE.txt in the project root for
4# license information.
5# -------------------------------------------------------------------------
6import time
7from typing import TYPE_CHECKING, Optional, TypeVar, MutableMapping, Any
8from azure.core.pipeline import PipelineRequest, PipelineResponse
9from azure.core.pipeline.transport import HttpResponse as LegacyHttpResponse, HttpRequest as LegacyHttpRequest
10from azure.core.rest import HttpResponse, HttpRequest
11from . import HTTPPolicy, SansIOHTTPPolicy
12from ...exceptions import ServiceRequestError
14if TYPE_CHECKING:
15 # pylint:disable=unused-import
16 from azure.core.credentials import (
17 AccessToken,
18 TokenCredential,
19 AzureKeyCredential,
20 AzureSasCredential,
21 )
23HTTPResponseType = TypeVar("HTTPResponseType", HttpResponse, LegacyHttpResponse)
24HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest)
27# pylint:disable=too-few-public-methods
28class _BearerTokenCredentialPolicyBase:
29 """Base class for a Bearer Token Credential Policy.
31 :param credential: The credential.
32 :type credential: ~azure.core.credentials.TokenCredential
33 :param str scopes: Lets you specify the type of access needed.
34 :keyword bool enable_cae: Indicates whether to enable Continuous Access Evaluation (CAE) on all requested
35 tokens. Defaults to False.
36 """
38 def __init__(self, credential: "TokenCredential", *scopes: str, **kwargs: Any) -> None:
39 super(_BearerTokenCredentialPolicyBase, self).__init__()
40 self._scopes = scopes
41 self._credential = credential
42 self._token: Optional["AccessToken"] = None
43 self._enable_cae: bool = kwargs.get("enable_cae", False)
45 @staticmethod
46 def _enforce_https(request: PipelineRequest[HTTPRequestType]) -> None:
47 # move 'enforce_https' from options to context so it persists
48 # across retries but isn't passed to a transport implementation
49 option = request.context.options.pop("enforce_https", None)
51 # True is the default setting; we needn't preserve an explicit opt in to the default behavior
52 if option is False:
53 request.context["enforce_https"] = option
55 enforce_https = request.context.get("enforce_https", True)
56 if enforce_https and not request.http_request.url.lower().startswith("https"):
57 raise ServiceRequestError(
58 "Bearer token authentication is not permitted for non-TLS protected (non-https) URLs."
59 )
61 @staticmethod
62 def _update_headers(headers: MutableMapping[str, str], token: str) -> None:
63 """Updates the Authorization header with the bearer token.
65 :param MutableMapping[str, str] headers: The HTTP Request headers
66 :param str token: The OAuth token.
67 """
68 headers["Authorization"] = "Bearer {}".format(token)
70 @property
71 def _need_new_token(self) -> bool:
72 return not self._token or self._token.expires_on - time.time() < 300
75class BearerTokenCredentialPolicy(_BearerTokenCredentialPolicyBase, HTTPPolicy[HTTPRequestType, HTTPResponseType]):
76 """Adds a bearer token Authorization header to requests.
78 :param credential: The credential.
79 :type credential: ~azure.core.TokenCredential
80 :param str scopes: Lets you specify the type of access needed.
81 :keyword bool enable_cae: Indicates whether to enable Continuous Access Evaluation (CAE) on all requested
82 tokens. Defaults to False.
83 :raises: :class:`~azure.core.exceptions.ServiceRequestError`
84 """
86 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
87 """Called before the policy sends a request.
89 The base implementation authorizes the request with a bearer token.
91 :param ~azure.core.pipeline.PipelineRequest request: the request
92 """
93 self._enforce_https(request)
95 if self._token is None or self._need_new_token:
96 if self._enable_cae:
97 self._token = self._credential.get_token(*self._scopes, enable_cae=self._enable_cae)
98 else:
99 self._token = self._credential.get_token(*self._scopes)
100 self._update_headers(request.http_request.headers, self._token.token)
102 def authorize_request(self, request: PipelineRequest[HTTPRequestType], *scopes: str, **kwargs: Any) -> None:
103 """Acquire a token from the credential and authorize the request with it.
105 Keyword arguments are passed to the credential's get_token method. The token will be cached and used to
106 authorize future requests.
108 :param ~azure.core.pipeline.PipelineRequest request: the request
109 :param str scopes: required scopes of authentication
110 """
111 if self._enable_cae:
112 kwargs.setdefault("enable_cae", self._enable_cae)
113 self._token = self._credential.get_token(*scopes, **kwargs)
114 self._update_headers(request.http_request.headers, self._token.token)
116 def send(self, request: PipelineRequest[HTTPRequestType]) -> PipelineResponse[HTTPRequestType, HTTPResponseType]:
117 """Authorize request with a bearer token and send it to the next policy
119 :param request: The pipeline request object
120 :type request: ~azure.core.pipeline.PipelineRequest
121 :return: The pipeline response object
122 :rtype: ~azure.core.pipeline.PipelineResponse
123 """
124 self.on_request(request)
125 try:
126 response = self.next.send(request)
127 self.on_response(request, response)
128 except Exception: # pylint:disable=broad-except
129 self.on_exception(request)
130 raise
131 else:
132 if response.http_response.status_code == 401:
133 self._token = None # any cached token is invalid
134 if "WWW-Authenticate" in response.http_response.headers:
135 request_authorized = self.on_challenge(request, response)
136 if request_authorized:
137 # if we receive a challenge response, we retrieve a new token
138 # which matches the new target. In this case, we don't want to remove
139 # token from the request so clear the 'insecure_domain_change' tag
140 request.context.options.pop("insecure_domain_change", False)
141 try:
142 response = self.next.send(request)
143 self.on_response(request, response)
144 except Exception: # pylint:disable=broad-except
145 self.on_exception(request)
146 raise
148 return response
150 def on_challenge(
151 self, request: PipelineRequest[HTTPRequestType], response: PipelineResponse[HTTPRequestType, HTTPResponseType]
152 ) -> bool:
153 """Authorize request according to an authentication challenge
155 This method is called when the resource provider responds 401 with a WWW-Authenticate header.
157 :param ~azure.core.pipeline.PipelineRequest request: the request which elicited an authentication challenge
158 :param ~azure.core.pipeline.PipelineResponse response: the resource provider's response
159 :returns: a bool indicating whether the policy should send the request
160 :rtype: bool
161 """
162 # pylint:disable=unused-argument
163 return False
165 def on_response(
166 self, request: PipelineRequest[HTTPRequestType], response: PipelineResponse[HTTPRequestType, HTTPResponseType]
167 ) -> None:
168 """Executed after the request comes back from the next policy.
170 :param request: Request to be modified after returning from the policy.
171 :type request: ~azure.core.pipeline.PipelineRequest
172 :param response: Pipeline response object
173 :type response: ~azure.core.pipeline.PipelineResponse
174 """
176 def on_exception(self, request: PipelineRequest[HTTPRequestType]) -> None:
177 """Executed when an exception is raised while executing the next policy.
179 This method is executed inside the exception handler.
181 :param request: The Pipeline request object
182 :type request: ~azure.core.pipeline.PipelineRequest
183 """
184 # pylint: disable=unused-argument
185 return
188class AzureKeyCredentialPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]):
189 """Adds a key header for the provided credential.
191 :param credential: The credential used to authenticate requests.
192 :type credential: ~azure.core.credentials.AzureKeyCredential
193 :param str name: The name of the key header used for the credential.
194 :keyword str prefix: The name of the prefix for the header value if any.
195 :raises: ValueError or TypeError
196 """
198 def __init__( # pylint: disable=unused-argument
199 self,
200 credential: "AzureKeyCredential",
201 name: str,
202 *,
203 prefix: Optional[str] = None,
204 **kwargs: Any,
205 ) -> None:
206 super().__init__()
207 if not hasattr(credential, "key"):
208 raise TypeError("String is not a supported credential input type. Use an instance of AzureKeyCredential.")
209 if not name:
210 raise ValueError("name can not be None or empty")
211 if not isinstance(name, str):
212 raise TypeError("name must be a string.")
213 self._credential = credential
214 self._name = name
215 self._prefix = prefix + " " if prefix else ""
217 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
218 request.http_request.headers[self._name] = f"{self._prefix}{self._credential.key}"
221class AzureSasCredentialPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]):
222 """Adds a shared access signature to query for the provided credential.
224 :param credential: The credential used to authenticate requests.
225 :type credential: ~azure.core.credentials.AzureSasCredential
226 :raises: ValueError or TypeError
227 """
229 def __init__(self, credential: "AzureSasCredential", **kwargs: Any) -> None: # pylint: disable=unused-argument
230 super(AzureSasCredentialPolicy, self).__init__()
231 if not credential:
232 raise ValueError("credential can not be None")
233 self._credential = credential
235 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
236 url = request.http_request.url
237 query = request.http_request.query
238 signature = self._credential.signature
239 if signature.startswith("?"):
240 signature = signature[1:]
241 if query:
242 if signature not in url:
243 url = url + "&" + signature
244 else:
245 if url.endswith("?"):
246 url = url + signature
247 else:
248 url = url + "?" + signature
249 request.http_request.url = url