1# -------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See LICENSE.txt in the project root for
4# license information.
5# -------------------------------------------------------------------------
6from typing import Any, NamedTuple, Optional, TypedDict, Union, ContextManager
7from typing_extensions import Protocol, runtime_checkable
8
9
class AccessToken(NamedTuple):
    """An OAuth access token together with its expiry.

    Instances are ordinary tuples of the form ``(token, expires_on)``.
    """

    token: str
    """The raw token string."""
    expires_on: int
    """The moment the token expires, expressed in Unix time."""
17
18
class AccessTokenInfo:
    """Describes an OAuth access token.

    Serves as an alternative to `AccessToken` that carries extra details about the token.

    :param str token: The token string.
    :param int expires_on: The token's expiration time in Unix time.
    :keyword str token_type: The type of access token. Defaults to 'Bearer'.
    :keyword int refresh_on: The time, in Unix time, at which the cached token should be proactively
        refreshed. Optional.
    """

    token: str
    """The token string."""
    expires_on: int
    """The token's expiration time in Unix time."""
    token_type: str
    """The type of access token."""
    refresh_on: Optional[int]
    """The time, in Unix time, at which the cached token should be proactively refreshed. Optional."""

    def __init__(
        self,
        token: str,
        expires_on: int,
        *,
        token_type: str = "Bearer",
        refresh_on: Optional[int] = None,
    ) -> None:
        # Positional pair first, then the keyword-only extras.
        self.token, self.expires_on = token, expires_on
        self.token_type, self.refresh_on = token_type, refresh_on

    def __repr__(self) -> str:
        # The template mirrors the constructor signature; the field values are substituted in order.
        template = "AccessTokenInfo(token='{}', expires_on={}, token_type='{}', refresh_on={})"
        return template.format(self.token, self.expires_on, self.token_type, self.refresh_on)
57
58
class TokenRequestOptions(TypedDict, total=False):
    """Optional settings that can accompany an access token request. Every key may be omitted."""

    claims: str
    """Additional claims required in the token, such as those returned in a resource provider's
    claims challenge following an authorization failure."""
    tenant_id: str
    """The tenant ID to include in the token request."""
    enable_cae: bool
    """Whether Continuous Access Evaluation (CAE) should be enabled for the requested token."""
69
70
@runtime_checkable
class TokenCredential(Protocol):
    """Structural type for credentials that can supply OAuth tokens."""

    def get_token(
        self,
        *scopes: str,
        claims: Optional[str] = None,
        tenant_id: Optional[str] = None,
        enable_cae: bool = False,
        **kwargs: Any,
    ) -> AccessToken:
        """Obtain an access token covering `scopes`.

        :param str scopes: The type of access needed.

        :keyword str claims: Additional claims required in the token, such as those returned in a
            resource provider's claims challenge following an authorization failure.
        :keyword str tenant_id: Optional tenant to include in the token request.
        :keyword bool enable_cae: Whether to enable Continuous Access Evaluation (CAE) for the
            requested token. Defaults to False.

        :rtype: AccessToken
        :return: An AccessToken instance holding the token string and its expiration time in Unix time.
        """
        ...
97
98
@runtime_checkable
class SupportsTokenInfo(Protocol, ContextManager["SupportsTokenInfo"]):
    """Structural type for credentials that can supply OAuth access tokens with additional properties."""

    def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] = None) -> AccessTokenInfo:
        """Obtain an access token covering `scopes`.

        An alternative to `get_token` for scenarios that need additional properties on the token.

        :param str scopes: The type of access needed.
        :keyword options: A dictionary of options for the token request. Unknown options will be
            ignored. Optional.
        :paramtype options: TokenRequestOptions

        :rtype: AccessTokenInfo
        :return: An AccessTokenInfo instance containing information about the token.
        """
        ...

    def close(self) -> None:
        """Close the credential and release any resources it holds.

        :return: None
        :rtype: None
        """
124
125
# Either flavour of token-issuing credential: one exposing `get_token` (TokenCredential)
# or one exposing `get_token_info` (SupportsTokenInfo).
TokenProvider = Union[TokenCredential, SupportsTokenInfo]
127
128
class AzureNamedKey(NamedTuple):
    """A ``(name, key)`` pair."""

    name: str
    key: str
