Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/managed_identity.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

225 statements  

1# Copyright (c) Microsoft Corporation. 

2# All rights reserved. 

3# 

4# This code is licensed under the MIT License. 

5import json 

6import logging 

7import os 

8import socket 

9import sys 

10import time 

11from urllib.parse import urlparse # Python 3+ 

12from collections import UserDict # Python 3+ 

13from typing import Optional, Union # Needed in Python 3.7 & 3.8 

14from .token_cache import TokenCache 

15from .individual_cache import _IndividualCache as IndividualCache 

16from .throttled_http_client import ThrottledHttpClientBase, RetryAfterParser 

17from .cloudshell import _is_running_in_cloud_shell 

18 

19 

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

21 

22 

class ManagedIdentityError(ValueError):
    """Error raised by this module for managed-identity related problems.

    It derives from :class:`ValueError`, so an ``except ValueError`` handler
    also catches it.
    """

25 

26 

class ManagedIdentity(UserDict):
    """Feed an instance of this class to :class:`msal.ManagedIdentityClient`
    to acquire token for the specified managed identity.

    An instance behaves like a dict holding two entries,
    keyed by :attr:`ID_TYPE` and :attr:`ID`.
    """
    # The key names used in config dict
    ID_TYPE = "ManagedIdentityIdType"  # Contains keyword ManagedIdentity so its json equivalent will be more readable
    ID = "Id"

    # Valid values for key ID_TYPE
    CLIENT_ID = "ClientId"
    RESOURCE_ID = "ResourceId"
    OBJECT_ID = "ObjectId"
    SYSTEM_ASSIGNED = "SystemAssigned"

    # Maps type name in configuration to type name on wire
    _types_mapping = {
        CLIENT_ID: "client_id",
        # VM's IMDS prefers msi_res_id
        # https://github.com/Azure/azure-rest-api-specs/blob/dba6ed1f03bda88ac6884c0a883246446cc72495/specification/imds/data-plane/Microsoft.InstanceMetadataService/stable/2018-10-01/imds.json#L233-L239
        RESOURCE_ID: "msi_res_id",
        OBJECT_ID: "object_id",
    }

    @classmethod
    def is_managed_identity(cls, unknown):
        """Tell whether ``unknown`` is usable as a managed identity.

        The result is truthy for any :class:`ManagedIdentity` instance,
        or for anything accepted by :meth:`is_system_assigned`
        or :meth:`is_user_assigned`.
        """
        if isinstance(unknown, ManagedIdentity):
            return True
        return cls.is_system_assigned(unknown) or cls.is_user_assigned(unknown)

    @classmethod
    def is_system_assigned(cls, unknown):
        """Tell whether ``unknown`` denotes a system-assigned managed identity.

        It accepts a :class:`SystemAssignedManagedIdentity` instance,
        or a plain dict whose ``ID_TYPE`` entry equals ``SYSTEM_ASSIGNED``.
        """
        if isinstance(unknown, SystemAssignedManagedIdentity):
            return True
        return (
            isinstance(unknown, dict)
            and unknown.get(cls.ID_TYPE) == cls.SYSTEM_ASSIGNED
        )

    @classmethod
    def is_user_assigned(cls, unknown):
        """Tell whether ``unknown`` denotes a user-assigned managed identity.

        It accepts a :class:`UserAssignedManagedIdentity` instance,
        or a plain dict whose ``ID_TYPE`` entry is one of the keys of
        ``_types_mapping`` and whose ``ID`` entry is truthy.
        For a plain dict, the returned value is the outcome of the ``and``
        chain (possibly the id itself), so rely on its truthiness only.
        """
        if isinstance(unknown, UserAssignedManagedIdentity):
            return True
        return (
            isinstance(unknown, dict)
            and unknown.get(cls.ID_TYPE) in cls._types_mapping
            and unknown.get(cls.ID)
        )

    def __init__(self, identifier=None, id_type=None):
        # Undocumented. Use subclasses instead.
        initial_content = {self.ID_TYPE: id_type, self.ID: identifier}
        super().__init__(initial_content)

72 

73 

