Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/managed_identity.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

208 statements  

1# Copyright (c) Microsoft Corporation. 

2# All rights reserved. 

3# 

4# This code is licensed under the MIT License. 

5import json 

6import logging 

7import os 

8import socket 

9import sys 

10import time 

11from urllib.parse import urlparse # Python 3+ 

12from collections import UserDict # Python 3+ 

13from typing import Union # Needed in Python 3.7 & 3.8 

14from .token_cache import TokenCache 

15from .individual_cache import _IndividualCache as IndividualCache 

16from .throttled_http_client import ThrottledHttpClientBase, RetryAfterParser 

17from .cloudshell import _is_running_in_cloud_shell 

18 

19 

20logger = logging.getLogger(__name__) 

21 

22 

class ManagedIdentityError(ValueError):
    """Error type raised by this module for managed identity problems.

    It derives from :class:`ValueError`, so callers catching ``ValueError``
    will also catch it.
    """

25 

26 

class ManagedIdentity(UserDict):
    """Feed an instance of this class to :class:`msal.ManagedIdentityClient`
    to acquire token for the specified managed identity.

    An instance behaves like a dict with two keys, ``ID_TYPE`` and ``ID``.
    """
    # The key names used in config dict
    ID_TYPE = "ManagedIdentityIdType"  # Contains keyword ManagedIdentity so its json equivalent will be more readable
    ID = "Id"

    # Valid values for key ID_TYPE
    CLIENT_ID = "ClientId"
    RESOURCE_ID = "ResourceId"
    OBJECT_ID = "ObjectId"
    SYSTEM_ASSIGNED = "SystemAssigned"

    _types_mapping = {  # Maps type name in configuration to type name on wire
        CLIENT_ID: "client_id",
        RESOURCE_ID: "mi_res_id",
        OBJECT_ID: "object_id",
    }

    @classmethod
    def _is_config_mapping(cls, unknown):
        """Return True when ``unknown`` can be inspected like a config dict.

        ``ManagedIdentity`` derives from ``UserDict``, which is NOT a ``dict``
        subclass, so both types have to be accepted explicitly.
        """
        return isinstance(unknown, (dict, ManagedIdentity))

    @classmethod
    def is_managed_identity(cls, unknown):
        """Return True if ``unknown`` is a :class:`ManagedIdentity` instance
        or a plain dict containing the ``ID_TYPE`` key."""
        return isinstance(unknown, ManagedIdentity) or (
            isinstance(unknown, dict) and cls.ID_TYPE in unknown)

    @classmethod
    def is_system_assigned(cls, unknown):
        """Return True if ``unknown`` describes a system-assigned identity.

        Accepts a plain dict or any :class:`ManagedIdentity` instance whose
        ``ID_TYPE`` is ``SYSTEM_ASSIGNED``, as well as any
        :class:`SystemAssignedManagedIdentity` instance.
        """
        return (
            cls._is_config_mapping(unknown)
            and unknown.get(cls.ID_TYPE) == cls.SYSTEM_ASSIGNED
        ) or isinstance(unknown, SystemAssignedManagedIdentity)

    @classmethod
    def is_user_assigned(cls, unknown):
        """Return True if ``unknown`` describes a user-assigned identity.

        Accepts a plain dict or any :class:`ManagedIdentity` instance whose
        ``ID_TYPE`` is one of the keys of ``_types_mapping`` and whose ``ID``
        is truthy, as well as any :class:`UserAssignedManagedIdentity`
        instance. The result is always a ``bool``.
        """
        return bool(
            cls._is_config_mapping(unknown)
            and unknown.get(cls.ID_TYPE) in cls._types_mapping
            and unknown.get(cls.ID)
        ) or isinstance(unknown, UserAssignedManagedIdentity)

    def __init__(self, identifier=None, id_type=None):
        """Store ``id_type`` and ``identifier`` under ``ID_TYPE`` and ``ID``."""
        # Undocumented. Use subclasses instead.
        super().__init__({
            self.ID_TYPE: id_type,
            self.ID: identifier,
        })

71 

72 

class SystemAssignedManagedIdentity(ManagedIdentity):
    """Represent a system-assigned managed identity.

    It is equivalent to a Python dict of::

        {"ManagedIdentityIdType": "SystemAssigned", "Id": None}

    or a JSON blob of::

        {"ManagedIdentityIdType": "SystemAssigned", "Id": null}
    """
    def __init__(self):
        # No identifier is passed, so the "Id" entry keeps the base default of None.
        super().__init__(id_type=self.SYSTEM_ASSIGNED)

