1# -------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See LICENSE.txt in the project root for
4# license information.
5# -------------------------------------------------------------------------
6from typing import Any, NamedTuple, Optional, TypedDict, Union, ContextManager
7from typing_extensions import Protocol, runtime_checkable
8
9
class AccessToken(NamedTuple):
    """An OAuth access token paired with its expiry.

    Being a ``NamedTuple``, an instance is immutable and unpacks as ``(token, expires_on)``.
    """

    token: str
    """The raw token string."""
    expires_on: int
    """Unix time at which the token expires."""
17
18
class AccessTokenInfo:
    """Describes an OAuth access token together with extra metadata.

    Use this instead of `AccessToken` when the token type or a proactive refresh time is needed.

    :param str token: The token string.
    :param int expires_on: Unix time at which the token expires.
    :keyword str token_type: The kind of access token. Defaults to 'Bearer'.
    :keyword int refresh_on: Unix time at which a cached token should be proactively refreshed. Optional.
    """

    token: str
    """The raw token string."""
    expires_on: int
    """Unix time at which the token expires."""
    token_type: str
    """The kind of access token."""
    refresh_on: Optional[int]
    """Unix time at which a cached token should be proactively refreshed. Optional."""

    def __init__(
        self, token: str, expires_on: int, *, token_type: str = "Bearer", refresh_on: Optional[int] = None
    ) -> None:
        # Values are stored as given; no validation or conversion is performed.
        self.token, self.expires_on = token, expires_on
        self.token_type, self.refresh_on = token_type, refresh_on

    def __repr__(self) -> str:
        # NOTE(review): the raw token value is embedded in this representation.
        template = "AccessTokenInfo(token='{}', expires_on={}, token_type='{}', refresh_on={})"
        return template.format(self.token, self.expires_on, self.token_type, self.refresh_on)
52
53
class TokenRequestOptions(TypedDict, total=False):
    """Dictionary of settings for an access token request. Every key may be omitted."""

    claims: str
    """Extra claims the token must carry, for example those a resource provider returns in a claims
    challenge after an authorization failure."""
    tenant_id: str
    """Tenant ID to send with the token request."""
    enable_cae: bool
    """Whether Continuous Access Evaluation (CAE) should be enabled for the requested token."""
64
65
@runtime_checkable
class TokenCredential(Protocol):
    """Structural interface for objects that can supply OAuth access tokens."""

    def get_token(
        self,
        *scopes: str,
        claims: Optional[str] = None,
        tenant_id: Optional[str] = None,
        enable_cae: bool = False,
        **kwargs: Any,
    ) -> AccessToken:
        """Obtain an access token covering `scopes`.

        :param str scopes: The kind of access being requested.

        :keyword str claims: Extra claims the token must carry, for example those a resource provider
            returns in a claims challenge after an authorization failure.
        :keyword str tenant_id: Tenant to send with the token request. Optional.
        :keyword bool enable_cae: Whether Continuous Access Evaluation (CAE) should be enabled for the
            requested token. Defaults to False.

        :rtype: AccessToken
        :return: An AccessToken holding the token string and its expiration in Unix time.
        """
        ...
92
93
@runtime_checkable
class SupportsTokenInfo(Protocol, ContextManager["SupportsTokenInfo"]):
    """Structural interface for objects that can supply OAuth access tokens carrying extra properties."""

    def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] = None) -> AccessTokenInfo:
        """Obtain an access token covering `scopes`.

        Counterpart to `get_token` for scenarios that need additional properties on the returned token.

        :param str scopes: The kind of access being requested.
        :keyword options: Dictionary of settings for the token request. Unknown options will be ignored.
            Optional.
        :paramtype options: TokenRequestOptions

        :rtype: AccessTokenInfo
        :return: An AccessTokenInfo describing the token.
        """
        ...

    def close(self) -> None:
        """Close the credential. This default implementation does nothing."""
115
116
# Alias accepting either credential protocol: objects exposing ``get_token`` (TokenCredential)
# or ``get_token_info`` (SupportsTokenInfo).
TokenProvider = Union[TokenCredential, SupportsTokenInfo]
118
119
class AzureNamedKey(NamedTuple):
    """Immutable pairing of a credential name with its key; unpacks as ``(name, key)``."""

    name: str
    key: str