class SystemAssignedManagedIdentity(ManagedIdentity):
    """Represent a system-assigned managed identity.

    It is equivalent to a Python dict of::

        {"ManagedIdentityIdType": "SystemAssigned", "Id": None}

    or a JSON blob of::

        {"ManagedIdentityIdType": "SystemAssigned", "Id": null}
    """

    def __init__(self):
        # No identifier is passed; the parent stores ``None`` under "Id".
        super().__init__(id_type=self.SYSTEM_ASSIGNED)

87 

88 

class UserAssignedManagedIdentity(ManagedIdentity):
    """Represent a user-assigned managed identity.

    Depends on the id you provided, the outcome is equivalent to one of the below::

        {"ManagedIdentityIdType": "ClientId", "Id": "foo"}
        {"ManagedIdentityIdType": "ResourceId", "Id": "foo"}
        {"ManagedIdentityIdType": "ObjectId", "Id": "foo"}
    """

    def __init__(self, *, client_id=None, resource_id=None, object_id=None):
        """Create the identity from exactly one of the three keyword arguments.

        :raises ManagedIdentityError:
            When none, or more than one, of the arguments is truthy.
        """
        candidates = (
            (self.CLIENT_ID, client_id),
            (self.RESOURCE_ID, resource_id),
            (self.OBJECT_ID, object_id),
        )
        provided = [(kind, value) for kind, value in candidates if value]
        if len(provided) != 1:  # Exactly one identifier is expected
            raise ManagedIdentityError(
                "You shall specify one of the three parameters: "
                "client_id, resource_id, object_id")
        chosen_type, chosen_id = provided[0]
        super().__init__(id_type=chosen_type, identifier=chosen_id)

112 

113 

class _ThrottledHttpClient(ThrottledHttpClientBase):
    """Http client whose ``get`` is wrapped by an :class:`IndividualCache`.

    The cache key is built from the endpoint plus a hash of the request's
    ``params`` and ``data``; the entry lifetime is decided by
    ``RetryAfterParser(5).parse``.
    """

    def __init__(self, http_client, **kwargs):
        super().__init__(http_client, **kwargs)

        def _make_cache_key(func, args, kwargs):
            endpoint = args[0]  # It is the endpoint, typically a constant per MI type
            # Managed Identity flavors have inconsistent parameters.
            # We simply choose to hash them all.
            fingerprint = self._hash(
                str(kwargs.get("params")) + str(kwargs.get("data")))
            return "REQ {} hash={} 429/5xx/Retry-After".format(
                endpoint, fingerprint)

        decorate = IndividualCache(  # All MIs (except Cloud Shell) use GETs
            mapping=self._expiring_mapping,
            key_maker=_make_cache_key,
            expires_in=RetryAfterParser(5).parse,  # 5 seconds default for non-PCA
        )
        self.get = decorate(http_client.get)

128 

129 

