Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/managed_identity.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

232 statements  

1# Copyright (c) Microsoft Corporation. 

2# All rights reserved. 

3# 

4# This code is licensed under the MIT License. 

5import hashlib 

6import json 

7import logging 

8import os 

9import socket 

10import sys 

11import time 

12from urllib.parse import urlparse # Python 3+ 

13from collections import UserDict # Python 3+ 

14from typing import List, Optional, Union # Needed in Python 3.7 & 3.8 

15from .token_cache import TokenCache 

16from .individual_cache import _IndividualCache as IndividualCache 

17from .throttled_http_client import ThrottledHttpClientBase, RetryAfterParser 

18from .cloudshell import _is_running_in_cloud_shell 

19 

20 

21logger = logging.getLogger(__name__) 

22 

23 

class ManagedIdentityError(ValueError):
    """Raised when a managed identity configuration is invalid or unusable."""


class ManagedIdentity(UserDict):
    """Feed an instance of this class to :class:`msal.ManagedIdentityClient`
    to acquire token for the specified managed identity.
    """
    # The key names used in config dict
    ID_TYPE = "ManagedIdentityIdType"  # Contains keyword ManagedIdentity so its json equivalent will be more readable
    ID = "Id"

    # Valid values for key ID_TYPE
    CLIENT_ID = "ClientId"
    RESOURCE_ID = "ResourceId"
    OBJECT_ID = "ObjectId"
    SYSTEM_ASSIGNED = "SystemAssigned"

    _types_mapping = {  # Maps type name in configuration to type name on wire
        CLIENT_ID: "client_id",
        RESOURCE_ID: "msi_res_id",  # VM's IMDS prefers msi_res_id https://github.com/Azure/azure-rest-api-specs/blob/dba6ed1f03bda88ac6884c0a883246446cc72495/specification/imds/data-plane/Microsoft.InstanceMetadataService/stable/2018-10-01/imds.json#L233-L239
        OBJECT_ID: "object_id",
    }

    @classmethod
    def is_managed_identity(cls, unknown):
        """Return True if ``unknown`` is a managed identity object or an
        equivalent plain dict (system-assigned or user-assigned shape).
        """
        return (isinstance(unknown, ManagedIdentity)
            or cls.is_system_assigned(unknown)
            or cls.is_user_assigned(unknown))

    @classmethod
    def is_system_assigned(cls, unknown):
        """Return True if ``unknown`` is a :class:`SystemAssignedManagedIdentity`
        or a dict whose id type is ``SYSTEM_ASSIGNED``.
        """
        return isinstance(unknown, SystemAssignedManagedIdentity) or (
            isinstance(unknown, dict)
            and unknown.get(cls.ID_TYPE) == cls.SYSTEM_ASSIGNED)

    @classmethod
    def is_user_assigned(cls, unknown):
        """Return True if ``unknown`` is a :class:`UserAssignedManagedIdentity`
        or a dict with a recognized id type and a non-empty id.

        The result is always a real ``bool``.
        """
        if isinstance(unknown, UserAssignedManagedIdentity):
            return True
        if not isinstance(unknown, dict):
            return False
        id_type = unknown.get(cls.ID_TYPE)
        # Compare by equality instead of hashing id_type: a config loaded from
        # JSON may carry a list/dict here, which must be rejected as "not a
        # managed identity" rather than blow up with an unhashable TypeError.
        is_known_type = any(id_type == name for name in cls._types_mapping)
        return is_known_type and bool(unknown.get(cls.ID))

    def __init__(self, identifier=None, id_type=None):
        # Undocumented. Use subclasses instead.
        super().__init__({
            self.ID_TYPE: id_type,
            self.ID: identifier,
        })


class SystemAssignedManagedIdentity(ManagedIdentity):
    """Represent a system-assigned managed identity.

    It is equivalent to a Python dict of::

        {"ManagedIdentityIdType": "SystemAssigned", "Id": None}

    or a JSON blob of::

        {"ManagedIdentityIdType": "SystemAssigned", "Id": null}
    """
    def __init__(self):
        super().__init__(id_type=self.SYSTEM_ASSIGNED)


