Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/managed_identity.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

230 statements  

1# Copyright (c) Microsoft Corporation. 

2# All rights reserved. 

3# 

4# This code is licensed under the MIT License. 

5import hashlib 

6import json 

7import logging 

8import os 

9import sys 

10import time 

11import uuid 

12from urllib.parse import urlparse # Python 3+ 

13from collections import UserDict # Python 3+ 

14from typing import List, Optional, Union # Needed in Python 3.7 & 3.8 

15from .token_cache import TokenCache 

16from .individual_cache import _IndividualCache as IndividualCache 

17from .throttled_http_client import ThrottledHttpClientBase, RetryAfterParser 

18from .cloudshell import _is_running_in_cloud_shell 

19from .sku import SKU, __version__ 

20 

21 

22logger = logging.getLogger(__name__) 

23 

24 

25class ManagedIdentityError(ValueError): 

26 pass 

27 

28 

29class ManagedIdentity(UserDict): 

30 """Feed an instance of this class to :class:`msal.ManagedIdentityClient` 

31 to acquire token for the specified managed identity. 

32 """ 

33 # The key names used in config dict 

34 ID_TYPE = "ManagedIdentityIdType" # Contains keyword ManagedIdentity so its json equivalent will be more readable 

35 ID = "Id" 

36 

37 # Valid values for key ID_TYPE 

38 CLIENT_ID = "ClientId" 

39 RESOURCE_ID = "ResourceId" 

40 OBJECT_ID = "ObjectId" 

41 SYSTEM_ASSIGNED = "SystemAssigned" 

42 

43 _types_mapping = { # Maps type name in configuration to type name on wire 

44 CLIENT_ID: "client_id", 

45 RESOURCE_ID: "msi_res_id", # VM's IMDS prefers msi_res_id https://github.com/Azure/azure-rest-api-specs/blob/dba6ed1f03bda88ac6884c0a883246446cc72495/specification/imds/data-plane/Microsoft.InstanceMetadataService/stable/2018-10-01/imds.json#L233-L239 

46 OBJECT_ID: "object_id", 

47 } 

48 

49 @classmethod 

50 def is_managed_identity(cls, unknown): 

51 return (isinstance(unknown, ManagedIdentity) 

52 or cls.is_system_assigned(unknown) 

53 or cls.is_user_assigned(unknown)) 

54 

55 @classmethod 

56 def is_system_assigned(cls, unknown): 

57 return isinstance(unknown, SystemAssignedManagedIdentity) or ( 

58 isinstance(unknown, dict) 

59 and unknown.get(cls.ID_TYPE) == cls.SYSTEM_ASSIGNED) 

60 

61 @classmethod 

62 def is_user_assigned(cls, unknown): 

63 return isinstance(unknown, UserAssignedManagedIdentity) or ( 

64 isinstance(unknown, dict) 

65 and unknown.get(cls.ID_TYPE) in cls._types_mapping 

66 and unknown.get(cls.ID)) 

67 

68 def __init__(self, identifier=None, id_type=None): 

69 # Undocumented. Use subclasses instead. 

70 super(ManagedIdentity, self).__init__({ 

71 self.ID_TYPE: id_type, 

72 self.ID: identifier, 

73 }) 

74 

75 

76class SystemAssignedManagedIdentity(ManagedIdentity): 

77 """Represent a system-assigned managed identity. 

78 

79 It is equivalent to a Python dict of:: 

80 

81 {"ManagedIdentityIdType": "SystemAssigned", "Id": None} 

82 

83 or a JSON blob of:: 

84 

85 {"ManagedIdentityIdType": "SystemAssigned", "Id": null} 

86 """ 

87 def __init__(self): 

88 super(SystemAssignedManagedIdentity, self).__init__(id_type=self.SYSTEM_ASSIGNED) 

89 

90 

91class UserAssignedManagedIdentity(ManagedIdentity): 