class ManagedIdentityClient(object):
    """This API encapsulates multiple managed identity back-ends:
    VM, App Service, Azure Automation (Runbooks), Azure Function, Service Fabric,
    and Azure Arc.

    It also provides token cache support.

    .. note::

        Cloud Shell support is NOT implemented in this class.
        Since MSAL Python 1.18 in May 2022, it has been implemented in
        :func:`PublicClientApplication.acquire_token_interactive` via calling pattern
        ``PublicClientApplication(...).acquire_token_interactive(scopes=[...], prompt="none")``.
        That is appropriate, because Cloud Shell yields a token with
        delegated permissions for the end user who has signed in to the Azure Portal
        (like what a ``PublicClientApplication`` does),
        not a token with application permissions for an app.
    """
    __instance, _tenant = None, "managed_identity"  # Placeholders
    _TOKEN_SOURCE = "token_source"
    _TOKEN_SOURCE_IDP = "identity_provider"
    _TOKEN_SOURCE_CACHE = "cache"

    def __init__(
        self,
        managed_identity: Union[
            dict,
            ManagedIdentity,  # Could use Type[ManagedIdentity] but it is deprecated in Python 3.9+
            SystemAssignedManagedIdentity,
            UserAssignedManagedIdentity,
        ],
        *,
        http_client,
        token_cache=None,
        http_cache=None,
    ):
        """Create a managed identity client.

        :param managed_identity:
            It accepts an instance of :class:`SystemAssignedManagedIdentity`
            or :class:`UserAssignedManagedIdentity`.
            They are equivalent to a dict with a certain shape,
            which may be loaded from a JSON configuration file or an env var.

        :param http_client:
            An http client object. For example, you can use ``requests.Session()``,
            optionally with exponential backoff behavior demonstrated in this recipe::

                import msal, requests
                from requests.adapters import HTTPAdapter, Retry
                s = requests.Session()
                retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[
                    429, 500, 501, 502, 503, 504])
                s.mount('https://', HTTPAdapter(max_retries=retries))
                managed_identity = ...
                client = msal.ManagedIdentityClient(managed_identity, http_client=s)

        :param token_cache:
            Optional. It accepts a :class:`msal.TokenCache` instance to store tokens.
            It will use an in-memory token cache by default.

        :param http_cache:
            Optional. It has the same characteristics as the
            :paramref:`msal.ClientApplication.http_cache`.

        :raises ManagedIdentityError:
            When ``managed_identity`` is not recognized by
            :meth:`ManagedIdentity.is_managed_identity`.

        Recipe 1: Hard code a managed identity for your app::

            import msal, requests
            client = msal.ManagedIdentityClient(
                msal.UserAssignedManagedIdentity(client_id="foo"),
                http_client=requests.Session(),
                )
            token = client.acquire_token_for_client(resource="resource")

        Recipe 2: Write once, run everywhere.
        If you use different managed identity on different deployment,
        you may use an environment variable (such as MY_MANAGED_IDENTITY_CONFIG)
        to store a json blob like
        ``{"ManagedIdentityIdType": "ClientId", "Id": "foo"}`` or
        ``{"ManagedIdentityIdType": "SystemAssigned", "Id": null}``.
        The following app can load managed identity configuration dynamically::

            import json, os, msal, requests
            config = os.getenv("MY_MANAGED_IDENTITY_CONFIG")
            assert config, "An ENV VAR with value should exist"
            client = msal.ManagedIdentityClient(
                json.loads(config),
                http_client=requests.Session(),
                )
            token = client.acquire_token_for_client(resource="resource")
        """
        if not ManagedIdentity.is_managed_identity(managed_identity):
            raise ManagedIdentityError(
                f"Incorrect managed_identity: {managed_identity}")
        self._managed_identity = managed_identity
        self._http_client = _ThrottledHttpClient(
            # This class only throttles excess token acquisition requests.
            # It does not provide retry.
            # Retry is the http_client or caller's responsibility, not MSAL's.
            #
            # FWIW, here is the inconsistent retry recommendation.
            # 1. Only MI on VM defines exotic 404 and 410 retry recommendations
            #    ( https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling )
            #    (especially for 410 which was supposed to be a permanent failure).
            # 2. MI on Service Fabric specifically suggests to not retry on 404.
            #    ( https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-cluster-managed-identity-service-fabric-app-code#error-handling )
            http_client.http_client  # Patch the raw (unpatched) http client
            if isinstance(http_client, ThrottledHttpClientBase) else http_client,
            http_cache=http_cache,
        )
        self._token_cache = token_cache or TokenCache()

    def _get_instance(self):
        """Return this machine's FQDN, resolving it once on first use."""
        if self.__instance is None:
            self.__instance = socket.getfqdn()  # Moved from class definition to here
        return self.__instance

    def acquire_token_for_client(
        self,
        *,
        resource: str,  # If/when we support scope, resource will become optional
        claims_challenge: Optional[str] = None,
    ):
        """Acquire token for the managed identity.

        The result will be automatically cached.
        Subsequent calls will automatically search from cache first.

        :param resource: The resource for which the token is acquired.

        :param claims_challenge:
            Optional.
            It is a string representation of a JSON object
            (which contains lists of claims being requested).

            The tenant admin may choose to revoke all Managed Identity tokens,
            and then a *claims challenge* will be returned by the target resource,
            as a `claims_challenge` directive in the `www-authenticate` header,
            even if the app developer did not opt in for the "CP1" client capability.
            Upon receiving a `claims_challenge`, MSAL will skip a token cache read,
            and will attempt to acquire a new token.

        :return:
            A dict. On success it contains ``access_token``, ``token_type``,
            ``expires_in`` and ``token_source``
            (``"cache"`` or ``"identity_provider"``), optionally ``refresh_on``.
            On failure it is the error dict produced by the back-end,
            unless a still-valid cached token is available as a fallback.

        :raises Exception:
            Whatever the http client or the back-end parsing raised,
            when there is no cached token to fall back to.

        .. note::

            Known issue: When an Azure VM has only one user-assigned managed identity,
            and your app specifies to use system-assigned managed identity,
            Azure VM may still return a token for your user-assigned identity.

            This is a service-side behavior that cannot be changed by this library.
            `Azure VM docs <https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http>`_
        """
        access_token_from_cache = None
        # Note: ManagedIdentity instances always carry an "Id" key (None for
        # system-assigned), so the default below applies only to plain dicts
        # which omit that key.
        client_id_in_cache = self._managed_identity.get(
            ManagedIdentity.ID, "SYSTEM_ASSIGNED_MANAGED_IDENTITY")
        now = time.time()
        if not claims_challenge:  # Then attempt token cache search
            matches = self._token_cache.find(
                self._token_cache.CredentialType.ACCESS_TOKEN,
                target=[resource],
                query=dict(
                    client_id=client_id_in_cache,
                    environment=self._get_instance(),
                    realm=self._tenant,
                    home_account_id=None,
                ),
            )
            for entry in matches:
                expires_in = int(entry["expires_on"]) - now
                if expires_in < 5*60:  # Then consider it expired
                    continue  # Removal is not necessary, it will be overwritten
                logger.debug("Cache hit an AT")
                access_token_from_cache = {  # Mimic a real response
                    "access_token": entry["secret"],
                    "token_type": entry.get("token_type", "Bearer"),
                    "expires_in": int(expires_in),  # OAuth2 specs defines it as int
                    self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
                }
                if "refresh_on" in entry:
                    access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
                    if int(entry["refresh_on"]) < now:  # aging
                        break  # With a fallback in hand, we break here to go refresh
                return access_token_from_cache  # It is still good as new
        try:
            result = _obtain_token(self._http_client, self._managed_identity, resource)
            if "access_token" in result:
                expires_in = result.get("expires_in", 3600)
                if "refresh_in" not in result and expires_in >= 7200:
                    result["refresh_in"] = int(expires_in / 2)
                self._token_cache.add(dict(
                    client_id=client_id_in_cache,
                    scope=[resource],
                    token_endpoint="https://{}/{}".format(
                        self._get_instance(), self._tenant),
                    response=result,
                    params={},
                    data={},
                ))
                if "refresh_in" in result:
                    result["refresh_on"] = int(now + result["refresh_in"])
                result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
            if (result and "error" not in result) or (not access_token_from_cache):
                return result
        except Exception:  # The exact HTTP exception is transportation-layer dependent
            # Typically network error. Potential AAD outage?
            # We catch Exception rather than using a bare except, so that
            # KeyboardInterrupt and SystemExit are never swallowed by the fallback.
            if not access_token_from_cache:  # It means there is no fall back option
                raise  # We choose to bubble up the exception
        return access_token_from_cache

