Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/credentials.py: 61%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

85 statements  

1# ------------------------------------------------------------------------- 

2# Copyright (c) Microsoft Corporation. All rights reserved. 

3# Licensed under the MIT License. See LICENSE.txt in the project root for 

4# license information. 

5# ------------------------------------------------------------------------- 

6from typing import Any, NamedTuple, Optional, TypedDict, Union, ContextManager 

7from typing_extensions import Protocol, runtime_checkable 

8 

9 

class AccessToken(NamedTuple):
    """An OAuth access token paired with its expiration time.

    Instances are tuples, so they can be unpacked as ``(token, expires_on)``.
    """

    token: str
    """The token string."""
    expires_on: int
    """The token's expiration time in Unix time."""

17 

18 

class AccessTokenInfo:
    """Describes an OAuth access token together with its metadata.

    Use this class instead of `AccessToken` when more than the token string and its expiry is needed.

    :param str token: The token string.
    :param int expires_on: The token's expiration time in Unix time.
    :keyword str token_type: The type of access token. Defaults to 'Bearer'.
    :keyword int refresh_on: The time, in Unix time, at which the cached token should be proactively
        refreshed. Optional.
    """

    token: str
    """The token string."""
    expires_on: int
    """The token's expiration time in Unix time."""
    token_type: str
    """The type of access token."""
    refresh_on: Optional[int]
    """The time, in Unix time, at which the cached token should be proactively refreshed. Optional."""

    def __init__(
        self,
        token: str,
        expires_on: int,
        *,
        token_type: str = "Bearer",
        refresh_on: Optional[int] = None,
    ) -> None:
        # Positional arguments first, then the keyword-only metadata.
        self.token, self.expires_on = token, expires_on
        self.token_type, self.refresh_on = token_type, refresh_on

    def __repr__(self) -> str:
        # String fields are wrapped in single quotes; numeric fields are rendered bare.
        return (
            f"AccessTokenInfo(token='{self.token}', expires_on={self.expires_on}, "
            f"token_type='{self.token_type}', refresh_on={self.refresh_on})"
        )

57 

58 

class TokenRequestOptions(TypedDict, total=False):
    """Options that may accompany an access token request. Every key is optional."""

    claims: str
    """Additional claims required in the token, such as those returned in a resource provider's claims
    challenge following an authorization failure."""
    tenant_id: str
    """The tenant ID to include in the token request."""
    enable_cae: bool
    """Whether Continuous Access Evaluation (CAE) should be enabled for the requested token."""

69 

70 

@runtime_checkable
class TokenCredential(Protocol):
    """Structural protocol satisfied by any class that can provide OAuth tokens."""

    def get_token(
        self,
        *scopes: str,
        claims: Optional[str] = None,
        tenant_id: Optional[str] = None,
        enable_cae: bool = False,
        **kwargs: Any,
    ) -> AccessToken:
        """Request an access token for `scopes`.

        :param str scopes: The type of access needed.

        :keyword str claims: Additional claims required in the token, such as those returned in a
            resource provider's claims challenge following an authorization failure.
        :keyword str tenant_id: Optional tenant to include in the token request.
        :keyword bool enable_cae: Whether to enable Continuous Access Evaluation (CAE) for the
            requested token. Defaults to False.

        :rtype: AccessToken
        :return: An AccessToken instance holding the token string and its expiration time in Unix time.
        """
        ...

97 

98 

@runtime_checkable
class SupportsTokenInfo(Protocol, ContextManager["SupportsTokenInfo"]):
    """Structural protocol for classes that can provide OAuth access tokens with additional properties."""

    def get_token_info(
        self,
        *scopes: str,
        options: Optional[TokenRequestOptions] = None,
    ) -> AccessTokenInfo:
        """Request an access token for `scopes`.

        This is the alternative to `get_token` for scenarios that need additional properties on
        the token.

        :param str scopes: The type of access needed.
        :keyword options: A dictionary of options for the token request. Unknown options will be
            ignored. Optional.
        :paramtype options: TokenRequestOptions

        :rtype: AccessTokenInfo
        :return: An AccessTokenInfo instance containing information about the token.
        """
        ...

    def close(self) -> None:
        """Close the credential and release any resources it holds.

        :return: None
        :rtype: None
        """