134
135
# Names bound by a star import of this module.
__all__ = [
    "AzureKeyCredential",
    "AzureSasCredential",
    "AccessToken",
    "AccessTokenInfo",
    "SupportsTokenInfo",
    "AzureNamedKeyCredential",
    "TokenCredential",
    "TokenRequestOptions",
    "TokenProvider",
]
147
148
class AzureKeyCredential:
    """Key-based credential for authenticating to an Azure service.

    The stored key can be replaced in place, so a client holding this credential does not
    need to be recreated.

    :param str key: The key used to authenticate to an Azure service
    :raises TypeError: If the key is not a string.
    """

    def __init__(self, key: str) -> None:
        if isinstance(key, str):
            self._key = key
        else:
            raise TypeError("key must be a string.")

    @property
    def key(self) -> str:
        """The currently configured key.

        :rtype: str
        :return: The currently configured key.
        """
        return self._key

    def update(self, key: str) -> None:
        """Replace the stored key.

        Useful after a service key has been regenerated and long-lived clients need to pick
        up the new value.

        :param str key: The key used to authenticate to an Azure service
        :raises ValueError: If the key is None or empty.
        :raises TypeError: If the key is not a string.
        """
        # The emptiness check runs first, so any falsy value is reported as a ValueError.
        if not key:
            raise ValueError("The key used for updating can not be None or empty")
        if isinstance(key, str):
            self._key = key
        else:
            raise TypeError("The key used for updating must be a string.")
185
186
class AzureSasCredential:
    """Shared-access-signature credential for authenticating to an Azure service.

    The stored signature can be replaced in place, so a client holding this credential does
    not need to be recreated.

    :param str signature: The shared access signature used to authenticate to an Azure service
    :raises TypeError: If the signature is not a string.
    """

    def __init__(self, signature: str) -> None:
        if isinstance(signature, str):
            self._signature = signature
        else:
            raise TypeError("signature must be a string.")

    @property
    def signature(self) -> str:
        """The currently configured shared access signature.

        :rtype: str
        :return: The currently configured shared access signature.
        """
        return self._signature

    def update(self, signature: str) -> None:
        """Replace the stored shared access signature.

        Useful after a shared access signature has been regenerated and long-lived clients
        need to pick up the new value.

        :param str signature: The shared access signature used to authenticate to an Azure service
        :raises ValueError: If the signature is None or empty.
        :raises TypeError: If the signature is not a string.
        """
        # The emptiness check runs first, so any falsy value is reported as a ValueError.
        if not signature:
            raise ValueError("The signature used for updating can not be None or empty")
        if isinstance(signature, str):
            self._signature = signature
        else:
            raise TypeError("The signature used for updating must be a string.")
224
225
class AzureNamedKeyCredential:
    """Credential for services that need a named key, following the patterns established by
    the other credential types.

    :param str name: The name of the credential used to authenticate to an Azure service.
    :param str key: The key used to authenticate to an Azure service.
    :raises TypeError: If the name or key is not a string.
    """

    def __init__(self, name: str, key: str) -> None:
        if not (isinstance(name, str) and isinstance(key, str)):
            raise TypeError("Both name and key must be strings.")
        self._credential = AzureNamedKey(name, key)

    @property
    def named_key(self) -> AzureNamedKey:
        """The currently configured name and key pair.

        :rtype: AzureNamedKey
        :return: The currently configured name and key pair.
        """
        return self._credential

    def update(self, name: str, key: str) -> None:
        """Replace the stored named key.

        Name and key are always replaced together; it is not possible to update only one
        of them.

        :param str name: The name of the credential used to authenticate to an Azure service.
        :param str key: The key used to authenticate to an Azure service.
        :raises TypeError: If the name or key is not a string.
        """
        if not (isinstance(name, str) and isinstance(key, str)):
            raise TypeError("Both name and key must be strings.")
        # A fresh pair is stored rather than mutating the existing one.
        self._credential = AzureNamedKey(name, key)