Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/credentials.py: 58%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

78 statements  

1# ------------------------------------------------------------------------- 

2# Copyright (c) Microsoft Corporation. All rights reserved. 

3# Licensed under the MIT License. See LICENSE.txt in the project root for 

4# license information. 

5# ------------------------------------------------------------------------- 

6from typing import Any, NamedTuple, Optional, TypedDict, Union, ContextManager 

7from typing_extensions import Protocol, runtime_checkable 

8 

9 

10class AccessToken(NamedTuple): 

11 """Represents an OAuth access token.""" 

12 

13 token: str 

14 """The token string.""" 

15 expires_on: int 

16 """The token's expiration time in Unix time.""" 

17 

18 

19class AccessTokenInfo: 

20 """Information about an OAuth access token. 

21 

22 This class is an alternative to `AccessToken` which provides additional information about the token. 

23 

24 :param str token: The token string. 

25 :param int expires_on: The token's expiration time in Unix time. 

26 :keyword str token_type: The type of access token. Defaults to 'Bearer'. 

27 :keyword int refresh_on: Specifies the time, in Unix time, when the cached token should be proactively 

28 refreshed. Optional. 

29 """ 

30 

31 token: str 

32 """The token string.""" 

33 expires_on: int 

34 """The token's expiration time in Unix time.""" 

35 token_type: str 

36 """The type of access token.""" 

37 refresh_on: Optional[int] 

38 """Specifies the time, in Unix time, when the cached token should be proactively refreshed. Optional.""" 

39 

40 def __init__( 

41 self, token: str, expires_on: int, *, token_type: str = "Bearer", refresh_on: Optional[int] = None 

42 ) -> None: 

43 self.token = token 

44 self.expires_on = expires_on 

45 self.token_type = token_type 

46 self.refresh_on = refresh_on 

47 

48 def __repr__(self) -> str: 

49 return "AccessTokenInfo(token='{}', expires_on={}, token_type='{}', refresh_on={})".format( 

50 self.token, self.expires_on, self.token_type, self.refresh_on 

51 ) 

52 

53 

54class TokenRequestOptions(TypedDict, total=False): 

55 """Options to use for access token requests. All parameters are optional.""" 

56 

57 claims: str 

58 """Additional claims required in the token, such as those returned in a resource provider's claims 

59 challenge following an authorization failure.""" 

60 tenant_id: str 

61 """The tenant ID to include in the token request.""" 

62 enable_cae: bool 

63 """Indicates whether to enable Continuous Access Evaluation (CAE) for the requested token.""" 

64 

65 

66@runtime_checkable 

67class TokenCredential(Protocol): 

68 """Protocol for classes able to provide OAuth tokens.""" 

69 

70 def get_token( 

71 self, 

72 *scopes: str, 

73 claims: Optional[str] = None, 

74 tenant_id: Optional[str] = None, 

75 enable_cae: bool = False, 

76 **kwargs: Any, 

77 ) -> AccessToken: 

78 """Request an access token for `scopes`. 

79 

80 :param str scopes: The type of access needed. 

81 

82 :keyword str claims: Additional claims required in the token, such as those returned in a resource 

83 provider's claims challenge following an authorization failure. 

84 :keyword str tenant_id: Optional tenant to include in the token request. 

85 :keyword bool enable_cae: Indicates whether to enable Continuous Access Evaluation (CAE) for the requested 

86 token. Defaults to False. 

87 

88 :rtype: AccessToken 

89 :return: An AccessToken instance containing the token string and its expiration time in Unix time. 

90 """ 

91 ... 

92 

93 

94@runtime_checkable 

95class SupportsTokenInfo(Protocol, ContextManager["SupportsTokenInfo"]): 

96 """Protocol for classes able to provide OAuth access tokens with additional properties.""" 

97 

98 def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] = None) -> AccessTokenInfo: 

99 """Request an access token for `scopes`. 

100 

101 This is an alternative to `get_token` to enable certain scenarios that require additional properties 

102 on the token. 

103 

104 :param str scopes: The type of access needed. 

105 :keyword options: A dictionary of options for the token request. Unknown options will be ignored. Optional. 

106 :paramtype options: TokenRequestOptions 

107 

108 :rtype: AccessTokenInfo 

109 :return: An AccessTokenInfo instance containing information about the token. 

110 """ 

111 ... 

112 