86 

87 

class UserAssignedManagedIdentity(ManagedIdentity):
    """Represent a user-assigned managed identity.

    Depends on the id you provided, the outcome is equivalent to one of the below::

        {"ManagedIdentityIdType": "ClientId", "Id": "foo"}
        {"ManagedIdentityIdType": "ResourceId", "Id": "foo"}
        {"ManagedIdentityIdType": "ObjectId", "Id": "foo"}
    """
    def __init__(self, *, client_id=None, resource_id=None, object_id=None):
        """Accept exactly one truthy keyword among the three identifiers.

        :raises ManagedIdentityError: When none, or more than one, is given.
        """
        candidates = (
            (self.CLIENT_ID, client_id),
            (self.RESOURCE_ID, resource_id),
            (self.OBJECT_ID, object_id),
        )
        provided = [(kind, value) for kind, value in candidates if value]
        if len(provided) != 1:
            raise ManagedIdentityError(
                "You shall specify one of the three parameters: "
                "client_id, resource_id, object_id")
        (kind, value), = provided
        super().__init__(id_type=kind, identifier=value)

111 

112 

class _ThrottledHttpClient(ThrottledHttpClientBase):
    """Http client wrapper whose ``get`` goes through an ``IndividualCache``.

    NOTE(review): the caching/throttling semantics live in ``IndividualCache``
    and ``ThrottledHttpClientBase``, which are defined outside this module.
    """
    def __init__(self, http_client, **kwargs):
        super().__init__(http_client, **kwargs)

        def _make_key(func, args, kwargs):
            endpoint = args[0]  # It is the endpoint, typically a constant per MI type
            # Managed Identity flavors have inconsistent parameters.
            # We simply choose to hash them all.
            fingerprint = self._hash(
                str(kwargs.get("params")) + str(kwargs.get("data")))
            return "REQ {} hash={} 429/5xx/Retry-After".format(
                endpoint, fingerprint)

        decorate = IndividualCache(
            mapping=self._expiring_mapping,
            key_maker=_make_key,
            expires_in=RetryAfterParser(5).parse,  # 5 seconds default for non-PCA
        )
        # All MIs (except Cloud Shell) use GETs
        self.get = decorate(http_client.get)

127 

128 