124 

125 

# A token provider is any object satisfying either the TokenCredential or the SupportsTokenInfo protocol.
TokenProvider = Union[TokenCredential, SupportsTokenInfo]

127 

128 

class AzureNamedKey(NamedTuple):
    """A ``(name, key)`` pair."""

    name: str
    key: str

134 

135 

# Names exported by a star-import of this module.
__all__ = [
    "AzureKeyCredential",
    "AzureSasCredential",
    "AccessToken",
    "AccessTokenInfo",
    "SupportsTokenInfo",
    "AzureNamedKeyCredential",
    "TokenCredential",
    "TokenRequestOptions",
    "TokenProvider",
]

147 

148 

class AzureKeyCredential:
    """Key-based credential used for authenticating to an Azure service.

    The key can be replaced in place, so existing clients do not need to be recreated.

    :param str key: The key used to authenticate to an Azure service
    :raises TypeError: If the key is not a string.
    """

    def __init__(self, key: str) -> None:
        if isinstance(key, str):
            self._key = key
        else:
            raise TypeError("key must be a string.")

    @property
    def key(self) -> str:
        """The value of the configured key.

        :rtype: str
        :return: The value of the configured key.
        """
        return self._key

    def update(self, key: str) -> None:
        """Update the key.

        Use this after regenerating a service key to refresh long-lived clients.

        :param str key: The key used to authenticate to an Azure service
        :raises ValueError: If the key is None or empty.
        :raises TypeError: If the key is not a string.
        """
        if not key:
            # Falsy input is rejected before the type check runs.
            raise ValueError("The key used for updating can not be None or empty")
        if isinstance(key, str):
            self._key = key
            return
        raise TypeError("The key used for updating must be a string.")

185 

186 

class AzureSasCredential:
    """Shared-access-signature credential used for authenticating to an Azure service.

    The signature can be replaced in place, so existing clients do not need to be recreated.

    :param str signature: The shared access signature used to authenticate to an Azure service
    :raises TypeError: If the signature is not a string.
    """

    def __init__(self, signature: str) -> None:
        if isinstance(signature, str):
            self._signature = signature
        else:
            raise TypeError("signature must be a string.")

    @property
    def signature(self) -> str:
        """The value of the configured shared access signature.

        :rtype: str
        :return: The value of the configured shared access signature.
        """
        return self._signature

    def update(self, signature: str) -> None:
        """Update the shared access signature.

        Use this after regenerating a shared access signature to refresh long-lived clients.

        :param str signature: The shared access signature used to authenticate to an Azure service
        :raises ValueError: If the signature is None or empty.
        :raises TypeError: If the signature is not a string.
        """
        if not signature:
            # Falsy input is rejected before the type check runs.
            raise ValueError("The signature used for updating can not be None or empty")
        if isinstance(signature, str):
            self._signature = signature
            return
        raise TypeError("The signature used for updating must be a string.")

224 

225 

class AzureNamedKeyCredential:
    """Credential type used for working with any service needing a named key that follows patterns
    established by the other credential types.

    :param str name: The name of the credential used to authenticate to an Azure service.
    :param str key: The key used to authenticate to an Azure service.
    :raises TypeError: If the name or key is not a string.
    """

    def __init__(self, name: str, key: str) -> None:
        if not isinstance(name, str) or not isinstance(key, str):
            raise TypeError("Both name and key must be strings.")
        self._credential = AzureNamedKey(name, key)

    @property
    def named_key(self) -> AzureNamedKey:
        """The configured name and key pair.

        :rtype: AzureNamedKey
        :return: The currently configured name and key, as an AzureNamedKey tuple.
        """
        return self._credential

    def update(self, name: str, key: str) -> None:
        """Update the named key credential.

        Both name and key must be provided in order to update the named key credential.
        Individual attributes cannot be updated.

        :param str name: The name of the credential used to authenticate to an Azure service.
        :param str key: The key used to authenticate to an Azure service.
        :raises TypeError: If the name or key is not a string.
        """
        if not isinstance(name, str) or not isinstance(key, str):
            raise TypeError("Both name and key must be strings.")
        # Name and key are swapped in together by rebinding a single tuple.
        self._credential = AzureNamedKey(name, key)
260 self._credential = AzureNamedKey(name, key)