Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication_async.py: 34%
65 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-07 06:33 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-07 06:33 +0000
1# -------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See LICENSE.txt in the project root for
4# license information.
5# -------------------------------------------------------------------------
6import time
7from typing import TYPE_CHECKING, Any, Awaitable, Optional, cast, TypeVar
9from anyio import Lock
10from azure.core.credentials import AccessToken
11from azure.core.pipeline import PipelineRequest, PipelineResponse
12from azure.core.pipeline.policies import AsyncHTTPPolicy
13from azure.core.pipeline.policies._authentication import (
14 _BearerTokenCredentialPolicyBase,
15)
16from azure.core.pipeline.transport import AsyncHttpResponse as LegacyAsyncHttpResponse, HttpRequest as LegacyHttpRequest
17from azure.core.rest import AsyncHttpResponse, HttpRequest
19from .._tools_async import await_result
if TYPE_CHECKING:
    # Imported for annotations only; the class below refers to it as a string.
    from azure.core.credentials_async import AsyncTokenCredential

# Constrained type variables: each one resolves either to the azure.core.rest class
# or to its legacy azure.core.pipeline.transport counterpart imported above.
AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType", AsyncHttpResponse, LegacyAsyncHttpResponse)
HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest)
class AsyncBearerTokenCredentialPolicy(AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType]):
    """Adds a bearer token Authorization header to requests.

    The token is cached on the policy instance and re-acquired from the credential
    when it is missing or close to expiry (see :meth:`_need_new_token`).

    :param credential: The credential.
    :type credential: ~azure.core.credentials_async.AsyncTokenCredential
    :param str scopes: Lets you specify the type of access needed.
    :keyword bool enable_cae: Indicates whether to enable Continuous Access Evaluation (CAE) on all requested
        tokens. Defaults to False.
    """

    def __init__(self, credential: "AsyncTokenCredential", *scopes: str, **kwargs: Any) -> None:
        super().__init__()
        self._credential = credential
        # Serializes token acquisition so concurrent coroutines do not all call get_token.
        self._lock = Lock()
        self._scopes = scopes
        # Cached token; None until the first acquisition and after a 401 response.
        self._token: Optional["AccessToken"] = None
        self._enable_cae: bool = kwargs.get("enable_cae", False)

    async def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
        """Adds a bearer token Authorization header to the request.

        A new token is requested from the credential only when no token is cached or
        the cached one needs replacing; otherwise the cached token is reused.

        :param request: The pipeline request object to be modified.
        :type request: ~azure.core.pipeline.PipelineRequest
        :raises: :class:`~azure.core.exceptions.ServiceRequestError`
        """
        _BearerTokenCredentialPolicyBase._enforce_https(request)  # pylint:disable=protected-access

        if self._token is None or self._need_new_token():
            async with self._lock:
                # double check because another coroutine may have acquired a token while we waited to acquire the lock
                if self._token is None or self._need_new_token():
                    if self._enable_cae:
                        self._token = await await_result(
                            self._credential.get_token, *self._scopes, enable_cae=self._enable_cae
                        )
                    else:
                        # enable_cae is only passed when it was requested, so get_token is
                        # called with scopes alone in the default configuration.
                        self._token = await await_result(self._credential.get_token, *self._scopes)
        request.http_request.headers["Authorization"] = "Bearer " + cast(AccessToken, self._token).token

    async def authorize_request(self, request: PipelineRequest[HTTPRequestType], *scopes: str, **kwargs: Any) -> None:
        """Acquire a token from the credential and authorize the request with it.

        Keyword arguments are passed to the credential's get_token method. The token will be cached and used to
        authorize future requests.

        :param ~azure.core.pipeline.PipelineRequest request: the request
        :param str scopes: required scopes of authentication
        """
        if self._enable_cae:
            # An explicit enable_cae supplied by the caller takes precedence over the policy setting.
            kwargs.setdefault("enable_cae", self._enable_cae)
        async with self._lock:
            self._token = await await_result(self._credential.get_token, *scopes, **kwargs)
        request.http_request.headers["Authorization"] = "Bearer " + cast(AccessToken, self._token).token

    async def send(
        self, request: PipelineRequest[HTTPRequestType]
    ) -> PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]:
        """Authorize request with a bearer token and send it to the next policy

        If the response status is 401 the cached token is discarded. When that response
        also carries a WWW-Authenticate header, :meth:`on_challenge` is consulted and,
        if it returns True, the request is sent one more time.

        :param request: The pipeline request object
        :type request: ~azure.core.pipeline.PipelineRequest
        :return: The pipeline response object
        :rtype: ~azure.core.pipeline.PipelineResponse
        """
        await await_result(self.on_request, request)
        try:
            response = await self.next.send(request)
        except Exception:  # pylint:disable=broad-except
            await await_result(self.on_exception, request)
            raise
        else:
            await await_result(self.on_response, request, response)

        if response.http_response.status_code == 401:
            self._token = None  # any cached token is invalid
            if "WWW-Authenticate" in response.http_response.headers:
                request_authorized = await self.on_challenge(request, response)
                if request_authorized:
                    # if we receive a challenge response, we retrieve a new token
                    # which matches the new target. In this case, we don't want to remove
                    # token from the request so clear the 'insecure_domain_change' tag
                    request.context.options.pop("insecure_domain_change", False)
                    try:
                        response = await self.next.send(request)
                    except Exception:  # pylint:disable=broad-except
                        await await_result(self.on_exception, request)
                        raise
                    else:
                        await await_result(self.on_response, request, response)

        return response

    async def on_challenge(
        self,
        request: PipelineRequest[HTTPRequestType],
        response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType],
    ) -> bool:
        """Authorize request according to an authentication challenge

        This method is called when the resource provider responds 401 with a WWW-Authenticate header.
        This implementation does not handle challenges and always returns False.

        :param ~azure.core.pipeline.PipelineRequest request: the request which elicited an authentication challenge
        :param ~azure.core.pipeline.PipelineResponse response: the resource provider's response
        :returns: a bool indicating whether the policy should send the request
        :rtype: bool
        """
        # pylint:disable=unused-argument
        return False

    def on_response(
        self,
        request: PipelineRequest[HTTPRequestType],
        response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType],
    ) -> Optional[Awaitable[None]]:
        """Executed after the request comes back from the next policy.

        This implementation has no body and therefore returns None.

        :param request: Request to be modified after returning from the policy.
        :type request: ~azure.core.pipeline.PipelineRequest
        :param response: Pipeline response object
        :type response: ~azure.core.pipeline.PipelineResponse
        """

    def on_exception(self, request: PipelineRequest[HTTPRequestType]) -> None:
        """Executed when an exception is raised while executing the next policy.

        This method is executed inside the exception handler.
        This implementation does nothing.

        :param request: The Pipeline request object
        :type request: ~azure.core.pipeline.PipelineRequest
        """
        # pylint: disable=unused-argument
        return

    def _need_new_token(self) -> bool:
        """Report whether the cached token must be replaced.

        :returns: True when no token is cached, or when fewer than 300 seconds remain
            before the cached token's ``expires_on`` time.
        :rtype: bool
        """
        return not self._token or self._token.expires_on - time.time() < 300