class ManagedIdentityClient(object):
    """This API encapsulates multiple managed identity back-ends:
    VM, App Service, Azure Automation (Runbooks), Azure Function, Service Fabric,
    and Azure Arc.

    It also provides token cache support.

    .. note::

        Cloud Shell support is NOT implemented in this class.
        Since MSAL Python 1.18 in May 2022, it has been implemented in
        :func:`PublicClientApplication.acquire_token_interactive` via calling pattern
        ``PublicClientApplication(...).acquire_token_interactive(scopes=[...], prompt="none")``.
        That is appropriate, because Cloud Shell yields a token with
        delegated permissions for the end user who has signed in to the Azure Portal
        (like what a ``PublicClientApplication`` does),
        not a token with application permissions for an app.
    """
    _instance, _tenant = socket.getfqdn(), "managed_identity"  # Placeholders

    def __init__(
        self,
        managed_identity: Union[
            dict,
            ManagedIdentity,  # Could use Type[ManagedIdentity] but it is deprecated in Python 3.9+
            SystemAssignedManagedIdentity,
            UserAssignedManagedIdentity,
        ],
        *,
        http_client,
        token_cache=None,
        http_cache=None,
    ):
        """Create a managed identity client.

        :param managed_identity:
            It accepts an instance of :class:`SystemAssignedManagedIdentity`
            or :class:`UserAssignedManagedIdentity`.
            They are equivalent to a dict with a certain shape,
            which may be loaded from a JSON configuration file or an env var.

        :param http_client:
            An http client object. For example, you can use ``requests.Session()``,
            optionally with exponential backoff behavior demonstrated in this recipe::

                import msal, requests
                from requests.adapters import HTTPAdapter, Retry
                s = requests.Session()
                retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[
                    429, 500, 501, 502, 503, 504])
                s.mount('https://', HTTPAdapter(max_retries=retries))
                managed_identity = ...
                client = msal.ManagedIdentityClient(managed_identity, http_client=s)

        :param token_cache:
            Optional. It accepts a :class:`msal.TokenCache` instance to store tokens.
            It will use an in-memory token cache by default.

        :param http_cache:
            Optional. It has the same characteristics as the
            :paramref:`msal.ClientApplication.http_cache`.

        Recipe 1: Hard code a managed identity for your app::

            import msal, requests
            client = msal.ManagedIdentityClient(
                msal.UserAssignedManagedIdentity(client_id="foo"),
                http_client=requests.Session(),
                )
            token = client.acquire_token_for_client(resource="resource")

        Recipe 2: Write once, run everywhere.
        If you use different managed identity on different deployment,
        you may use an environment variable (such as MY_MANAGED_IDENTITY_CONFIG)
        to store a json blob like
        ``{"ManagedIdentityIdType": "ClientId", "Id": "foo"}`` or
        ``{"ManagedIdentityIdType": "SystemAssigned", "Id": null}``.
        The following app can load managed identity configuration dynamically::

            import json, os, msal, requests
            config = os.getenv("MY_MANAGED_IDENTITY_CONFIG")
            assert config, "An ENV VAR with value should exist"
            client = msal.ManagedIdentityClient(
                json.loads(config),
                http_client=requests.Session(),
                )
            token = client.acquire_token_for_client(resource="resource")
        """
        self._managed_identity = managed_identity
        self._http_client = _ThrottledHttpClient(
            # This class only throttles excess token acquisition requests.
            # It does not provide retry.
            # Retry is the http_client or caller's responsibility, not MSAL's.
            #
            # FWIW, here is the inconsistent retry recommendation.
            # 1. Only MI on VM defines exotic 404 and 410 retry recommendations
            #    ( https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling )
            #    (especially for 410 which was supposed to be a permanent failure).
            # 2. MI on Service Fabric specifically suggests to not retry on 404.
            #    ( https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-cluster-managed-identity-service-fabric-app-code#error-handling )
            http_client.http_client  # Patch the raw (unpatched) http client
                if isinstance(http_client, ThrottledHttpClientBase) else http_client,
            http_cache=http_cache,
        )
        self._token_cache = token_cache or TokenCache()

    def acquire_token_for_client(self, *, resource):  # We may support scope in the future
        """Acquire token for the managed identity.

        The result will be automatically cached.
        Subsequent calls will automatically search from cache first.

        :param resource:
            Keyword-only. The resource for which the token is requested.
            It is also used as the cache target.

        :return:
            A dict. On success it contains ``access_token``, ``token_type``
            and ``expires_in`` (plus ``refresh_on`` when known).
            Otherwise it is whatever error dict the back-end produced,
            unless a still-valid cached token can serve as a fallback.

        .. note::

            Known issue: When an Azure VM has only one user-assigned managed identity,
            and your app specifies to use system-assigned managed identity,
            Azure VM may still return a token for your user-assigned identity.

            This is a service-side behavior that cannot be changed by this library.
            `Azure VM docs <https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http>`_
        """
        access_token_from_cache = None
        # NOTE(review): ``.get()`` falls back to the default only when the key
        # is absent. ManagedIdentity instances always carry the "Id" key
        # (None for system-assigned), so the default applies to plain dicts
        # lacking that key.
        client_id_in_cache = self._managed_identity.get(
            ManagedIdentity.ID, "SYSTEM_ASSIGNED_MANAGED_IDENTITY")
        # Does not offer an "if not force_refresh" option, because
        # there would be built-in token cache in the service side anyway
        matches = self._token_cache.find(
            self._token_cache.CredentialType.ACCESS_TOKEN,
            target=[resource],
            query=dict(
                client_id=client_id_in_cache,
                environment=self._instance,
                realm=self._tenant,
                home_account_id=None,
            ),
        )
        now = time.time()
        for entry in matches:
            expires_in = int(entry["expires_on"]) - now
            if expires_in < 5*60:  # Then consider it expired
                continue  # Removal is not necessary, it will be overwritten
            logger.debug("Cache hit an AT")
            access_token_from_cache = {  # Mimic a real response
                "access_token": entry["secret"],
                "token_type": entry.get("token_type", "Bearer"),
                "expires_in": int(expires_in),  # OAuth2 specs defines it as int
            }
            if "refresh_on" in entry:
                access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
                if int(entry["refresh_on"]) < now:  # aging
                    break  # With a fallback in hand, we break here to go refresh
            return access_token_from_cache  # It is still good as new
        try:
            result = _obtain_token(self._http_client, self._managed_identity, resource)
            if "access_token" in result:
                expires_in = result.get("expires_in", 3600)
                if "refresh_in" not in result and expires_in >= 7200:
                    result["refresh_in"] = int(expires_in / 2)
                self._token_cache.add(dict(
                    client_id=client_id_in_cache,
                    scope=[resource],
                    token_endpoint="https://{}/{}".format(self._instance, self._tenant),
                    response=result,
                    params={},
                    data={},
                ))
                if "refresh_in" in result:
                    result["refresh_on"] = int(now + result["refresh_in"])
            if (result and "error" not in result) or (not access_token_from_cache):
                return result
        except Exception:  # The exact HTTP exception is transportation-layer dependent
            # Typically network error. Potential AAD outage?
            # ``Exception`` (not a bare ``except``) so that KeyboardInterrupt
            # and SystemExit are never swallowed by the cached-token fallback.
            if not access_token_from_cache:  # It means there is no fall back option
                raise  # We choose to bubble up the exception
        return access_token_from_cache