337 

338 

def _scope_to_resource(scope):  # This is an experimental reasonable-effort approach
    """Reduce a URL-like scope to ``scheme://netloc``.

    A scope in which ``urlparse`` finds no scheme is returned unchanged,
    because there is no much else we can do with it.
    """
    parsed = urlparse(scope)
    if not parsed.scheme:
        return scope
    return "{}://{}".format(parsed.scheme, parsed.netloc)

344 

345 

def _get_arc_endpoint():
    """Return the Azure Arc token endpoint, or ``None`` when Arc is not detected.

    When both ``IDENTITY_ENDPOINT`` and ``IMDS_ENDPOINT`` env vars exist,
    the value of ``IDENTITY_ENDPOINT`` is returned.
    Otherwise a fixed localhost endpoint is returned if the ``himds``
    executable exists at its expected location on Linux or Windows.
    """
    env = os.environ
    if "IDENTITY_ENDPOINT" in env and "IMDS_ENDPOINT" in env:
        return env["IDENTITY_ENDPOINT"]

    # Defined in https://eng.ms/docs/cloud-ai-platform/azure-core/azure-management-and-platforms/control-plane-bburns/hybrid-resource-provider/azure-arc-for-servers/specs/extension_authoring
    if sys.platform == "linux":
        agent_found = os.path.exists("/opt/azcmagent/bin/himds")
    elif sys.platform == "win32":
        agent_found = os.path.exists(os.path.expandvars(
            # Avoid Windows-only "%EnvVar%" syntax so that tests can be run on Linux
            r"${ProgramFiles}\AzureConnectedMachineAgent\himds.exe"
        ))
    else:
        agent_found = False

    if agent_found:
        return "http://localhost:40342/metadata/identity/oauth2/token"
    return None