92 """Represent a user-assigned managed identity. 

93 

94 Depends on the id you provided, the outcome is equivalent to one of the below:: 

95 

96 {"ManagedIdentityIdType": "ClientId", "Id": "foo"} 

97 {"ManagedIdentityIdType": "ResourceId", "Id": "foo"} 

98 {"ManagedIdentityIdType": "ObjectId", "Id": "foo"} 

99 """ 

100 def __init__(self, *, client_id=None, resource_id=None, object_id=None): 

101 if client_id and not resource_id and not object_id: 

102 super(UserAssignedManagedIdentity, self).__init__( 

103 id_type=self.CLIENT_ID, identifier=client_id) 

104 elif not client_id and resource_id and not object_id: 

105 super(UserAssignedManagedIdentity, self).__init__( 

106 id_type=self.RESOURCE_ID, identifier=resource_id) 

107 elif not client_id and not resource_id and object_id: 

108 super(UserAssignedManagedIdentity, self).__init__( 

109 id_type=self.OBJECT_ID, identifier=object_id) 

110 else: 

111 raise ManagedIdentityError( 

112 "You shall specify one of the three parameters: " 

113 "client_id, resource_id, object_id") 

114 

115 

116class _ThrottledHttpClient(ThrottledHttpClientBase): 

117 def __init__(self, *args, **kwargs): 

118 super(_ThrottledHttpClient, self).__init__(*args, **kwargs) 

119 self.get = IndividualCache( # All MIs (except Cloud Shell) use GETs 

120 mapping=self._expiring_mapping, 

121 key_maker=lambda func, args, kwargs: "REQ {} hash={} 429/5xx/Retry-After".format( 

122 args[0], # It is the endpoint, typically a constant per MI type 

123 self._hash( 

124 # Managed Identity flavors have inconsistent parameters. 

125 # We simply choose to hash them all. 

126 str(kwargs.get("params")) + str(kwargs.get("data"))), 

127 ), 

128 expires_in=RetryAfterParser(5).parse, # 5 seconds default for non-PCA 

129 )(self.get) # Note: Decorate the parent get(), not the http_client.get() 

130 

131 

132class ManagedIdentityClient(object): 

133 """This API encapsulates multiple managed identity back-ends: 

134 VM, App Service, Azure Automation (Runbooks), Azure Function, Service Fabric, 

135 and Azure Arc. 

136 

137 It also provides token cache support. 

138 

139 .. note:: 

140 

141 Cloud Shell support is NOT implemented in this class. 

142 Since MSAL Python 1.18 in May 2022, it has been implemented in 

143 :func:`PublicClientApplication.acquire_token_interactive` via calling pattern 

144 ``PublicClientApplication(...).acquire_token_interactive(scopes=[...], prompt="none")``. 

145 That is appropriate, because Cloud Shell yields a token with 

146 delegated permissions for the end user who has signed in to the Azure Portal 

147 (like what a ``PublicClientApplication`` does), 

148 not a token with application permissions for an app. 

149 """ 

150 __instance = "localhost" # We used to get this value from socket.getfqdn() 

151 # but it is unreliable because getfqdn() either hangs or returns empty value 

152 # on some misconfigured machines 

153 _tenant = "managed_identity" 

154 _TOKEN_SOURCE = "token_source" 

155 _TOKEN_SOURCE_IDP = "identity_provider" 

156 _TOKEN_SOURCE_CACHE = "cache" 

157 

158 def __init__( 

159 self, 

160 managed_identity: Union[ 

161 dict, 

162 ManagedIdentity, # Could use Type[ManagedIdentity] but it is deprecated in Python 3.9+ 

163 SystemAssignedManagedIdentity, 

164 UserAssignedManagedIdentity, 

165 ], 

166 *, 

167 http_client, 

168 token_cache=None, 

169 http_cache=None, 

170 client_capabilities: Optional[List[str]] = None, 

171 ): 