113 def close(self) -> None: 

114 pass 

115 

116 

117TokenProvider = Union[TokenCredential, SupportsTokenInfo] 

118 

119 

120class AzureNamedKey(NamedTuple): 

121 """Represents a name and key pair.""" 

122 

123 name: str 

124 key: str 

125 

126 

127__all__ = [ 

128 "AzureKeyCredential", 

129 "AzureSasCredential", 

130 "AccessToken", 

131 "AccessTokenInfo", 

132 "SupportsTokenInfo", 

133 "AzureNamedKeyCredential", 

134 "TokenCredential", 

135 "TokenRequestOptions", 

136 "TokenProvider", 

137] 

138 

139 

140class AzureKeyCredential: 

141 """Credential type used for authenticating to an Azure service. 

142 It provides the ability to update the key without creating a new client. 

143 

144 :param str key: The key used to authenticate to an Azure service 

145 :raises: TypeError 

146 """ 

147 

148 def __init__(self, key: str) -> None: 

149 if not isinstance(key, str): 

150 raise TypeError("key must be a string.") 

151 self._key = key 

152 

153 @property 

154 def key(self) -> str: 

155 """The value of the configured key. 

156 

157 :rtype: str 

158 :return: The value of the configured key. 

159 """ 

160 return self._key 

161 

162 def update(self, key: str) -> None: 

163 """Update the key. 

164 

165 This can be used when you've regenerated your service key and want 

166 to update long-lived clients. 

167 

168 :param str key: The key used to authenticate to an Azure service 

169 :raises: ValueError or TypeError 

170 """ 

171 if not key: 

172 raise ValueError("The key used for updating can not be None or empty") 

173 if not isinstance(key, str): 

174 raise TypeError("The key used for updating must be a string.") 

175 self._key = key 

176 

177 

178class AzureSasCredential: 

179 """Credential type used for authenticating to an Azure service. 

180 It provides the ability to update the shared access signature without creating a new client. 

181 

182 :param str signature: The shared access signature used to authenticate to an Azure service 

183 :raises: TypeError 

184 """ 

185 

186 def __init__(self, signature: str) -> None: 

187 if not isinstance(signature, str): 

188 raise TypeError("signature must be a string.") 

189 self._signature = signature 

190 

191 @property 

192 def signature(self) -> str: 

193 """The value of the configured shared access signature. 

194 

195 :rtype: str 

196 :return: The value of the configured shared access signature. 

197 """ 

198 return self._signature 

199 

200 def update(self, signature: str) -> None: 

201 """Update the shared access signature. 

202 

203 This can be used when you've regenerated your shared access signature and want 

204 to update long-lived clients. 

205 

206 :param str signature: The shared access signature used to authenticate to an Azure service 

207 :raises: ValueError or TypeError 

208 """ 

209 if not signature: 

210 raise ValueError("The signature used for updating can not be None or empty") 

211 if not isinstance(signature, str): 

212 raise TypeError("The signature used for updating must be a string.") 

213 self._signature = signature 

214 

215 

216class AzureNamedKeyCredential: 

217 """Credential type used for working with any service needing a named key that follows patterns 

218 established by the other credential types. 

219 

220 :param str name: The name of the credential used to authenticate to an Azure service. 

221 :param str key: The key used to authenticate to an Azure service. 

222 :raises: TypeError 

223 """ 

224 

225 def __init__(self, name: str, key: str) -> None: 

226 if not isinstance(name, str) or not isinstance(key, str): 

227 raise TypeError("Both name and key must be strings.") 

228 self._credential = AzureNamedKey(name, key) 

229 

230 @property 

231 def named_key(self) -> AzureNamedKey: 

232 """The value of the configured name. 

233 

234 :rtype: AzureNamedKey 

235 :return: The value of the configured name. 

236 """ 

237 return self._credential 

238 

239 def update(self, name: str, key: str) -> None: 

240 """Update the named key credential. 

241 

242 Both name and key must be provided in order to update the named key credential. 

243 Individual attributes cannot be updated. 

244 

245 :param str name: The name of the credential used to authenticate to an Azure service. 

246 :param str key: The key used to authenticate to an Azure service. 

247 """ 

248 if not isinstance(name, str) or not isinstance(key, str): 

249 raise TypeError("Both name and key must be strings.") 

250 self._credential = AzureNamedKey(name, key)