class UserAssignedManagedIdentity(ManagedIdentity):
    """Represent a user-assigned managed identity.

    Depends on the id you provided, the outcome is equivalent to one of the below::

        {"ManagedIdentityIdType": "ClientId", "Id": "foo"}
        {"ManagedIdentityIdType": "ResourceId", "Id": "foo"}
        {"ManagedIdentityIdType": "ObjectId", "Id": "foo"}
    """
    def __init__(self, *, client_id=None, resource_id=None, object_id=None):
        """Exactly one of the three keyword arguments must be truthy.

        :raises ManagedIdentityError: when none, or more than one, is given.
        """
        provided = [
            (id_type, identifier)
            for id_type, identifier in (
                (self.CLIENT_ID, client_id),
                (self.RESOURCE_ID, resource_id),
                (self.OBJECT_ID, object_id),
            )
            if identifier
        ]
        if len(provided) != 1:
            raise ManagedIdentityError(
                "You shall specify one of the three parameters: "
                "client_id, resource_id, object_id")
        id_type, identifier = provided[0]
        super().__init__(id_type=id_type, identifier=identifier)

113 

114 

class _ThrottledHttpClient(ThrottledHttpClientBase):
    """Http client whose ``get()`` is wrapped by an ``IndividualCache`` decorator.

    The cache key combines the endpoint with a hash of the request's
    ``params`` and ``data``; entry lifetime comes from ``RetryAfterParser(5)``.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        def _make_key(func, args, kwargs):
            endpoint = args[0]  # It is the endpoint, typically a constant per MI type
            # Managed Identity flavors have inconsistent parameters.
            # We simply choose to hash them all.
            fingerprint = self._hash(
                str(kwargs.get("params")) + str(kwargs.get("data")))
            return "REQ {} hash={} 429/5xx/Retry-After".format(
                endpoint, fingerprint)

        decorate = IndividualCache(  # All MIs (except Cloud Shell) use GETs
            mapping=self._expiring_mapping,
            key_maker=_make_key,
            expires_in=RetryAfterParser(5).parse,  # 5 seconds default for non-PCA
        )
        # Note: Decorate the parent get(), not the http_client.get()
        self.get = decorate(self.get)

129 

130 

class ManagedIdentityClient(object):
    """This API encapsulates multiple managed identity back-ends:
    VM, App Service, Azure Automation (Runbooks), Azure Function, Service Fabric,
    and Azure Arc.

    It also provides token cache support.

    .. note::

        Cloud Shell support is NOT implemented in this class.
        Since MSAL Python 1.18 in May 2022, it has been implemented in
        :func:`PublicClientApplication.acquire_token_interactive` via calling pattern
        ``PublicClientApplication(...).acquire_token_interactive(scopes=[...], prompt="none")``.
        That is appropriate, because Cloud Shell yields a token with
        delegated permissions for the end user who has signed in to the Azure Portal
        (like what a ``PublicClientApplication`` does),
        not a token with application permissions for an app.
    """
    __instance, _tenant = None, "managed_identity"  # Placeholders
    _TOKEN_SOURCE = "token_source"
    _TOKEN_SOURCE_IDP = "identity_provider"
    _TOKEN_SOURCE_CACHE = "cache"

    def __init__(
        self,
        managed_identity: Union[
            dict,
            ManagedIdentity,  # Could use Type[ManagedIdentity] but it is deprecated in Python 3.9+
            SystemAssignedManagedIdentity,
            UserAssignedManagedIdentity,
        ],
        *,
        http_client,
        token_cache=None,
        http_cache=None,
        client_capabilities: Optional[List[str]] = None,
    ):
        """Create a managed identity client.

        :param managed_identity:
            It accepts an instance of :class:`SystemAssignedManagedIdentity`
            or :class:`UserAssignedManagedIdentity`.
            They are equivalent to a dict with a certain shape,
            which may be loaded from a JSON configuration file or an env var.

        :param http_client:
            An http client object. For example, you can use ``requests.Session()``,
            optionally with exponential backoff behavior demonstrated in this recipe::

                import msal, requests
                from requests.adapters import HTTPAdapter, Retry
                s = requests.Session()
                retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[
                    429, 500, 501, 502, 503, 504])
                s.mount('https://', HTTPAdapter(max_retries=retries))
                managed_identity = ...
                client = msal.ManagedIdentityClient(managed_identity, http_client=s)

        :param token_cache:
            Optional. It accepts a :class:`msal.TokenCache` instance to store tokens.
            It will use an in-memory token cache by default.

        :param http_cache:
            Optional. It has the same characteristics as the
            :paramref:`msal.ClientApplication.http_cache`.

        :param list[str] client_capabilities: (optional)
            Allows configuration of one or more client capabilities, e.g. ["CP1"].

            Client capability is meant to inform the Microsoft identity platform
            (STS) what this client is capable for,
            so STS can decide to turn on certain features.

            Implementation details:
            Client capability in Managed Identity is relayed as-is
            via ``xms_cc`` parameter on the wire.

        :raises ManagedIdentityError:
            When ``managed_identity`` is not a recognized managed identity.

        Recipe 1: Hard code a managed identity for your app::

            import msal, requests
            client = msal.ManagedIdentityClient(
                msal.UserAssignedManagedIdentity(client_id="foo"),
                http_client=requests.Session(),
                )
            token = client.acquire_token_for_client(resource="resource")

        Recipe 2: Write once, run everywhere.
        If you use different managed identity on different deployment,
        you may use an environment variable (such as MY_MANAGED_IDENTITY_CONFIG)
        to store a json blob like
        ``{"ManagedIdentityIdType": "ClientId", "Id": "foo"}`` or
        ``{"ManagedIdentityIdType": "SystemAssigned", "Id": null}``.
        The following app can load managed identity configuration dynamically::

            import json, os, msal, requests
            config = os.getenv("MY_MANAGED_IDENTITY_CONFIG")
            assert config, "An ENV VAR with value should exist"
            client = msal.ManagedIdentityClient(
                json.loads(config),
                http_client=requests.Session(),
                )
            token = client.acquire_token_for_client(resource="resource")
        """
        if not ManagedIdentity.is_managed_identity(managed_identity):
            raise ManagedIdentityError(
                f"Incorrect managed_identity: {managed_identity}")
        self._managed_identity = managed_identity
        self._http_client = _ThrottledHttpClient(
            # This class only throttles excess token acquisition requests.
            # It does not provide retry.
            # Retry is the http_client or caller's responsibility, not MSAL's.
            #
            # FWIW, here is the inconsistent retry recommendation.
            # 1. Only MI on VM defines exotic 404 and 410 retry recommendations
            #    ( https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling )
            #    (especially for 410 which was supposed to be a permanent failure).
            # 2. MI on Service Fabric specifically suggests to not retry on 404.
            #    ( https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-cluster-managed-identity-service-fabric-app-code#error-handling )
            http_client,
            http_cache=http_cache,
        )
        self._token_cache = token_cache or TokenCache()
        self._client_capabilities = client_capabilities

    def _get_instance(self):
        """Return this host's FQDN, resolving it once per client on first use."""
        if self.__instance is None:
            self.__instance = socket.getfqdn()  # Moved from class definition to here
        return self.__instance

    def acquire_token_for_client(
        self,
        *,
        resource: str,  # If/when we support scope, resource will become optional
        claims_challenge: Optional[str] = None,
    ):
        """Acquire token for the managed identity.

        The result will be automatically cached.
        Subsequent calls will automatically search from cache first.

        :param resource: The resource for which the token is acquired.

        :param claims_challenge:
            Optional.
            It is a string representation of a JSON object
            (which contains lists of claims being requested).

            The tenant admin may choose to revoke all Managed Identity tokens,
            and then a *claims challenge* will be returned by the target resource,
            as a `claims_challenge` directive in the `www-authenticate` header,
            even if the app developer did not opt in for the "CP1" client capability.
            Upon receiving a `claims_challenge`, MSAL will attempt to acquire a new token.

        :return:
            A dict. On success it contains ``access_token``, ``token_type``,
            ``expires_in`` and a ``token_source`` marker
            (``"cache"`` or ``"identity_provider"``);
            otherwise it is the error dict produced by the back-end.

        .. note::

            Known issue: When an Azure VM has only one user-assigned managed identity,
            and your app specifies to use system-assigned managed identity,
            Azure VM may still return a token for your user-assigned identity.

            This is a service-side behavior that cannot be changed by this library.
            `Azure VM docs <https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http>`_
        """
        access_token_to_refresh = None  # This could become a public parameter in the future
        access_token_from_cache = None
        # NOTE: the default below only applies to a plain dict lacking the "Id"
        # key; SystemAssignedManagedIdentity stores "Id": None explicitly,
        # so for it the cache key is None. Kept as-is for cache compatibility.
        client_id_in_cache = self._managed_identity.get(
            ManagedIdentity.ID, "SYSTEM_ASSIGNED_MANAGED_IDENTITY")
        now = time.time()
        # Attempt cache search even if receiving claims_challenge,
        # because we want to locate the existing token (if any) and refresh it
        matches = self._token_cache.search(
            self._token_cache.CredentialType.ACCESS_TOKEN,
            target=[resource],
            query=dict(
                client_id=client_id_in_cache,
                environment=self._get_instance(),
                realm=self._tenant,
                home_account_id=None,
            ),
        )
        for entry in matches:
            expires_in = int(entry["expires_on"]) - now
            if expires_in < 5*60:  # Then consider it expired
                continue  # Removal is not necessary, it will be overwritten
            if claims_challenge and not access_token_to_refresh:
                # Since caller did not pinpoint the token causing claims challenge,
                # we have to assume it is the first token we found in cache.
                access_token_to_refresh = entry["secret"]
                break
            logger.debug("Cache hit an AT")
            access_token_from_cache = {  # Mimic a real response
                "access_token": entry["secret"],
                "token_type": entry.get("token_type", "Bearer"),
                "expires_in": int(expires_in),  # OAuth2 specs defines it as int
                self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
            }
            if "refresh_on" in entry:
                access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
                if int(entry["refresh_on"]) < now:  # aging
                    break  # With a fallback in hand, we break here to go refresh
            return access_token_from_cache  # It is still good as new
        try:
            result = _obtain_token(
                self._http_client, self._managed_identity, resource,
                access_token_sha256_to_refresh=hashlib.sha256(
                    access_token_to_refresh.encode("utf-8")).hexdigest()
                    if access_token_to_refresh else None,
                client_capabilities=self._client_capabilities,
            )
            if "access_token" in result:
                expires_in = result.get("expires_in", 3600)
                if "refresh_in" not in result and expires_in >= 7200:
                    result["refresh_in"] = int(expires_in / 2)
                self._token_cache.add(dict(
                    client_id=client_id_in_cache,
                    scope=[resource],
                    token_endpoint="https://{}/{}".format(
                        self._get_instance(), self._tenant),
                    response=result,
                    params={},
                    data={},
                ))
                if "refresh_in" in result:
                    result["refresh_on"] = int(now + result["refresh_in"])
                result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
            if (result and "error" not in result) or (not access_token_from_cache):
                return result
        except Exception:  # The exact HTTP exception is transportation-layer dependent
            # Typically network error. Potential AAD outage?
            # Deliberately not a bare ``except``: KeyboardInterrupt/SystemExit
            # must propagate instead of being masked by a cached fallback token.
            if not access_token_from_cache:  # It means there is no fall back option
                raise  # We choose to bubble up the exception
        return access_token_from_cache