172 """Create a managed identity client. 

173 

174 :param managed_identity: 

175 It accepts an instance of :class:`SystemAssignedManagedIdentity` 

176 or :class:`UserAssignedManagedIdentity`. 

177 They are equivalent to a dict with a certain shape, 

178 which may be loaded from a JSON configuration file or an env var. 

179 

180 :param http_client: 

181 An http client object. For example, you can use ``requests.Session()``, 

182 optionally with exponential backoff behavior demonstrated in this recipe:: 

183 

184 import msal, requests 

185 from requests.adapters import HTTPAdapter, Retry 

186 s = requests.Session() 

187 retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[ 

188 429, 500, 501, 502, 503, 504]) 

189 s.mount('https://', HTTPAdapter(max_retries=retries)) 

190 managed_identity = ... 

191 client = msal.ManagedIdentityClient(managed_identity, http_client=s) 

192 

193 :param token_cache: 

194 Optional. It accepts a :class:`msal.TokenCache` instance to store tokens. 

195 It will use an in-memory token cache by default. 

196 

197 :param http_cache: 

198 Optional. It has the same characteristics as the 

199 :paramref:`msal.ClientApplication.http_cache`. 

200 

201 :param list[str] client_capabilities: (optional) 

202 Allows configuration of one or more client capabilities, e.g. ["CP1"]. 

203 

204 Client capability is meant to inform the Microsoft identity platform 

205 (STS) what this client is capable for, 

206 so STS can decide to turn on certain features. 

207 

208 Implementation details: 

209 Client capability in Managed Identity is relayed as-is 

210 via ``xms_cc`` parameter on the wire. 

211 

212 Recipe 1: Hard code a managed identity for your app:: 

213 

214 import msal, requests 

215 client = msal.ManagedIdentityClient( 

216 msal.UserAssignedManagedIdentity(client_id="foo"), 

217 http_client=requests.Session(), 

218 ) 

219 token = client.acquire_token_for_client("resource") 

220 

221 Recipe 2: Write once, run everywhere. 

222 If you use different managed identity on different deployment, 

223 you may use an environment variable (such as MY_MANAGED_IDENTITY_CONFIG) 

224 to store a json blob like 

225 ``{"ManagedIdentityIdType": "ClientId", "Id": "foo"}`` or 

226 ``{"ManagedIdentityIdType": "SystemAssigned", "Id": null}``. 

227 The following app can load managed identity configuration dynamically:: 

228 

229 import json, os, msal, requests 

230 config = os.getenv("MY_MANAGED_IDENTITY_CONFIG") 

231 assert config, "An ENV VAR with value should exist" 

232 client = msal.ManagedIdentityClient( 

233 json.loads(config), 

234 http_client=requests.Session(), 

235 ) 

236 token = client.acquire_token_for_client("resource") 

237 """ 

238 if not ManagedIdentity.is_managed_identity(managed_identity): 

239 raise ManagedIdentityError( 

240 f"Incorrect managed_identity: {managed_identity}") 

241 self._managed_identity = managed_identity 

242 self._http_client = _ThrottledHttpClient( 

243 # This class only throttles excess token acquisition requests. 

244 # It does not provide retry. 

245 # Retry is the http_client or caller's responsibility, not MSAL's. 

246 # 

247 # FWIW, here is the inconsistent retry recommendation. 

248 # 1. Only MI on VM defines exotic 404 and 410 retry recommendations 

249 # ( https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling ) 

250 # (especially for 410 which was supposed to be a permanent failure). 

251 # 2. MI on Service Fabric specifically suggests to not retry on 404. 

252 # ( https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-cluster-managed-identity-service-fabric-app-code#error-handling ) 

253 http_client, 

254 http_cache=http_cache, 

255 ) 

256 self._token_cache = token_cache or TokenCache() 

257 self._client_capabilities = client_capabilities 

258 

259 def acquire_token_for_client( 

260 self, 

261 *, 

262 resource: str, # If/when we support scope, resource will become optional 

263 claims_challenge: Optional[str] = None, 

264 ): 