357 

358 

# Sentinels returned by get_managed_identity_source().
# Each one is a distinct bare object, so they are told apart by identity.
APP_SERVICE = object()
AZURE_ARC = object()
CLOUD_SHELL = object()  # In MSAL Python, token acquisition was done by
    # PublicClientApplication(...).acquire_token_interactive(..., prompt="none")
MACHINE_LEARNING = object()
SERVICE_FABRIC = object()
DEFAULT_TO_VM = object()  # Unknown environment; default to VM; you may want to probe

def get_managed_identity_source():
    """Detect the current environment and return the likely identity source.

    The return value is one of the module-level sentinels ``SERVICE_FABRIC``,
    ``APP_SERVICE``, ``MACHINE_LEARNING``, ``AZURE_ARC``, ``CLOUD_SHELL``
    or ``DEFAULT_TO_VM``, checked in that order.

    When this function returns ``CLOUD_SHELL``, you should use
    :func:`msal.PublicClientApplication.acquire_token_interactive` with ``prompt="none"``
    to obtain a token.
    """
    env = os.environ
    if "IDENTITY_ENDPOINT" in env and "IDENTITY_HEADER" in env:
        # Service Fabric is told apart from App Service by one more env var
        if "IDENTITY_SERVER_THUMBPRINT" in env:
            return SERVICE_FABRIC
        return APP_SERVICE
    if "MSI_ENDPOINT" in env and "MSI_SECRET" in env:
        return MACHINE_LEARNING
    if _get_arc_endpoint():
        return AZURE_ARC
    if _is_running_in_cloud_shell():
        return CLOUD_SHELL
    return DEFAULT_TO_VM

386 

387 

def _obtain_token(http_client, managed_identity, resource):
    """A unified low-level API that talks to different Managed Identity back-ends.

    The back-end is chosen from the environment, in this order:
    Service Fabric, App Service, Machine Learning, Azure Arc, and finally Azure VM.

    :param http_client: An object with a ``get(url, params=..., headers=...)`` method.
    :param managed_identity: The managed identity configuration (dict-like).
    :param resource: The resource for which the token is acquired.
    :return: The dict returned by the chosen back-end helper.
    :raises ManagedIdentityError:
        When running on Azure Arc with a user-assigned managed identity.
    """
    if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ
            and "IDENTITY_SERVER_THUMBPRINT" in os.environ
    ):
        if managed_identity:
            logger.debug(
                "Ignoring managed_identity parameter. "
                "Managed Identity in Service Fabric is configured in the cluster, "
                "not during runtime. See also "
                "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service")
        return _obtain_token_on_service_fabric(
            http_client,
            os.environ["IDENTITY_ENDPOINT"],
            os.environ["IDENTITY_HEADER"],
            os.environ["IDENTITY_SERVER_THUMBPRINT"],
            resource,
        )
    if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ:
        return _obtain_token_on_app_service(
            http_client,
            os.environ["IDENTITY_ENDPOINT"],
            os.environ["IDENTITY_HEADER"],
            managed_identity,
            resource,
        )
    if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ:
        # Back ported from https://github.com/Azure/azure-sdk-for-python/blob/azure-identity_1.15.0/sdk/identity/azure-identity/azure/identity/_credentials/azure_ml.py
        return _obtain_token_on_machine_learning(
            http_client,
            os.environ["MSI_ENDPOINT"],
            os.environ["MSI_SECRET"],
            managed_identity,
            resource,
        )
    arc_endpoint = _get_arc_endpoint()
    if arc_endpoint:
        if ManagedIdentity.is_user_assigned(managed_identity):
            raise ManagedIdentityError(  # Note: Azure Identity for Python raised exception too
                "Invalid managed_identity parameter. "
                "Azure Arc supports only system-assigned managed identity, "
                "See also "
                # This used to point to a Service Fabric page by mistake
                "https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication")
        return _obtain_token_on_arc(http_client, arc_endpoint, resource)
    return _obtain_token_on_azure_vm(http_client, managed_identity, resource)