125
126
# Names exported by a star-import of this module.
# NOTE(review): AzureNamedKey is defined in this file but is not listed here — confirm that is intentional.
__all__ = [
    "AzureKeyCredential",
    "AzureSasCredential",
    "AccessToken",
    "AccessTokenInfo",
    "SupportsTokenInfo",
    "AzureNamedKeyCredential",
    "TokenCredential",
    "TokenRequestOptions",
    "TokenProvider",
]
138
139
class AzureKeyCredential:
    """Key-based credential for authenticating to an Azure service.

    The stored key can be replaced in place, so a client holding this credential does not need to be
    recreated when the key changes.

    :param str key: The key used to authenticate to an Azure service
    :raises: TypeError
    """

    def __init__(self, key: str) -> None:
        # Only the type is checked here; an empty string is accepted by the constructor.
        if isinstance(key, str):
            self._key = key
        else:
            raise TypeError("key must be a string.")

    @property
    def key(self) -> str:
        """The currently configured key.

        :rtype: str
        :return: The currently configured key.
        """
        return self._key

    def update(self, key: str) -> None:
        """Replace the stored key.

        Useful after regenerating a service key, so that long-lived clients pick up the new value.

        :param str key: The key used to authenticate to an Azure service
        :raises: ValueError or TypeError
        """
        # The falsy check runs before the type check, so None, "" and any other falsy value
        # raise ValueError rather than TypeError.
        if not key:
            raise ValueError("The key used for updating can not be None or empty")
        if isinstance(key, str):
            self._key = key
            return
        raise TypeError("The key used for updating must be a string.")
176
177
class AzureSasCredential:
    """Shared-access-signature credential for authenticating to an Azure service.

    The stored signature can be replaced in place, so a client holding this credential does not need
    to be recreated when the signature changes.

    :param str signature: The shared access signature used to authenticate to an Azure service
    :raises: TypeError
    """

    def __init__(self, signature: str) -> None:
        # Only the type is checked here; an empty string is accepted by the constructor.
        if isinstance(signature, str):
            self._signature = signature
        else:
            raise TypeError("signature must be a string.")

    @property
    def signature(self) -> str:
        """The currently configured shared access signature.

        :rtype: str
        :return: The currently configured shared access signature.
        """
        return self._signature

    def update(self, signature: str) -> None:
        """Replace the stored shared access signature.

        Useful after regenerating a shared access signature, so that long-lived clients pick up the
        new value.

        :param str signature: The shared access signature used to authenticate to an Azure service
        :raises: ValueError or TypeError
        """
        # The falsy check runs before the type check, so None, "" and any other falsy value
        # raise ValueError rather than TypeError.
        if not signature:
            raise ValueError("The signature used for updating can not be None or empty")
        if isinstance(signature, str):
            self._signature = signature
            return
        raise TypeError("The signature used for updating must be a string.")
214
215
class AzureNamedKeyCredential:
    """Credential for services that authenticate with a named key, following the same pattern as the
    other credential types.

    :param str name: The name of the credential used to authenticate to an Azure service.
    :param str key: The key used to authenticate to an Azure service.
    :raises: TypeError
    """

    def __init__(self, name: str, key: str) -> None:
        if not (isinstance(name, str) and isinstance(key, str)):
            raise TypeError("Both name and key must be strings.")
        self._credential = AzureNamedKey(name, key)

    @property
    def named_key(self) -> AzureNamedKey:
        """The currently configured name and key pair.

        :rtype: AzureNamedKey
        :return: The currently configured name and key pair.
        """
        return self._credential

    def update(self, name: str, key: str) -> None:
        """Replace the stored named key.

        The name and the key are always replaced together; neither can be updated on its own.

        :param str name: The name of the credential used to authenticate to an Azure service.
        :param str key: The key used to authenticate to an Azure service.
        :raises: TypeError
        """
        if not (isinstance(name, str) and isinstance(key, str)):
            raise TypeError("Both name and key must be strings.")
        # A new pair is built on every update rather than mutating the previous one.
        self._credential = AzureNamedKey(name, key)