265 """Acquire token for the managed identity. 

266 

267 The result will be automatically cached. 

268 Subsequent calls will automatically search from cache first. 

269 

270 :param resource: The resource for which the token is acquired. 

271 

272 :param claims_challenge: 

273 Optional. 

274 It is a string representation of a JSON object 

275 (which contains lists of claims being requested). 

276 

277 The tenant admin may choose to revoke all Managed Identity tokens, 

278 and then a *claims challenge* will be returned by the target resource, 

279 as a `claims_challenge` directive in the `www-authenticate` header, 

280 even if the app developer did not opt in for the "CP1" client capability. 

281 Upon receiving a `claims_challenge`, MSAL will attempt to acquire a new token. 

282 

283 .. note:: 

284 

285 Known issue: When an Azure VM has only one user-assigned managed identity, 

286 and your app specifies to use system-assigned managed identity, 

287 Azure VM may still return a token for your user-assigned identity. 

288 

289 This is a service-side behavior that cannot be changed by this library. 

290 `Azure VM docs <https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http>`_ 

291 """ 

292 access_token_to_refresh = None # This could become a public parameter in the future 

293 access_token_from_cache = None 

294 client_id_in_cache = self._managed_identity.get( 

295 ManagedIdentity.ID, "SYSTEM_ASSIGNED_MANAGED_IDENTITY") 

296 now = time.time() 

297 if True: # Attempt cache search even if receiving claims_challenge, 

298 # because we want to locate the existing token (if any) and refresh it 

299 matches = self._token_cache.search( 

300 self._token_cache.CredentialType.ACCESS_TOKEN, 

301 target=[resource], 

302 query=dict( 

303 client_id=client_id_in_cache, 

304 environment=self.__instance, 

305 realm=self._tenant, 

306 home_account_id=None, 

307 ), 

308 ) 

309 for entry in matches: 

310 expires_in = int(entry["expires_on"]) - now 

311 if expires_in < 5*60: # Then consider it expired 

312 continue # Removal is not necessary, it will be overwritten 

313 if claims_challenge and not access_token_to_refresh: 

314 # Since caller did not pinpoint the token causing claims challenge, 

315 # we have to assume it is the first token we found in cache. 

316 access_token_to_refresh = entry["secret"] 

317 break 

318 logger.debug("Cache hit an AT") 

319 access_token_from_cache = { # Mimic a real response 

320 "access_token": entry["secret"], 

321 "token_type": entry.get("token_type", "Bearer"), 

322 "expires_in": int(expires_in), # OAuth2 specs defines it as int 

323 self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE, 

324 } 

325 if "refresh_on" in entry: 

326 access_token_from_cache["refresh_on"] = int(entry["refresh_on"]) 

327 if int(entry["refresh_on"]) < now: # aging 

328 break # With a fallback in hand, we break here to go refresh 

329 return access_token_from_cache # It is still good as new 

330 try: 

331 result = _obtain_token( 

332 self._http_client, self._managed_identity, resource, 

333 access_token_sha256_to_refresh=hashlib.sha256( 

334 access_token_to_refresh.encode("utf-8")).hexdigest() 

335 if access_token_to_refresh else None, 

336 client_capabilities=self._client_capabilities, 

337 ) 

338 if "access_token" in result: 

339 expires_in = result.get("expires_in", 3600) 

340 if "refresh_in" not in result and expires_in >= 7200: 

341 result["refresh_in"] = int(expires_in / 2) 

342 self._token_cache.add(dict( 

343 client_id=client_id_in_cache, 

344 scope=[resource], 

345 token_endpoint="https://{}/{}".format( 

346 self.__instance, self._tenant), 

347 response=result, 

348 params={}, 

349 data={}, 

350 )) 

351 if "refresh_in" in result: 

352 result["refresh_on"] = int(now + result["refresh_in"]) 

353 result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

354 if (result and "error" not in result) or (not access_token_from_cache): 

355 return result 

356 except: # The exact HTTP exception is transportation-layer dependent 

357 # Typically network error. Potential AAD outage? 

358 if not access_token_from_cache: # It means there is no fall back option 

359 raise # We choose to bubble up the exception 