433 

434 

def _adjust_param(params, managed_identity, types_mapping=None):
    """Add the identity's id to ``params``, modifying the dict in place.

    :param params: The query parameters dict to be modified.
    :param managed_identity: The managed identity configuration (dict-like).
    :param types_mapping:
        Optional. Maps an id type in configuration to its parameter name on wire.
        It falls back to ``ManagedIdentity._types_mapping`` when falsy.

    Nothing is added when the identity's id type has no entry in the mapping.
    """
    mapping = types_mapping or ManagedIdentity._types_mapping
    wire_name = mapping.get(managed_identity.get(ManagedIdentity.ID_TYPE))
    if not wire_name:
        return
    params[wire_name] = managed_identity[ManagedIdentity.ID]

441 

def _obtain_token_on_azure_vm(http_client, managed_identity, resource):
    """Obtain a token from the IMDS endpoint of an Azure VM.

    :return:
        A dict normalized into OAuth2 format when the payload carries both
        ``access_token`` and ``expires_in``; otherwise the payload as-is.
    :raises json.decoder.JSONDecodeError: When the response body is not JSON.
    """
    # Based on https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http
    logger.debug("Obtaining token via managed identity on Azure VM")
    query = {"api-version": "2018-02-01", "resource": resource}
    _adjust_param(query, managed_identity)
    resp = http_client.get(
        "http://169.254.169.254/metadata/identity/oauth2/token",
        params=query,
        headers={"Metadata": "true"},
    )
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if not (payload.get("access_token") and payload.get("expires_in")):
        # It would be {"error": ..., "error_description": ...} according to
        # https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling
        return payload
    return {  # Normalizing the payload into OAuth2 format
        "access_token": payload["access_token"],
        "expires_in": int(payload["expires_in"]),
        "resource": payload.get("resource"),
        "token_type": payload.get("token_type", "Bearer"),
    }

468 

def _obtain_token_on_app_service(
    http_client, endpoint, identity_header, managed_identity, resource,
):
    """Obtains token for
    `App Service <https://learn.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp#rest-endpoint-reference>`_,
    Azure Functions, and Azure Automation.

    :return:
        A dict normalized into OAuth2 format when the payload carries both
        ``access_token`` and ``expires_on``; otherwise an ``invalid_scope``
        error dict built from the payload's ``statusCode`` and ``message``.
    :raises json.decoder.JSONDecodeError: When the response body is not JSON.
    """
    # Prerequisite: Create your app service https://docs.microsoft.com/en-us/azure/app-service/quickstart-python
    # Assign it a managed identity https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp
    # SSH into your container for testing https://docs.microsoft.com/en-us/azure/app-service/configure-linux-open-ssh-session
    logger.debug("Obtaining token via managed identity on Azure App Service")
    query = {"api-version": "2019-08-01", "resource": resource}
    id_names_on_wire = {
        ManagedIdentity.CLIENT_ID: "client_id",
        ManagedIdentity.RESOURCE_ID: "mi_res_id",  # App Service's resource id uses "mi_res_id"
        ManagedIdentity.OBJECT_ID: "object_id",
    }
    _adjust_param(query, managed_identity, types_mapping=id_names_on_wire)

    request_headers = {
        "X-IDENTITY-HEADER": identity_header,
        # Unnecessary yet harmless for App Service,
        # It will be needed by Azure Automation
        # https://docs.microsoft.com/en-us/azure/automation/enable-managed-identity-for-automation#get-access-token-for-system-assigned-managed-identity-using-http-get
        "Metadata": "true",
    }
    resp = http_client.get(endpoint, params=query, headers=request_headers)
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if not (payload.get("access_token") and payload.get("expires_on")):
        return {
            "error": "invalid_scope",  # Empirically, wrong resource ends up with a vague statusCode=500
            "error_description": "{}, {}".format(
                payload.get("statusCode"), payload.get("message")),
        }
    return {  # Normalizing the payload into OAuth2 format
        "access_token": payload["access_token"],
        "expires_in": int(payload["expires_on"]) - int(time.time()),
        "resource": payload.get("resource"),
        "token_type": payload.get("token_type", "Bearer"),
    }