362 

363 

def _scope_to_resource(scope):  # This is an experimental reasonable-effort approach
    """Reduce a scope to ``scheme://netloc`` when it parses with a URL scheme.

    A scope without a scheme is returned unchanged.
    """
    parsed = urlparse(scope)
    if not parsed.scheme:
        return scope  # There is no much else we can do here
    return "{}://{}".format(parsed.scheme, parsed.netloc)

369 

370 

def _get_arc_endpoint():
    """Return the Azure Arc token endpoint, or ``None`` if Arc is not detected.

    The ``IDENTITY_ENDPOINT`` env var wins when ``IMDS_ENDPOINT`` is also set;
    otherwise a well-known local endpoint is returned when the ``himds``
    binary exists at its expected location for the current platform.
    """
    env = os.environ
    if "IDENTITY_ENDPOINT" in env and "IMDS_ENDPOINT" in env:
        return env["IDENTITY_ENDPOINT"]
    # Defined in https://eng.ms/docs/cloud-ai-platform/azure-core/azure-management-and-platforms/control-plane-bburns/hybrid-resource-provider/azure-arc-for-servers/specs/extension_authoring
    if sys.platform == "linux":
        agent_found = os.path.exists("/opt/azcmagent/bin/himds")
    elif sys.platform == "win32":
        agent_found = os.path.exists(os.path.expandvars(
            # Avoid Windows-only "%EnvVar%" syntax so that tests can be run on Linux
            r"${ProgramFiles}\AzureConnectedMachineAgent\himds.exe"
        ))
    else:
        agent_found = False
    if agent_found:
        return "http://localhost:40342/metadata/identity/oauth2/token"
    return None