360 return access_token_from_cache 

361 

362 

363def _scope_to_resource(scope): # This is an experimental reasonable-effort approach 

364 u = urlparse(scope) 

365 if u.scheme: 

366 return "{}://{}".format(u.scheme, u.netloc) 

367 return scope # There is no much else we can do here 

368 

369 

370def _get_arc_endpoint(): 

371 if "IDENTITY_ENDPOINT" in os.environ and "IMDS_ENDPOINT" in os.environ: 

372 return os.environ["IDENTITY_ENDPOINT"] 

373 if ( # Defined in https://eng.ms/docs/cloud-ai-platform/azure-core/azure-management-and-platforms/control-plane-bburns/hybrid-resource-provider/azure-arc-for-servers/specs/extension_authoring 

374 sys.platform == "linux" and os.path.exists("/opt/azcmagent/bin/himds") 

375 or sys.platform == "win32" and os.path.exists(os.path.expandvars( 

376 # Avoid Windows-only "%EnvVar%" syntax so that tests can be run on Linux 

377 r"${ProgramFiles}\AzureConnectedMachineAgent\himds.exe" 

378 )) 

379 ): 

380 return "http://localhost:40342/metadata/identity/oauth2/token" 

381 

382 

383APP_SERVICE = object() 

384AZURE_ARC = object() 

385CLOUD_SHELL = object() # In MSAL Python, token acquisition was done by 

386 # PublicClientApplication(...).acquire_token_interactive(..., prompt="none") 

387MACHINE_LEARNING = object() 

388SERVICE_FABRIC = object() 

389DEFAULT_TO_VM = object() # Unknown environment; default to VM; you may want to probe 

390def get_managed_identity_source(): 

391 """Detect the current environment and return the likely identity source. 

392 

393 When this function returns ``CLOUD_SHELL``, you should use 

394 :func:`msal.PublicClientApplication.acquire_token_interactive` with ``prompt="none"`` 

395 to obtain a token. 

396 """ 

397 if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ 

398 and "IDENTITY_SERVER_THUMBPRINT" in os.environ 

399 ): 

400 return SERVICE_FABRIC 

401 if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ: 

402 return APP_SERVICE 

403 if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ: 

404 return MACHINE_LEARNING 

405 if _get_arc_endpoint(): 

406 return AZURE_ARC 

407 if _is_running_in_cloud_shell(): 

408 return CLOUD_SHELL 

409 return DEFAULT_TO_VM 

410 

411 

412def _obtain_token( 

413 http_client, managed_identity, resource, 

414 *, 

415 access_token_sha256_to_refresh: Optional[str] = None, 

416 client_capabilities: Optional[List[str]] = None, 

417): 

418 if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ 

419 and "IDENTITY_SERVER_THUMBPRINT" in os.environ 

420 ): 

421 if managed_identity: 

422 logger.debug( 

423 "Ignoring managed_identity parameter. " 

424 "Managed Identity in Service Fabric is configured in the cluster, " 

425 "not during runtime. See also " 

426 "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service") 

427 return _obtain_token_on_service_fabric( 

428 http_client, 

429 os.environ["IDENTITY_ENDPOINT"], 

430 os.environ["IDENTITY_HEADER"], 

431 os.environ["IDENTITY_SERVER_THUMBPRINT"], 

432 resource, 

433 access_token_sha256_to_refresh=access_token_sha256_to_refresh, 

434 client_capabilities=client_capabilities, 

435 ) 

436 if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ: 

437 return _obtain_token_on_app_service( 

438 http_client, 

439 os.environ["IDENTITY_ENDPOINT"], 

440 os.environ["IDENTITY_HEADER"], 

441 managed_identity, 

442 resource, 

443 ) 

444 if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ: 

445 # Back ported from https://github.com/Azure/azure-sdk-for-python/blob/azure-identity_1.15.0/sdk/identity/azure-identity/azure/identity/_credentials/azure_ml.py 

