1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26import base64
27import time
28from typing import Optional, TypeVar
29
30from azure.core.pipeline.policies import BearerTokenCredentialPolicy, SansIOHTTPPolicy
31from azure.core.pipeline import PipelineRequest, PipelineResponse
32from azure.core.exceptions import ServiceRequestError
33
34
35HTTPRequestType = TypeVar("HTTPRequestType")
36HTTPResponseType = TypeVar("HTTPResponseType")
37
38
class ARMChallengeAuthenticationPolicy(BearerTokenCredentialPolicy):
    """Bearer-token policy that also answers ARM claims challenges.

    Requests get a bearer token Authorization header. Continuous Access Evaluation (CAE) challenges are handled
    internally; when a challenge can't be completed, the 401 (unauthorized) response from ARM is returned.

    :param ~azure.core.credentials.TokenCredential credential: credential for authorizing requests
    :param str scopes: required authentication scopes
    """

    def on_challenge(
        self,
        request: PipelineRequest[HTTPRequestType],
        response: PipelineResponse[HTTPRequestType, HTTPResponseType],
    ) -> bool:
        """Try to re-authorize a request using the claims carried by an ARM challenge.

        :param ~azure.core.pipeline.PipelineRequest request: the request which elicited an authentication challenge
        :param ~azure.core.pipeline.PipelineResponse response: ARM's response
        :returns: True when the request was re-authorized with the challenge's claims and should be sent again,
            False when the response has no usable claims challenge
        :rtype: bool
        """
        www_authenticate = response.http_response.headers.get("WWW-Authenticate")
        if not www_authenticate:
            # No challenge header at all: nothing this policy can act on.
            return False

        decoded_claims = _parse_claims_challenge(www_authenticate)
        if not decoded_claims:
            # Header present, but without a single decodable "claims" parameter.
            return False

        self.authorize_request(request, *self._scopes, claims=decoded_claims)
        return True
69
70
71# pylint:disable=too-few-public-methods
class _AuxiliaryAuthenticationPolicyBase:
    """Shared state and helpers for policies that add an auxiliary authorization token header to requests.

    :param ~azure.core.credentials.TokenCredential auxiliary_credentials: auxiliary credential for authorizing requests
    :param str scopes: required authentication scopes
    """

    def __init__(self, auxiliary_credentials, *scopes, **kwargs):  # pylint: disable=unused-argument
        # Tokens are fetched lazily by subclasses; None means "nothing cached yet".
        self._aux_tokens = None
        self._scopes = scopes
        self._auxiliary_credentials = auxiliary_credentials

    @staticmethod
    def _enforce_https(request: PipelineRequest[HTTPRequestType]) -> None:
        """Reject non-https URLs unless the caller explicitly opted out.

        :param ~azure.core.pipeline.PipelineRequest request: the request about to be authorized
        :raises ~azure.core.exceptions.ServiceRequestError: if https is enforced and the URL doesn't start with
            "https"
        """
        context = request.context

        # 'enforce_https' is moved from options to the context so it persists across retries
        # without being handed to the transport implementation. Only an explicit False is kept:
        # True is the default and needn't be recorded.
        if context.options.pop("enforce_https", None) is False:
            context["enforce_https"] = False

        if not context.get("enforce_https", True):
            return
        if request.http_request.url.lower().startswith("https"):
            return
        raise ServiceRequestError(
            "Bearer token authentication is not permitted for non-TLS protected (non-https) URLs."
        )

    def _update_headers(self, headers):
        """Write the cached auxiliary tokens into the x-ms-authorization-auxiliary header.

        The header is left untouched when no tokens are cached.

        :param dict headers: The HTTP Request headers
        """
        cached_tokens = self._aux_tokens
        if not cached_tokens:
            return
        bearer_values = ["Bearer {}".format(aux_token.token) for aux_token in cached_tokens]
        headers["x-ms-authorization-auxiliary"] = ", ".join(bearer_values)

    @property
    def _need_new_aux_tokens(self):
        """Whether tokens must be (re)acquired.

        True when nothing is cached, or when any cached token expires in less than 300 seconds.
        """
        cached_tokens = self._aux_tokens
        if not cached_tokens:
            return True
        return any(aux_token.expires_on - time.time() < 300 for aux_token in cached_tokens)
118
119
class AuxiliaryAuthenticationPolicy(
    _AuxiliaryAuthenticationPolicyBase,
    SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType],
):
    """Policy that attaches auxiliary authorization tokens to outgoing requests.

    Tokens are obtained by calling ``get_token`` on each auxiliary credential and are reused until the
    base class reports that new ones are needed.
    """

    def _get_auxiliary_tokens(self, *scopes, **kwargs):
        """Request one token from every auxiliary credential.

        :param str scopes: scopes passed to each credential's ``get_token``
        :return: the tokens in credential order, or None when there are no auxiliary credentials
        """
        credentials = self._auxiliary_credentials
        if not credentials:
            return None

        tokens = []
        for credential in credentials:
            tokens.append(credential.get_token(*scopes, **kwargs))
        return tokens

    def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
        """Called before the policy sends a request.

        Checks the https requirement, refreshes the auxiliary tokens when needed, then writes them
        into the request's headers.

        :param ~azure.core.pipeline.PipelineRequest request: the request
        """
        self._enforce_https(request)

        needs_refresh = self._need_new_aux_tokens
        if needs_refresh:
            self._aux_tokens = self._get_auxiliary_tokens(*self._scopes)

        outgoing_headers = request.http_request.headers
        self._update_headers(outgoing_headers)
142
143
def _parse_claims_challenge(challenge: str) -> Optional[str]:
    """Extract and decode the "claims" parameter of an authentication challenge.

    Example challenge with claims:
        Bearer authorization_uri="https://login.windows-ppe.net/", error="invalid_token",
        error_description="User session has been revoked",
        claims="eyJhY2Nlc3NfdG9rZW4iOnsibmJmIjp7ImVzc2VudGlhbCI6dHJ1ZSwgInZhbHVlIjoiMTYwMzc0MjgwMCJ9fX0="

    The challenge is split on commas and each piece is read as a ``name=value`` pair. Only a parameter
    whose name is exactly ``claims`` is accepted; text that merely contains "claims=" (another parameter's
    value, or a name such as ``xclaims``) is ignored.

    :param str challenge: the value of a WWW-Authenticate header
    :return: the base64url-decoded value of the "claims" parameter, or None if the challenge has no such
        parameter, has more than one, or the value can't be decoded
    :rtype: str or None
    """
    encoded_claims = None
    for parameter in challenge.split(","):
        name_part, separator, value_part = parameter.partition("=")
        # The first parameter is preceded by the auth scheme ("Bearer claims=..."), so the parameter
        # name is the last whitespace-delimited token before the first "=".
        name_tokens = name_part.split()
        if not separator or not name_tokens or name_tokens[-1] != "claims":
            continue
        if encoded_claims:
            # multiple claims challenges, e.g. for cross-tenant auth, would require special handling
            return None
        encoded_claims = value_part.strip(" \"'")

    if not encoded_claims:
        return None

    # The value may arrive without base64 padding; restore it before decoding.
    padding_needed = -len(encoded_claims) % 4
    try:
        return base64.urlsafe_b64decode(encoded_claims + "=" * padding_needed).decode()
    except ValueError:
        # binascii.Error (malformed base64) and UnicodeDecodeError (not UTF-8) are both ValueErrors.
        return None