304 

305 

def _scope_to_resource(scope):  # This is an experimental reasonable-effort approach
    """Derive a resource identifier from a scope string.

    When ``scope`` parses as a URL with both a scheme and a network location,
    return ``"scheme://netloc"`` (path, query and fragment are dropped).
    Otherwise return ``scope`` unchanged.

    Requiring a netloc avoids turning scheme-only inputs such as
    ``"urn:example:resource"`` into a meaningless ``"urn://"``.
    """
    u = urlparse(scope)
    if u.scheme and u.netloc:
        return "{}://{}".format(u.scheme, u.netloc)
    return scope  # There is no much else we can do here

311 

312 

# Sentinels returned by get_managed_identity_source(); compare them with ``is``.
APP_SERVICE = object()
AZURE_ARC = object()
CLOUD_SHELL = object()  # In MSAL Python, token acquisition was done by
    # PublicClientApplication(...).acquire_token_interactive(..., prompt="none")
MACHINE_LEARNING = object()
SERVICE_FABRIC = object()
DEFAULT_TO_VM = object()  # Unknown environment; default to VM; you may want to probe


def get_managed_identity_source():
    """Detect the current environment and return the likely identity source.

    The decision is based on which environment variables are present,
    checked in this order: Service Fabric, App Service, Machine Learning,
    Azure Arc, then Cloud Shell. ``DEFAULT_TO_VM`` is returned otherwise.

    When this function returns ``CLOUD_SHELL``, you should use
    :func:`msal.PublicClientApplication.acquire_token_interactive` with ``prompt="none"``
    to obtain a token.
    """
    env = os.environ
    has_identity_endpoint = "IDENTITY_ENDPOINT" in env
    if has_identity_endpoint and "IDENTITY_HEADER" in env:
        # Service Fabric sets the same two variables as App Service,
        # plus a server thumbprint.
        if "IDENTITY_SERVER_THUMBPRINT" in env:
            return SERVICE_FABRIC
        return APP_SERVICE
    if "MSI_ENDPOINT" in env and "MSI_SECRET" in env:
        return MACHINE_LEARNING
    if has_identity_endpoint and "IMDS_ENDPOINT" in env:
        return AZURE_ARC
    if _is_running_in_cloud_shell():
        return CLOUD_SHELL
    return DEFAULT_TO_VM

340 

341 