446 return _obtain_token_on_machine_learning( 

447 http_client, 

448 os.environ["MSI_ENDPOINT"], 

449 os.environ["MSI_SECRET"], 

450 managed_identity, 

451 resource, 

452 ) 

453 arc_endpoint = _get_arc_endpoint() 

454 if arc_endpoint: 

455 if ManagedIdentity.is_user_assigned(managed_identity): 

456 raise ManagedIdentityError( # Note: Azure Identity for Python raised exception too 

457 "Invalid managed_identity parameter. " 

458 "Azure Arc supports only system-assigned managed identity, " 

459 "See also " 

460 "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service") 

461 return _obtain_token_on_arc(http_client, arc_endpoint, resource) 

462 return _obtain_token_on_azure_vm(http_client, managed_identity, resource) 

463 

464 

465def _adjust_param(params, managed_identity, types_mapping=None): 

466 # Modify the params dict in place 

467 id_name = (types_mapping or ManagedIdentity._types_mapping).get( 

468 managed_identity.get(ManagedIdentity.ID_TYPE)) 

469 if id_name: 

470 params[id_name] = managed_identity[ManagedIdentity.ID] 

471 

472def _obtain_token_on_azure_vm(http_client, managed_identity, resource): 

473 # Based on https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http 

474 logger.debug("Obtaining token via managed identity on Azure VM") 

475 params = { 

476 "api-version": "2018-02-01", 

477 "resource": resource, 

478 } 

479 _adjust_param(params, managed_identity) 

480 resp = http_client.get( 

481 os.getenv( 

482 "AZURE_POD_IDENTITY_AUTHORITY_HOST", "http://169.254.169.254" 

483 ).strip("/") + "/metadata/identity/oauth2/token", 

484 params=params, 

485 headers={ 

486 "Metadata": "true", 

487 "x-client-SKU": SKU, 

488 "x-client-Ver": __version__, 

489 "x-ms-client-request-id": str(uuid.uuid4()), 

490 }, 

491 ) 

492 try: 

493 payload = json.loads(resp.text) 

494 if payload.get("access_token") and payload.get("expires_in"): 

495 return { # Normalizing the payload into OAuth2 format 

496 "access_token": payload["access_token"], 

497 "expires_in": int(payload["expires_in"]), 

498 "resource": payload.get("resource"), 

499 "token_type": payload.get("token_type", "Bearer"), 

500 } 

501 return payload # It would be {"error": ..., "error_description": ...} according to https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling 

502 except json.decoder.JSONDecodeError: 

503 logger.debug("IMDS emits unexpected payload: %s", resp.text) 

504 raise 

505 

506def _obtain_token_on_app_service( 

507 http_client, endpoint, identity_header, managed_identity, resource, 

508): 

509 """Obtains token for 

510 `App Service <https://learn.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp#rest-endpoint-reference>`_, 

511 Azure Functions, and Azure Automation. 

512 """ 

513 # Prerequisite: Create your app service https://docs.microsoft.com/en-us/azure/app-service/quickstart-python 

514 # Assign it a managed identity https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp 

515 # SSH into your container for testing https://docs.microsoft.com/en-us/azure/app-service/configure-linux-open-ssh-session 

516 logger.debug("Obtaining token via managed identity on Azure App Service") 

517 params = { 

518 "api-version": "2019-08-01", 

519 "resource": resource, 

520 } 

521 _adjust_param(params, managed_identity, types_mapping={ 

522 ManagedIdentity.CLIENT_ID: "client_id", 

523 ManagedIdentity.RESOURCE_ID: "mi_res_id", # App Service's resource id uses "mi_res_id" 

524 ManagedIdentity.OBJECT_ID: "object_id", 

525 }) 

526 

527 resp = http_client.get( 

528 endpoint, 

529 params=params, 

530 headers={ 

531 "X-IDENTITY-HEADER": identity_header, 

532 "Metadata": "true", # Unnecessary yet harmless for App Service, 

533 # It will be needed by Azure Automation 

534 # https://docs.microsoft.com/en-us/azure/automation/enable-managed-identity-for-automation#get-access-token-for-system-assigned-managed-identity-using-http-get 

535 }, 

536 ) 