382 

383 

# Opaque sentinels returned by get_managed_identity_source(); compare with ``is``.
APP_SERVICE = object()
AZURE_ARC = object()
CLOUD_SHELL = object()  # In MSAL Python, token acquisition was done by
    # PublicClientApplication(...).acquire_token_interactive(..., prompt="none")
MACHINE_LEARNING = object()
SERVICE_FABRIC = object()
DEFAULT_TO_VM = object()  # Unknown environment; default to VM; you may want to probe


def get_managed_identity_source():
    """Detect the current environment and return the likely identity source.

    The return value is one of the module-level sentinels; checks run in
    this order: Service Fabric, App Service, Machine Learning, Azure Arc,
    Cloud Shell, and finally ``DEFAULT_TO_VM``.

    When this function returns ``CLOUD_SHELL``, you should use
    :func:`msal.PublicClientApplication.acquire_token_interactive` with ``prompt="none"``
    to obtain a token.
    """
    def _all_present(*names):
        # True only when every named environment variable is defined
        return all(name in os.environ for name in names)

    if _all_present(
            "IDENTITY_ENDPOINT", "IDENTITY_HEADER", "IDENTITY_SERVER_THUMBPRINT"):
        return SERVICE_FABRIC
    if _all_present("IDENTITY_ENDPOINT", "IDENTITY_HEADER"):
        return APP_SERVICE
    if _all_present("MSI_ENDPOINT", "MSI_SECRET"):
        return MACHINE_LEARNING
    if _get_arc_endpoint():
        return AZURE_ARC
    if _is_running_in_cloud_shell():
        return CLOUD_SHELL
    return DEFAULT_TO_VM