517 

def _obtain_token_on_machine_learning(
    http_client, endpoint, secret, managed_identity, resource,
):
    """Obtain a token via managed identity on Azure Machine Learning.

    :return:
        A dict normalized into OAuth2 format when the payload carries both
        ``access_token`` and ``expires_on``; otherwise an ``invalid_scope``
        error dict whose description is the stringified payload.
    :raises json.decoder.JSONDecodeError: When the response body is not JSON.
    """
    # Could not find protocol docs from https://docs.microsoft.com/en-us/azure/machine-learning
    # The following implementation is back ported from Azure Identity 1.15.0
    logger.debug("Obtaining token via managed identity on Azure Machine Learning")
    query = {"api-version": "2017-09-01", "resource": resource}
    _adjust_param(query, managed_identity)
    needs_clientid_workaround = (
        query["api-version"] == "2017-09-01" and "client_id" in query)
    if needs_clientid_workaround:
        # Workaround for a known bug in Azure ML 2017 API
        query["clientid"] = query.pop("client_id")
    resp = http_client.get(endpoint, params=query, headers={"secret": secret})
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if not (payload.get("access_token") and payload.get("expires_on")):
        return {
            "error": "invalid_scope",  # TODO: To be tested
            "error_description": "{}".format(payload),
        }
    return {  # Normalizing the payload into OAuth2 format
        "access_token": payload["access_token"],
        "expires_in": int(payload["expires_on"]) - int(time.time()),
        "resource": payload.get("resource"),
        "token_type": payload.get("token_type", "Bearer"),
    }

550 

551 

def _obtain_token_on_service_fabric(
    http_client, endpoint, identity_header, server_thumbprint, resource,
):
    """Obtains token for
    `Service Fabric <https://learn.microsoft.com/en-us/azure/service-fabric/>`_

    :param http_client: An object with a ``get(url, params=..., headers=...)`` method.
    :param endpoint: The token endpoint to send the request to.
    :param identity_header: The value sent in the ``Secret`` request header.
    :param server_thumbprint: Accepted for signature parity; not used by this function.
    :param resource: The resource for which the token is acquired.
    :return:
        A dict normalized into OAuth2 format when the payload carries both
        ``access_token`` and ``expires_on``
        (``token_type`` defaults to ``"Bearer"`` when absent).
        Otherwise an error dict, whose ``error`` is mapped from the payload's
        ``error.code`` and falls back to ``"invalid_request"`` when that code
        is missing or unknown.
    :raises json.decoder.JSONDecodeError: When the response body is not JSON.
    """
    # Deployment https://learn.microsoft.com/en-us/azure/service-fabric/service-fabric-get-started-containers-linux
    # See also https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/tests/managed-identity-live/service-fabric/service_fabric.md
    # Protocol https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#acquiring-an-access-token-using-rest-api
    logger.debug("Obtaining token via managed identity on Azure Service Fabric")
    resp = http_client.get(
        endpoint,
        params={"api-version": "2019-07-01-preview", "resource": resource},
        headers={"Secret": identity_header},
    )
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if payload.get("access_token") and payload.get("expires_on"):
        return {  # Normalizing the payload into OAuth2 format
            "access_token": payload["access_token"],
            "expires_in": int(  # Despite the example in docs shows an integer,
                payload["expires_on"]  # Azure SDK team's test obtained a string.
                ) - int(time.time()),
            "resource": payload.get("resource"),
            # Default to Bearer, consistent with the other back-ends
            "token_type": payload.get("token_type", "Bearer"),
        }
    # https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#error-handling
    error = payload.get("error")
    # Tolerate a missing or oddly-shaped "error" member rather than raising
    code = error.get("code") if isinstance(error, dict) else None
    if not isinstance(code, str):
        code = None
    error_mapping = {  # Map Service Fabric errors into OAuth2 errors https://www.rfc-editor.org/rfc/rfc6749#section-5.2
        "SecretHeaderNotFound": "unauthorized_client",
        "ManagedIdentityNotFound": "invalid_client",
        "ArgumentNullOrEmpty": "invalid_scope",
    }
    return {
        "error": error_mapping.get(code, "invalid_request"),
        "error_description": resp.text,
    }