537 try: 

538 payload = json.loads(resp.text) 

539 if payload.get("access_token") and payload.get("expires_on"): 

540 return { # Normalizing the payload into OAuth2 format 

541 "access_token": payload["access_token"], 

542 "expires_in": int(payload["expires_on"]) - int(time.time()), 

543 "resource": payload.get("resource"), 

544 "token_type": payload.get("token_type", "Bearer"), 

545 } 

546 return { 

547 "error": "invalid_scope", # Empirically, wrong resource ends up with a vague statusCode=500 

548 "error_description": "{}, {}".format( 

549 payload.get("statusCode"), payload.get("message")), 

550 } 

551 except json.decoder.JSONDecodeError: 

552 logger.debug("IMDS emits unexpected payload: %s", resp.text) 

553 raise 

554 

555def _obtain_token_on_machine_learning( 

556 http_client, endpoint, secret, managed_identity, resource, 

557): 

558 # Could not find protocol docs from https://docs.microsoft.com/en-us/azure/machine-learning 

559 # The following implementation is back ported from Azure Identity 1.15.0 

560 logger.debug("Obtaining token via managed identity on Azure Machine Learning") 

561 params = {"api-version": "2017-09-01", "resource": resource} 

562 _adjust_param(params, managed_identity) 

563 if params["api-version"] == "2017-09-01" and "client_id" in params: 

564 # Workaround for a known bug in Azure ML 2017 API 

565 params["clientid"] = params.pop("client_id") 

566 resp = http_client.get( 

567 endpoint, 

568 params=params, 

569 headers={"secret": secret}, 

570 ) 

571 try: 

572 payload = json.loads(resp.text) 

573 if payload.get("access_token") and payload.get("expires_on"): 

574 return { # Normalizing the payload into OAuth2 format 

575 "access_token": payload["access_token"], 

576 "expires_in": int(payload["expires_on"]) - int(time.time()), 

577 "resource": payload.get("resource"), 

578 "token_type": payload.get("token_type", "Bearer"), 

579 } 

580 return { 

581 "error": "invalid_scope", # TODO: To be tested 

582 "error_description": "{}".format(payload), 

583 } 

584 except json.decoder.JSONDecodeError: 

585 logger.debug("IMDS emits unexpected payload: %s", resp.text) 

586 raise 

587 

588 

589def _obtain_token_on_service_fabric( 

590 http_client, endpoint, identity_header, server_thumbprint, resource, 

591 *, 

592 access_token_sha256_to_refresh: str = None, 

593 client_capabilities: Optional[List[str]] = None, 

594): 

595 """Obtains token for 

596 `Service Fabric <https://learn.microsoft.com/en-us/azure/service-fabric/>`_ 

597 """ 

598 # Deployment https://learn.microsoft.com/en-us/azure/service-fabric/service-fabric-get-started-containers-linux 

599 # See also https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/tests/managed-identity-live/service-fabric/service_fabric.md 

600 # Protocol https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#acquiring-an-access-token-using-rest-api 

601 logger.debug("Obtaining token via managed identity on Azure Service Fabric") 

602 resp = http_client.get( 

603 endpoint, 

604 params={k: v for k, v in { 

605 "api-version": "2019-07-01-preview", 

606 "resource": resource, 

607 "token_sha256_to_refresh": access_token_sha256_to_refresh, 

608 "xms_cc": ",".join(client_capabilities) if client_capabilities else None, 

609 }.items() if v is not None}, 

610 headers={"Secret": identity_header}, 

611 ) 

612 try: 

613 payload = json.loads(resp.text) 

614 if payload.get("access_token") and payload.get("expires_on"): 

615 return { # Normalizing the payload into OAuth2 format 

616 "access_token": payload["access_token"], 

617 "expires_in": int( # Despite the example in docs shows an integer, 

618 payload["expires_on"] # Azure SDK team's test obtained a string. 

619 ) - int(time.time()), 

620 "resource": payload.get("resource"), 

621 "token_type": payload["token_type"], 

622 } 