def _obtain_token(http_client, managed_identity, resource):
    """Dispatch a token request to the back-end matching the environment.

    A unified low-level API that talks to different Managed Identity
    flavors. The flavor is chosen from environment variables, in this order:
    Service Fabric, App Service, Machine Learning, Azure Arc, and finally
    Azure VM as the default.

    :raises ManagedIdentityError:
        On Azure Arc when ``managed_identity`` is user-assigned.
    """
    if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ
            and "IDENTITY_SERVER_THUMBPRINT" in os.environ
    ):
        if managed_identity:
            logger.debug(
                "Ignoring managed_identity parameter. "
                "Managed Identity in Service Fabric is configured in the cluster, "
                "not during runtime. See also "
                "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service")
        return _obtain_token_on_service_fabric(
            http_client,
            os.environ["IDENTITY_ENDPOINT"],
            os.environ["IDENTITY_HEADER"],
            os.environ["IDENTITY_SERVER_THUMBPRINT"],
            resource,
        )
    if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ:
        return _obtain_token_on_app_service(
            http_client,
            os.environ["IDENTITY_ENDPOINT"],
            os.environ["IDENTITY_HEADER"],
            managed_identity,
            resource,
        )
    if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ:
        # Back ported from https://github.com/Azure/azure-sdk-for-python/blob/azure-identity_1.15.0/sdk/identity/azure-identity/azure/identity/_credentials/azure_ml.py
        return _obtain_token_on_machine_learning(
            http_client,
            os.environ["MSI_ENDPOINT"],
            os.environ["MSI_SECRET"],
            managed_identity,
            resource,
        )
    if "IDENTITY_ENDPOINT" in os.environ and "IMDS_ENDPOINT" in os.environ:
        if ManagedIdentity.is_user_assigned(managed_identity):
            raise ManagedIdentityError(  # Note: Azure Identity for Python raised exception too
                "Invalid managed_identity parameter. "
                "Azure Arc supports only system-assigned managed identity, "
                "See also "
                # Point at the Azure Arc document rather than the Service Fabric one
                "https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication")
        return _obtain_token_on_arc(
            http_client,
            os.environ["IDENTITY_ENDPOINT"],
            resource,
        )
    return _obtain_token_on_azure_vm(http_client, managed_identity, resource)

390 

391 

def _adjust_param(params, managed_identity):
    """Add the identity selector to ``params``, modifying the dict in place.

    The configured id type is translated to its on-wire parameter name via
    ``ManagedIdentity._types_mapping``. Nothing is added when the id type has
    no mapping (e.g. system-assigned). Returns None.
    """
    id_type = managed_identity.get(ManagedIdentity.ID_TYPE)
    wire_name = ManagedIdentity._types_mapping.get(id_type)
    if not wire_name:
        return
    params[wire_name] = managed_identity[ManagedIdentity.ID]

398 

def _obtain_token_on_azure_vm(http_client, managed_identity, resource):
    """Request a token from the fixed Azure VM metadata endpoint.

    Returns a dict normalized into OAuth2 format when the response carries
    both ``access_token`` and ``expires_in``; otherwise returns the parsed
    payload as-is. Re-raises ``JSONDecodeError`` when the body is not JSON.
    """
    # Based on https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http
    logger.debug("Obtaining token via managed identity on Azure VM")
    query = {"api-version": "2018-02-01", "resource": resource}
    _adjust_param(query, managed_identity)
    resp = http_client.get(
        "http://169.254.169.254/metadata/identity/oauth2/token",
        params=query,
        headers={"Metadata": "true"},
    )
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if not (payload.get("access_token") and payload.get("expires_in")):
        return payload  # Typically an error, but it is undefined in the doc above
    return {  # Normalizing the payload into OAuth2 format
        "access_token": payload["access_token"],
        "expires_in": int(payload["expires_in"]),
        "resource": payload.get("resource"),
        "token_type": payload.get("token_type", "Bearer"),
    }

425 

def _obtain_token_on_app_service(
        http_client, endpoint, identity_header, managed_identity, resource,
):
    """Obtains token for
    `App Service <https://learn.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp#rest-endpoint-reference>`_,
    Azure Functions, and Azure Automation.

    Returns an OAuth2-shaped dict on success, or an ``invalid_scope`` error
    dict built from the payload's ``statusCode`` and ``message``.
    Re-raises ``JSONDecodeError`` when the body is not JSON.
    """
    # Prerequisite: Create your app service https://docs.microsoft.com/en-us/azure/app-service/quickstart-python
    # Assign it a managed identity https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp
    # SSH into your container for testing https://docs.microsoft.com/en-us/azure/app-service/configure-linux-open-ssh-session
    logger.debug("Obtaining token via managed identity on Azure App Service")
    query = {"api-version": "2019-08-01", "resource": resource}
    _adjust_param(query, managed_identity)
    request_headers = {
        "X-IDENTITY-HEADER": identity_header,
        "Metadata": "true",  # Unnecessary yet harmless for App Service,
        # It will be needed by Azure Automation
        # https://docs.microsoft.com/en-us/azure/automation/enable-managed-identity-for-automation#get-access-token-for-system-assigned-managed-identity-using-http-get
    }
    resp = http_client.get(endpoint, params=query, headers=request_headers)
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if payload.get("access_token") and payload.get("expires_on"):
        # expires_on is converted into a number of seconds from now
        seconds_left = int(payload["expires_on"]) - int(time.time())
        return {  # Normalizing the payload into OAuth2 format
            "access_token": payload["access_token"],
            "expires_in": seconds_left,
            "resource": payload.get("resource"),
            "token_type": payload.get("token_type", "Bearer"),
        }
    description = "{}, {}".format(
        payload.get("statusCode"), payload.get("message"))
    return {
        "error": "invalid_scope",  # Empirically, wrong resource ends up with a vague statusCode=500
        "error_description": description,
    }