411 

412 

def _obtain_token(
    http_client, managed_identity, resource,
    *,
    access_token_sha256_to_refresh: Optional[str] = None,
    client_capabilities: Optional[List[str]] = None,
):
    """Route the token request to the back-end implied by the environment.

    Back-ends are probed in this order: Service Fabric, App Service,
    Machine Learning, Azure Arc, then Azure VM as the default.
    ``access_token_sha256_to_refresh`` and ``client_capabilities`` are only
    forwarded to the Service Fabric back-end.

    :raises ManagedIdentityError:
        When running on Azure Arc with a user-assigned managed identity.
    """
    if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ
            and "IDENTITY_SERVER_THUMBPRINT" in os.environ
    ):
        if managed_identity:
            logger.debug(
                "Ignoring managed_identity parameter. "
                "Managed Identity in Service Fabric is configured in the cluster, "
                "not during runtime. See also "
                "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service")
        return _obtain_token_on_service_fabric(
            http_client,
            os.environ["IDENTITY_ENDPOINT"],
            os.environ["IDENTITY_HEADER"],
            os.environ["IDENTITY_SERVER_THUMBPRINT"],
            resource,
            access_token_sha256_to_refresh=access_token_sha256_to_refresh,
            client_capabilities=client_capabilities,
        )
    if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ:
        return _obtain_token_on_app_service(
            http_client,
            os.environ["IDENTITY_ENDPOINT"],
            os.environ["IDENTITY_HEADER"],
            managed_identity,
            resource,
        )
    if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ:
        # Back ported from https://github.com/Azure/azure-sdk-for-python/blob/azure-identity_1.15.0/sdk/identity/azure-identity/azure/identity/_credentials/azure_ml.py
        return _obtain_token_on_machine_learning(
            http_client,
            os.environ["MSI_ENDPOINT"],
            os.environ["MSI_SECRET"],
            managed_identity,
            resource,
        )
    arc_endpoint = _get_arc_endpoint()
    if arc_endpoint:
        if ManagedIdentity.is_user_assigned(managed_identity):
            raise ManagedIdentityError(  # Note: Azure Identity for Python raised exception too
                "Invalid managed_identity parameter. "
                "Azure Arc supports only system-assigned managed identity, "
                "See also "
                # Point at the Azure Arc docs (this used to cite a Service Fabric page)
                "https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication")
        return _obtain_token_on_arc(http_client, arc_endpoint, resource)
    return _obtain_token_on_azure_vm(http_client, managed_identity, resource)