623 error = payload.get("error", {}) # https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#error-handling 

624 error_mapping = { # Map Service Fabric errors into OAuth2 errors https://www.rfc-editor.org/rfc/rfc6749#section-5.2 

625 "SecretHeaderNotFound": "unauthorized_client", 

626 "ManagedIdentityNotFound": "invalid_client", 

627 "ArgumentNullOrEmpty": "invalid_scope", 

628 } 

629 return { 

630 "error": error_mapping.get(error.get("code"), "invalid_request"), 

631 "error_description": resp.text, 

632 } 

633 except json.decoder.JSONDecodeError: 

634 logger.debug("IMDS emits unexpected payload: %s", resp.text) 

635 raise 

636 

637 

638_supported_arc_platforms_and_their_prefixes = { 

639 "linux": "/var/opt/azcmagent/tokens", 

640 "win32": os.path.expandvars(r"%ProgramData%\AzureConnectedMachineAgent\Tokens"), 

641} 

642 

643class ArcPlatformNotSupportedError(ManagedIdentityError): 

644 pass 

645 

646def _obtain_token_on_arc(http_client, endpoint, resource): 

647 # https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication 

648 logger.debug("Obtaining token via managed identity on Azure Arc") 

649 resp = http_client.get( 

650 endpoint, 

651 params={"api-version": "2020-06-01", "resource": resource}, 

652 headers={"Metadata": "true"}, 

653 ) 

654 www_auth = "www-authenticate" # Header in lower case 

655 challenge = { 

656 # Normalized to lowercase, because header names are case-insensitive 

657 # https://datatracker.ietf.org/doc/html/rfc7230#section-3.2 

658 k.lower(): v for k, v in resp.headers.items() if k.lower() == www_auth 

659 }.get(www_auth, "").split("=") # Output will be ["Basic realm", "content"] 

660 if not ( # https://datatracker.ietf.org/doc/html/rfc7617#section-2 

661 len(challenge) == 2 and challenge[0].lower() == "basic realm"): 

662 raise ManagedIdentityError( 

663 "Unrecognizable WWW-Authenticate header: {}".format(resp.headers)) 

664 if sys.platform not in _supported_arc_platforms_and_their_prefixes: 

665 raise ArcPlatformNotSupportedError( 

666 f"Platform {sys.platform} was undefined and unsupported") 

667 filename = os.path.join( 

668 # This algorithm is documented in an internal doc https://msazure.visualstudio.com/One/_wiki/wikis/One.wiki/233012/VM-Extension-Authoring-for-Arc?anchor=2.-obtaining-tokens 

669 _supported_arc_platforms_and_their_prefixes[sys.platform], 

670 os.path.splitext(os.path.basename(challenge[1]))[0] + ".key") 

671 if os.stat(filename).st_size > 4096: # Check size BEFORE loading its content 

672 raise ManagedIdentityError("Local key file shall not be larger than 4KB") 

673 with open(filename) as f: 

674 secret = f.read() 

675 response = http_client.get( 

676 endpoint, 

677 params={"api-version": "2020-06-01", "resource": resource}, 

678 headers={"Metadata": "true", "Authorization": "Basic {}".format(secret)}, 

679 ) 

680 try: 

681 payload = json.loads(response.text) 

682 if payload.get("access_token") and payload.get("expires_in"): 

683 # Example: https://learn.microsoft.com/en-us/azure/azure-arc/servers/media/managed-identity-authentication/bash-token-output-example.png 

684 return { 

685 "access_token": payload["access_token"], 

686 "expires_in": int(payload["expires_in"]), 

687 "token_type": payload.get("token_type", "Bearer"), 

688 "resource": payload.get("resource"), 

689 } 

690 except json.decoder.JSONDecodeError: 

691 pass 

692 return { 

693 "error": "invalid_request", 

694 "error_description": response.text, 

695 }