Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication_async.py: 34%

65 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-07 06:33 +0000

1# ------------------------------------------------------------------------- 

2# Copyright (c) Microsoft Corporation. All rights reserved. 

3# Licensed under the MIT License. See LICENSE.txt in the project root for 

4# license information. 

5# ------------------------------------------------------------------------- 

6import time 

7from typing import TYPE_CHECKING, Any, Awaitable, Optional, cast, TypeVar 

8 

9from anyio import Lock 

10from azure.core.credentials import AccessToken 

11from azure.core.pipeline import PipelineRequest, PipelineResponse 

12from azure.core.pipeline.policies import AsyncHTTPPolicy 

13from azure.core.pipeline.policies._authentication import ( 

14 _BearerTokenCredentialPolicyBase, 

15) 

16from azure.core.pipeline.transport import AsyncHttpResponse as LegacyAsyncHttpResponse, HttpRequest as LegacyHttpRequest 

17from azure.core.rest import AsyncHttpResponse, HttpRequest 

18 

19from .._tools_async import await_result 

20 

21if TYPE_CHECKING: 

22 from azure.core.credentials_async import AsyncTokenCredential 

23 

24AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType", AsyncHttpResponse, LegacyAsyncHttpResponse) 

25HTTPRequestType = TypeVar("HTTPRequestType", HttpRequest, LegacyHttpRequest) 

26 

27 

28class AsyncBearerTokenCredentialPolicy(AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType]): 

29 """Adds a bearer token Authorization header to requests. 

30 

31 :param credential: The credential. 

32 :type credential: ~azure.core.credentials.TokenCredential 

33 :param str scopes: Lets you specify the type of access needed. 

34 :keyword bool enable_cae: Indicates whether to enable Continuous Access Evaluation (CAE) on all requested 

35 tokens. Defaults to False. 

36 """ 

37 

38 def __init__(self, credential: "AsyncTokenCredential", *scopes: str, **kwargs: Any) -> None: 

39 super().__init__() 

40 self._credential = credential 

41 self._lock = Lock() 

42 self._scopes = scopes 

43 self._token: Optional["AccessToken"] = None 

44 self._enable_cae: bool = kwargs.get("enable_cae", False) 

45 

46 async def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

47 """Adds a bearer token Authorization header to request and sends request to next policy. 

48 

49 :param request: The pipeline request object to be modified. 

50 :type request: ~azure.core.pipeline.PipelineRequest 

51 :raises: :class:`~azure.core.exceptions.ServiceRequestError` 

52 """ 

53 _BearerTokenCredentialPolicyBase._enforce_https(request) # pylint:disable=protected-access 

54 

55 if self._token is None or self._need_new_token(): 

56 async with self._lock: 

57 # double check because another coroutine may have acquired a token while we waited to acquire the lock 

58 if self._token is None or self._need_new_token(): 

59 if self._enable_cae: 

60 self._token = await await_result( 

61 self._credential.get_token, *self._scopes, enable_cae=self._enable_cae 

62 ) 

63 else: 

64 self._token = await await_result(self._credential.get_token, *self._scopes) 

65 request.http_request.headers["Authorization"] = "Bearer " + cast(AccessToken, self._token).token 

66 

67 async def authorize_request(self, request: PipelineRequest[HTTPRequestType], *scopes: str, **kwargs: Any) -> None: 

68 """Acquire a token from the credential and authorize the request with it. 

69 

70 Keyword arguments are passed to the credential's get_token method. The token will be cached and used to 

71 authorize future requests. 

72 

73 :param ~azure.core.pipeline.PipelineRequest request: the request 

74 :param str scopes: required scopes of authentication 

75 """ 

76 if self._enable_cae: 

77 kwargs.setdefault("enable_cae", self._enable_cae) 

78 async with self._lock: 

79 self._token = await await_result(self._credential.get_token, *scopes, **kwargs) 

80 request.http_request.headers["Authorization"] = "Bearer " + cast(AccessToken, self._token).token 

81 

82 async def send( 

83 self, request: PipelineRequest[HTTPRequestType] 

84 ) -> PipelineResponse[HTTPRequestType, AsyncHTTPResponseType]: 

85 """Authorize request with a bearer token and send it to the next policy 

86 

87 :param request: The pipeline request object 

88 :type request: ~azure.core.pipeline.PipelineRequest 

89 :return: The pipeline response object 

90 :rtype: ~azure.core.pipeline.PipelineResponse 

91 """ 

92 await await_result(self.on_request, request) 

93 try: 

94 response = await self.next.send(request) 

95 except Exception: # pylint:disable=broad-except 

96 await await_result(self.on_exception, request) 

97 raise 

98 else: 

99 await await_result(self.on_response, request, response) 

100 

101 if response.http_response.status_code == 401: 

102 self._token = None # any cached token is invalid 

103 if "WWW-Authenticate" in response.http_response.headers: 

104 request_authorized = await self.on_challenge(request, response) 

105 if request_authorized: 

106 # if we receive a challenge response, we retrieve a new token 

107 # which matches the new target. In this case, we don't want to remove 

108 # token from the request so clear the 'insecure_domain_change' tag 

109 request.context.options.pop("insecure_domain_change", False) 

110 try: 

111 response = await self.next.send(request) 

112 except Exception: # pylint:disable=broad-except 

113 await await_result(self.on_exception, request) 

114 raise 

115 else: 

116 await await_result(self.on_response, request, response) 

117 

118 return response 

119 

120 async def on_challenge( 

121 self, 

122 request: PipelineRequest[HTTPRequestType], 

123 response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType], 

124 ) -> bool: 

125 """Authorize request according to an authentication challenge 

126 

127 This method is called when the resource provider responds 401 with a WWW-Authenticate header. 

128 

129 :param ~azure.core.pipeline.PipelineRequest request: the request which elicited an authentication challenge 

130 :param ~azure.core.pipeline.PipelineResponse response: the resource provider's response 

131 :returns: a bool indicating whether the policy should send the request 

132 :rtype: bool 

133 """ 

134 # pylint:disable=unused-argument 

135 return False 

136 

137 def on_response( 

138 self, 

139 request: PipelineRequest[HTTPRequestType], 

140 response: PipelineResponse[HTTPRequestType, AsyncHTTPResponseType], 

141 ) -> Optional[Awaitable[None]]: 

142 """Executed after the request comes back from the next policy. 

143 

144 :param request: Request to be modified after returning from the policy. 

145 :type request: ~azure.core.pipeline.PipelineRequest 

146 :param response: Pipeline response object 

147 :type response: ~azure.core.pipeline.PipelineResponse 

148 """ 

149 

150 def on_exception(self, request: PipelineRequest[HTTPRequestType]) -> None: 

151 """Executed when an exception is raised while executing the next policy. 

152 

153 This method is executed inside the exception handler. 

154 

155 :param request: The Pipeline request object 

156 :type request: ~azure.core.pipeline.PipelineRequest 

157 """ 

158 # pylint: disable=unused-argument 

159 return 

160 

161 def _need_new_token(self) -> bool: 

162 return not self._token or self._token.expires_on - time.time() < 300