464 

465 

def _adjust_param(params, managed_identity, types_mapping=None):
    """Add the identity selector to ``params``, modifying the dict in place.

    The wire parameter name is looked up from the identity's id type in
    ``types_mapping`` (falling back to ``ManagedIdentity._types_mapping``).
    Nothing is added when the id type has no mapping, e.g. system-assigned.
    """
    wire_names = types_mapping or ManagedIdentity._types_mapping
    wire_name = wire_names.get(managed_identity.get(ManagedIdentity.ID_TYPE))
    if not wire_name:
        return
    params[wire_name] = managed_identity[ManagedIdentity.ID]

472 

def _obtain_token_on_azure_vm(http_client, managed_identity, resource):
    """Request a token from the VM metadata endpoint.

    The host defaults to ``http://169.254.169.254`` and can be overridden by
    the ``AZURE_POD_IDENTITY_AUTHORITY_HOST`` environment variable.
    Returns a normalized OAuth2-style dict on success, otherwise the decoded
    payload as-is. A non-JSON response re-raises the ``JSONDecodeError``.
    """
    # Based on https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http
    logger.debug("Obtaining token via managed identity on Azure VM")
    query = {"api-version": "2018-02-01", "resource": resource}
    _adjust_param(query, managed_identity)
    host = os.getenv(
        "AZURE_POD_IDENTITY_AUTHORITY_HOST", "http://169.254.169.254")
    resp = http_client.get(
        host.strip("/") + "/metadata/identity/oauth2/token",
        params=query,
        headers={"Metadata": "true"},
    )
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if not (payload.get("access_token") and payload.get("expires_in")):
        return payload  # It would be {"error": ..., "error_description": ...} according to https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling
    return {  # Normalizing the payload into OAuth2 format
        "access_token": payload["access_token"],
        "expires_in": int(payload["expires_in"]),
        "resource": payload.get("resource"),
        "token_type": payload.get("token_type", "Bearer"),
    }

501 

def _obtain_token_on_app_service(
    http_client, endpoint, identity_header, managed_identity, resource,
):
    """Obtains token for
    `App Service <https://learn.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp#rest-endpoint-reference>`_,
    Azure Functions, and Azure Automation.

    Returns a normalized OAuth2-style dict on success, or an
    ``invalid_scope`` error dict built from the payload's ``statusCode``
    and ``message``. A non-JSON response re-raises the ``JSONDecodeError``.
    """
    # Prerequisite: Create your app service https://docs.microsoft.com/en-us/azure/app-service/quickstart-python
    # Assign it a managed identity https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp
    # SSH into your container for testing https://docs.microsoft.com/en-us/azure/app-service/configure-linux-open-ssh-session
    logger.debug("Obtaining token via managed identity on Azure App Service")
    query = {"api-version": "2019-08-01", "resource": resource}
    wire_names = {
        ManagedIdentity.CLIENT_ID: "client_id",
        ManagedIdentity.RESOURCE_ID: "mi_res_id",  # App Service's resource id uses "mi_res_id"
        ManagedIdentity.OBJECT_ID: "object_id",
    }
    _adjust_param(query, managed_identity, types_mapping=wire_names)
    request_headers = {
        "X-IDENTITY-HEADER": identity_header,
        "Metadata": "true",  # Unnecessary yet harmless for App Service,
        # It will be needed by Azure Automation
        # https://docs.microsoft.com/en-us/azure/automation/enable-managed-identity-for-automation#get-access-token-for-system-assigned-managed-identity-using-http-get
    }
    resp = http_client.get(endpoint, params=query, headers=request_headers)
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if payload.get("access_token") and payload.get("expires_on"):
        return {  # Normalizing the payload into OAuth2 format
            "access_token": payload["access_token"],
            "expires_in": int(payload["expires_on"]) - int(time.time()),
            "resource": payload.get("resource"),
            "token_type": payload.get("token_type", "Bearer"),
        }
    description = "{}, {}".format(
        payload.get("statusCode"), payload.get("message"))
    return {
        "error": "invalid_scope",  # Empirically, wrong resource ends up with a vague statusCode=500
        "error_description": description,
    }