591 

592 

# Maps a supported ``sys.platform`` value to the directory in which
# _obtain_token_on_arc() looks for the local ".key" file.
_supported_arc_platforms_and_their_prefixes = {
    "linux": "/var/opt/azcmagent/tokens",
    "win32": os.path.expandvars(r"%ProgramData%\AzureConnectedMachineAgent\Tokens"),
}

597 

class ArcPlatformNotSupportedError(ManagedIdentityError):
    """Raised when Azure Arc token acquisition runs on an unsupported platform."""

600 

def _obtain_token_on_arc(http_client, endpoint, resource):
    """Obtain a token via managed identity on Azure Arc.

    A first request is sent to receive a ``WWW-Authenticate`` challenge which
    names a file. The content of the matching local ``.key`` file is then sent
    as a Basic credential in a second request.

    :return:
        A dict normalized into OAuth2 format when the second response carries
        both ``access_token`` and ``expires_in``; otherwise an
        ``invalid_request`` error dict containing the response text.
    :raises ManagedIdentityError:
        When the challenge header is unrecognizable,
        or the local key file is larger than 4KB.
    :raises ArcPlatformNotSupportedError:
        When ``sys.platform`` has no known token directory.
    """
    # https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication
    logger.debug("Obtaining token via managed identity on Azure Arc")
    query = {"api-version": "2020-06-01", "resource": resource}
    resp = http_client.get(endpoint, params=query, headers={"Metadata": "true"})

    www_auth = "www-authenticate"  # Header in lower case
    # Header names are case-insensitive, so compare them in lower case
    # https://datatracker.ietf.org/doc/html/rfc7230#section-3.2
    header_value = ""
    for name, value in resp.headers.items():
        if name.lower() == www_auth:
            header_value = value  # The last occurrence wins
    challenge = header_value.split("=")  # Output will be ["Basic realm", "content"]
    recognized = (  # https://datatracker.ietf.org/doc/html/rfc7617#section-2
        len(challenge) == 2 and challenge[0].lower() == "basic realm")
    if not recognized:
        raise ManagedIdentityError(
            "Unrecognizable WWW-Authenticate header: {}".format(resp.headers))
    if sys.platform not in _supported_arc_platforms_and_their_prefixes:
        raise ArcPlatformNotSupportedError(
            f"Platform {sys.platform} was undefined and unsupported")

    # This algorithm is documented in an internal doc https://msazure.visualstudio.com/One/_wiki/wikis/One.wiki/233012/VM-Extension-Authoring-for-Arc?anchor=2.-obtaining-tokens
    # Only the base name of the challenged path is used, under a fixed directory
    key_name = os.path.splitext(os.path.basename(challenge[1]))[0] + ".key"
    filename = os.path.join(
        _supported_arc_platforms_and_their_prefixes[sys.platform], key_name)
    if os.stat(filename).st_size > 4096:  # Check size BEFORE loading its content
        raise ManagedIdentityError("Local key file shall not be larger than 4KB")
    with open(filename) as f:
        secret = f.read()

    response = http_client.get(
        endpoint,
        params={"api-version": "2020-06-01", "resource": resource},
        headers={"Metadata": "true", "Authorization": "Basic {}".format(secret)},
    )
    try:
        payload = json.loads(response.text)
    except json.decoder.JSONDecodeError:
        payload = {}  # A non-JSON body ends up as the error response below
    if payload.get("access_token") and payload.get("expires_in"):
        # Example: https://learn.microsoft.com/en-us/azure/azure-arc/servers/media/managed-identity-authentication/bash-token-output-example.png
        return {
            "access_token": payload["access_token"],
            "expires_in": int(payload["expires_in"]),
            "token_type": payload.get("token_type", "Bearer"),
            "resource": payload.get("resource"),
        }
    return {
        "error": "invalid_request",
        "error_description": response.text,
    }