469 

def _obtain_token_on_machine_learning(
        http_client, endpoint, secret, managed_identity, resource,
):
    """Obtain a token from the Azure Machine Learning identity endpoint.

    Returns an OAuth2-shaped dict on success, or an ``invalid_scope`` error
    dict whose description is the stringified payload.
    Re-raises ``JSONDecodeError`` when the body is not JSON.
    """
    # Could not find protocol docs from https://docs.microsoft.com/en-us/azure/machine-learning
    # The following implementation is back ported from Azure Identity 1.15.0
    logger.debug("Obtaining token via managed identity on Azure Machine Learning")
    query = {"api-version": "2017-09-01", "resource": resource}
    _adjust_param(query, managed_identity)
    if query["api-version"] == "2017-09-01" and "client_id" in query:
        # Workaround for a known bug in Azure ML 2017 API
        query["clientid"] = query.pop("client_id")
    resp = http_client.get(endpoint, params=query, headers={"secret": secret})
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if payload.get("access_token") and payload.get("expires_on"):
        # expires_on is converted into a number of seconds from now
        seconds_left = int(payload["expires_on"]) - int(time.time())
        return {  # Normalizing the payload into OAuth2 format
            "access_token": payload["access_token"],
            "expires_in": seconds_left,
            "resource": payload.get("resource"),
            "token_type": payload.get("token_type", "Bearer"),
        }
    return {
        "error": "invalid_scope",  # TODO: To be tested
        "error_description": "{}".format(payload),
    }

502 

503 

def _obtain_token_on_service_fabric(
        http_client, endpoint, identity_header, server_thumbprint, resource,
):
    """Obtains token for
    `Service Fabric <https://learn.microsoft.com/en-us/azure/service-fabric/>`_

    :param server_thumbprint: Accepted for interface parity; not used here.
    :return:
        An OAuth2-shaped dict on success (``token_type`` defaults to
        ``"Bearer"`` when the payload omits it), or an error dict whose
        ``error`` is mapped from the Service Fabric error code and falls back
        to ``"invalid_request"`` for unknown or missing codes.
    :raises json.decoder.JSONDecodeError: When the response body is not JSON.
    """
    # Deployment https://learn.microsoft.com/en-us/azure/service-fabric/service-fabric-get-started-containers-linux
    # See also https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/tests/managed-identity-live/service-fabric/service_fabric.md
    # Protocol https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#acquiring-an-access-token-using-rest-api
    logger.debug("Obtaining token via managed identity on Azure Service Fabric")
    resp = http_client.get(
        endpoint,
        params={"api-version": "2019-07-01-preview", "resource": resource},
        headers={"Secret": identity_header},
    )
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if payload.get("access_token") and payload.get("expires_on"):
        return {  # Normalizing the payload into OAuth2 format
            "access_token": payload["access_token"],
            "expires_in": int(  # Despite the example in docs shows an integer,
                payload["expires_on"]  # Azure SDK team's test obtained a string.
                ) - int(time.time()),
            "resource": payload.get("resource"),
            # Default like the sibling back-ends instead of raising KeyError
            "token_type": payload.get("token_type", "Bearer"),
        }
    error = payload.get("error")  # https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#error-handling
    # Tolerate payloads without an "error" object, or with an unexpected shape
    error_code = error.get("code") if isinstance(error, dict) else None
    if not isinstance(error_code, str):
        error_code = None
    error_mapping = {  # Map Service Fabric errors into OAuth2 errors https://www.rfc-editor.org/rfc/rfc6749#section-5.2
        "SecretHeaderNotFound": "unauthorized_client",
        "ManagedIdentityNotFound": "invalid_client",
        "ArgumentNullOrEmpty": "invalid_scope",
    }
    return {
        "error": error_mapping.get(error_code, "invalid_request"),
        "error_description": resp.text,
    }