550 

def _obtain_token_on_machine_learning(
    http_client, endpoint, secret, managed_identity, resource,
):
    """Request a token from the Azure Machine Learning MSI endpoint.

    Returns a normalized OAuth2-style dict on success, or an
    ``invalid_scope`` error dict describing the payload.
    A non-JSON response re-raises the ``JSONDecodeError``.
    """
    # Could not find protocol docs from https://docs.microsoft.com/en-us/azure/machine-learning
    # The following implementation is back ported from Azure Identity 1.15.0
    logger.debug("Obtaining token via managed identity on Azure Machine Learning")
    query = {"api-version": "2017-09-01", "resource": resource}
    _adjust_param(query, managed_identity)
    if query["api-version"] == "2017-09-01" and "client_id" in query:
        # Workaround for a known bug in Azure ML 2017 API
        query["clientid"] = query.pop("client_id")
    resp = http_client.get(endpoint, params=query, headers={"secret": secret})
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if not (payload.get("access_token") and payload.get("expires_on")):
        return {
            "error": "invalid_scope",  # TODO: To be tested
            "error_description": "{}".format(payload),
        }
    return {  # Normalizing the payload into OAuth2 format
        "access_token": payload["access_token"],
        "expires_in": int(payload["expires_on"]) - int(time.time()),
        "resource": payload.get("resource"),
        "token_type": payload.get("token_type", "Bearer"),
    }

583 

584 

def _obtain_token_on_service_fabric(
    http_client, endpoint, identity_header, server_thumbprint, resource,
    *,
    access_token_sha256_to_refresh: Optional[str] = None,
    client_capabilities: Optional[List[str]] = None,
):
    """Obtains token for
    `Service Fabric <https://learn.microsoft.com/en-us/azure/service-fabric/>`_

    :param access_token_sha256_to_refresh:
        Optional. Sent as ``token_sha256_to_refresh`` when not None.
    :param client_capabilities:
        Optional. Joined by comma and sent as ``xms_cc`` when non-empty.
    :return:
        A normalized OAuth2-style dict on success; otherwise a dict with an
        OAuth2 ``error`` code and the raw response text as ``error_description``.
    :raises json.decoder.JSONDecodeError: when the response body is not JSON.

    ``server_thumbprint`` is accepted but not used by this function.
    """
    # Deployment https://learn.microsoft.com/en-us/azure/service-fabric/service-fabric-get-started-containers-linux
    # See also https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/tests/managed-identity-live/service-fabric/service_fabric.md
    # Protocol https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#acquiring-an-access-token-using-rest-api
    logger.debug("Obtaining token via managed identity on Azure Service Fabric")
    resp = http_client.get(
        endpoint,
        params={k: v for k, v in {
            "api-version": "2019-07-01-preview",
            "resource": resource,
            "token_sha256_to_refresh": access_token_sha256_to_refresh,
            "xms_cc": ",".join(client_capabilities) if client_capabilities else None,
        }.items() if v is not None},
        headers={"Secret": identity_header},
    )
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if payload.get("access_token") and payload.get("expires_on"):
        return {  # Normalizing the payload into OAuth2 format
            "access_token": payload["access_token"],
            "expires_in": int(  # Despite the example in docs shows an integer,
                payload["expires_on"]  # Azure SDK team's test obtained a string.
                ) - int(time.time()),
            "resource": payload.get("resource"),
            # Default to "Bearer" like every other back-end in this module,
            # rather than failing with KeyError when the field is absent.
            "token_type": payload.get("token_type", "Bearer"),
        }
    error = payload.get("error", {})  # https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#error-handling
    # Tolerate an "error" that is not a JSON object (e.g. a bare string or null)
    error_code = error.get("code") if isinstance(error, dict) else None
    error_mapping = {  # Map Service Fabric errors into OAuth2 errors https://www.rfc-editor.org/rfc/rfc6749#section-5.2
        "SecretHeaderNotFound": "unauthorized_client",
        "ManagedIdentityNotFound": "invalid_client",
        "ArgumentNullOrEmpty": "invalid_scope",
    }
    return {
        "error": error_mapping.get(error_code, "invalid_request"),
        "error_description": resp.text,
    }

632 

633 

# Directory expected to hold the Azure Arc ".key" challenge files,
# keyed by ``sys.platform`` value.
_supported_arc_platforms_and_their_prefixes = dict(
    linux="/var/opt/azcmagent/tokens",
    win32=os.path.expandvars(r"%ProgramData%\AzureConnectedMachineAgent\Tokens"),
)

638 

class ArcPlatformNotSupportedError(ManagedIdentityError):
    """Raised when Azure Arc token acquisition runs on an unsupported ``sys.platform``."""

641 

def _obtain_token_on_arc(http_client, endpoint, resource):
    """Obtain a token from the Azure Arc endpoint via its two-step challenge.

    The first request's ``WWW-Authenticate`` header names a key file; that
    file's content is read from the platform's token directory and sent as a
    ``Basic`` authorization on the second request.

    :return:
        A normalized OAuth2-style dict on success, otherwise an
        ``invalid_request`` error dict carrying the raw response text.
    :raises ManagedIdentityError:
        On an unrecognizable challenge header or a key file larger than 4KB.
    :raises ArcPlatformNotSupportedError: On an unsupported ``sys.platform``.
    """
    # https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication
    logger.debug("Obtaining token via managed identity on Azure Arc")

    def _request(headers):
        return http_client.get(
            endpoint,
            params={"api-version": "2020-06-01", "resource": resource},
            headers=headers,
        )

    resp = _request({"Metadata": "true"})
    www_auth = "www-authenticate"  # Header in lower case
    # Normalized to lowercase, because header names are case-insensitive
    # https://datatracker.ietf.org/doc/html/rfc7230#section-3.2
    header_value = ""
    for name, value in resp.headers.items():
        if name.lower() == www_auth:
            header_value = value  # The last matching header wins
    challenge = header_value.split("=")  # Output will be ["Basic realm", "content"]
    # https://datatracker.ietf.org/doc/html/rfc7617#section-2
    if len(challenge) != 2 or challenge[0].lower() != "basic realm":
        raise ManagedIdentityError(
            "Unrecognizable WWW-Authenticate header: {}".format(resp.headers))
    if sys.platform not in _supported_arc_platforms_and_their_prefixes:
        raise ArcPlatformNotSupportedError(
            f"Platform {sys.platform} was undefined and unsupported")
    # This algorithm is documented in an internal doc https://msazure.visualstudio.com/One/_wiki/wikis/One.wiki/233012/VM-Extension-Authoring-for-Arc?anchor=2.-obtaining-tokens
    token_dir = _supported_arc_platforms_and_their_prefixes[sys.platform]
    key_name = os.path.splitext(os.path.basename(challenge[1]))[0] + ".key"
    filename = os.path.join(token_dir, key_name)
    if os.stat(filename).st_size > 4096:  # Check size BEFORE loading its content
        raise ManagedIdentityError("Local key file shall not be larger than 4KB")
    with open(filename) as key_file:
        secret = key_file.read()
    response = _request(
        {"Metadata": "true", "Authorization": "Basic {}".format(secret)})
    try:
        payload = json.loads(response.text)
    except json.decoder.JSONDecodeError:
        pass  # Fall through to the generic error below
    else:
        if payload.get("access_token") and payload.get("expires_in"):
            # Example: https://learn.microsoft.com/en-us/azure/azure-arc/servers/media/managed-identity-authentication/bash-token-output-example.png
            return {
                "access_token": payload["access_token"],
                "expires_in": int(payload["expires_in"]),
                "token_type": payload.get("token_type", "Bearer"),
                "resource": payload.get("resource"),
            }
    return {
        "error": "invalid_request",
        "error_description": response.text,
    }

691 }