543 

544 

# Maps a ``sys.platform`` value to the directory in which the Azure Arc
# back-end looks up the local key file named by the challenge.
_supported_arc_platforms_and_their_prefixes = {
    "linux": "/var/opt/azcmagent/tokens",
    "win32": os.path.expandvars(
        r"%ProgramData%\AzureConnectedMachineAgent\Tokens"),
}

549 

class ArcPlatformNotSupportedError(ManagedIdentityError):
    """Raised by the Azure Arc back-end when ``sys.platform`` is not one of
    the keys of ``_supported_arc_platforms_and_their_prefixes``."""

552 

def _obtain_token_on_arc(http_client, endpoint, resource):
    """Obtain a token on Azure Arc via its two-step challenge flow.

    The first request is expected to answer with a ``WWW-Authenticate``
    header of the form ``Basic realm=<path>``. The base name of that path
    selects a ``.key`` file under the platform's token directory, whose
    content is then sent as the ``Authorization: Basic`` credential in a
    second request.

    :return:
        An OAuth2-shaped dict on success, otherwise an ``invalid_request``
        error dict carrying the raw response text.
    :raises ManagedIdentityError:
        When the challenge header is unrecognizable,
        or the key file is larger than 4KB.
    :raises ArcPlatformNotSupportedError:
        When ``sys.platform`` has no known token directory.
    """
    # https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication
    logger.debug("Obtaining token via managed identity on Azure Arc")
    resp = http_client.get(
        endpoint,
        params={"api-version": "2020-06-01", "resource": resource},
        headers={"Metadata": "true"},
    )
    www_auth = "www-authenticate"  # Header in lower case
    header_value = ""
    for name, value in resp.headers.items():
        # Compared in lowercase, because header names are case-insensitive
        # https://datatracker.ietf.org/doc/html/rfc7230#section-3.2
        if name.lower() == www_auth:
            header_value = value  # The last matching header wins
    challenge = header_value.split("=")  # Output will be ["Basic realm", "content"]
    recognized = (  # https://datatracker.ietf.org/doc/html/rfc7617#section-2
        len(challenge) == 2 and challenge[0].lower() == "basic realm")
    if not recognized:
        raise ManagedIdentityError(
            "Unrecognizable WWW-Authenticate header: {}".format(resp.headers))
    if sys.platform not in _supported_arc_platforms_and_their_prefixes:
        raise ArcPlatformNotSupportedError(
            f"Platform {sys.platform} was undefined and unsupported")
    # This algorithm is documented in an internal doc https://msazure.visualstudio.com/One/_wiki/wikis/One.wiki/233012/VM-Extension-Authoring-for-Arc?anchor=2.-obtaining-tokens
    # Only the base name of the challenge path is used; its directory is
    # always the platform's own prefix.
    key_stem = os.path.splitext(os.path.basename(challenge[1]))[0]
    filename = os.path.join(
        _supported_arc_platforms_and_their_prefixes[sys.platform],
        key_stem + ".key")
    if os.stat(filename).st_size > 4096:  # Check size BEFORE loading its content
        raise ManagedIdentityError("Local key file shall not be larger than 4KB")
    with open(filename) as f:
        secret = f.read()
    response = http_client.get(
        endpoint,
        params={"api-version": "2020-06-01", "resource": resource},
        headers={"Metadata": "true", "Authorization": "Basic {}".format(secret)},
    )
    try:
        payload = json.loads(response.text)
    except json.decoder.JSONDecodeError:
        payload = {}  # Falls through to the generic error below
    if payload.get("access_token") and payload.get("expires_in"):
        # Example: https://learn.microsoft.com/en-us/azure/azure-arc/servers/media/managed-identity-authentication/bash-token-output-example.png
        return {
            "access_token": payload["access_token"],
            "expires_in": int(payload["expires_in"]),
            "token_type": payload.get("token_type", "Bearer"),
            "resource": payload.get("resource"),
        }
    return {
        "error": "invalid_request",
        "error_description": response.text,
    }

603