1# Copyright (c) Microsoft Corporation.
2# All rights reserved.
3#
4# This code is licensed under the MIT License.
5import json
6import logging
7import os
8import socket
9import sys
10import time
11from urllib.parse import urlparse # Python 3+
12from collections import UserDict # Python 3+
13from typing import Union # Needed in Python 3.7 & 3.8
14from .token_cache import TokenCache
15from .individual_cache import _IndividualCache as IndividualCache
16from .throttled_http_client import ThrottledHttpClientBase, RetryAfterParser
17from .cloudshell import _is_running_in_cloud_shell
18
19
# Module-level logger shared by every helper in this file; it is named after
# the module so applications can tune its verbosity independently.
logger = logging.getLogger(__name__)
21
22
class ManagedIdentityError(ValueError):
    """Raised by this module for managed-identity specific failures.

    It derives from :class:`ValueError`, so callers catching ``ValueError``
    will also catch this exception.
    """
25
26
class ManagedIdentity(UserDict):
    """Feed an instance of this class to :class:`msal.ManagedIdentityClient`
    to acquire token for the specified managed identity.

    An instance behaves like a dict with two keys, :attr:`ID_TYPE` and
    :attr:`ID`, so it is interchangeable with a plain dict of the same shape.
    """
    # The key names used in config dict
    ID_TYPE = "ManagedIdentityIdType"  # Contains keyword ManagedIdentity so its json equivalent will be more readable
    ID = "Id"

    # Valid values for key ID_TYPE
    CLIENT_ID = "ClientId"
    RESOURCE_ID = "ResourceId"
    OBJECT_ID = "ObjectId"
    SYSTEM_ASSIGNED = "SystemAssigned"

    # Maps type name in configuration to type name on wire.
    # SYSTEM_ASSIGNED is deliberately absent: it has no wire parameter here.
    _types_mapping = {
        CLIENT_ID: "client_id",
        RESOURCE_ID: "mi_res_id",
        OBJECT_ID: "object_id",
    }

    @classmethod
    def is_managed_identity(cls, unknown):
        """Tell whether ``unknown`` is a managed identity configuration.

        True for any :class:`ManagedIdentity` instance, or for a plain dict
        that contains the :attr:`ID_TYPE` key.
        """
        if isinstance(unknown, ManagedIdentity):
            return True
        return isinstance(unknown, dict) and cls.ID_TYPE in unknown

    @classmethod
    def is_system_assigned(cls, unknown):
        """Tell whether ``unknown`` describes a system-assigned identity.

        True for a :class:`SystemAssignedManagedIdentity` instance, or for a
        plain dict whose :attr:`ID_TYPE` value equals :attr:`SYSTEM_ASSIGNED`.
        """
        if isinstance(unknown, SystemAssignedManagedIdentity):
            return True
        if not isinstance(unknown, dict):
            return False
        return unknown.get(cls.ID_TYPE) == cls.SYSTEM_ASSIGNED

    @classmethod
    def is_user_assigned(cls, unknown):
        """Tell whether ``unknown`` describes a user-assigned identity.

        Truthy for a :class:`UserAssignedManagedIdentity` instance, or for a
        plain dict with a known user-assigned :attr:`ID_TYPE` and a non-empty
        :attr:`ID`. For the dict case the return value is the id itself
        rather than a strict boolean, so use the result for truthiness only.
        """
        if isinstance(unknown, UserAssignedManagedIdentity):
            return True
        if not isinstance(unknown, dict):
            return False
        if unknown.get(cls.ID_TYPE) not in cls._types_mapping:
            return False
        return unknown.get(cls.ID)

    def __init__(self, identifier=None, id_type=None):
        # Undocumented. Use subclasses instead.
        config = {
            self.ID_TYPE: id_type,
            self.ID: identifier,
        }
        super().__init__(config)
71
72
class SystemAssignedManagedIdentity(ManagedIdentity):
    """Represent a system-assigned managed identity.

    It is equivalent to a Python dict of::

        {"ManagedIdentityIdType": "SystemAssigned", "Id": None}

    or a JSON blob of::

        {"ManagedIdentityIdType": "SystemAssigned", "Id": null}
    """
    def __init__(self):
        # No identifier is passed, so the "Id" entry stays None.
        super().__init__(id_type=self.SYSTEM_ASSIGNED)
86
87
class UserAssignedManagedIdentity(ManagedIdentity):
    """Represent a user-assigned managed identity.

    Depends on the id you provided, the outcome is equivalent to one of the below::

        {"ManagedIdentityIdType": "ClientId", "Id": "foo"}
        {"ManagedIdentityIdType": "ResourceId", "Id": "foo"}
        {"ManagedIdentityIdType": "ObjectId", "Id": "foo"}
    """
    def __init__(self, *, client_id=None, resource_id=None, object_id=None):
        """Build the identity from exactly one of the three keyword arguments.

        :raises ManagedIdentityError:
            When none, or more than one, of the arguments is truthy.
        """
        candidates = (
            (self.CLIENT_ID, client_id),
            (self.RESOURCE_ID, resource_id),
            (self.OBJECT_ID, object_id),
        )
        # Empty strings count as "not provided", same as None.
        provided = [(kind, value) for kind, value in candidates if value]
        if len(provided) != 1:
            raise ManagedIdentityError(
                "You shall specify one of the three parameters: "
                "client_id, resource_id, object_id")
        id_type, identifier = provided[0]
        super().__init__(id_type=id_type, identifier=identifier)
111
112
class _ThrottledHttpClient(ThrottledHttpClientBase):
    """Http client wrapper whose ``get()`` goes through an ``IndividualCache``.

    Only ``get`` is wrapped here, because all managed identity flavors
    (except Cloud Shell) use GETs.
    """
    def __init__(self, http_client, **kwargs):
        super().__init__(http_client, **kwargs)

        def _cache_key(func, args, kwargs):
            # args[0] is the endpoint, typically a constant per MI type.
            endpoint = args[0]
            # Managed Identity flavors have inconsistent parameters.
            # We simply choose to hash them all.
            fingerprint = self._hash(
                str(kwargs.get("params")) + str(kwargs.get("data")))
            return "REQ {} hash={} 429/5xx/Retry-After".format(
                endpoint, fingerprint)

        decorate = IndividualCache(
            mapping=self._expiring_mapping,
            key_maker=_cache_key,
            expires_in=RetryAfterParser(5).parse,  # 5 seconds default for non-PCA
        )
        self.get = decorate(http_client.get)
127
128
class ManagedIdentityClient(object):
    """This API encapsulates multiple managed identity back-ends:
    VM, App Service, Azure Automation (Runbooks), Azure Function, Service Fabric,
    and Azure Arc.

    It also provides token cache support.

    .. note::

        Cloud Shell support is NOT implemented in this class.
        Since MSAL Python 1.18 in May 2022, it has been implemented in
        :func:`PublicClientApplication.acquire_token_interactive` via calling pattern
        ``PublicClientApplication(...).acquire_token_interactive(scopes=[...], prompt="none")``.
        That is appropriate, because Cloud Shell yields a token with
        delegated permissions for the end user who has signed in to the Azure Portal
        (like what a ``PublicClientApplication`` does),
        not a token with application permissions for an app.
    """
    # Placeholders used as the "environment" and "realm" of token cache entries
    _instance, _tenant = socket.getfqdn(), "managed_identity"

    def __init__(
        self,
        managed_identity: Union[
            dict,
            ManagedIdentity,  # Could use Type[ManagedIdentity] but it is deprecated in Python 3.9+
            SystemAssignedManagedIdentity,
            UserAssignedManagedIdentity,
        ],
        *,
        http_client,
        token_cache=None,
        http_cache=None,
    ):
        """Create a managed identity client.

        :param managed_identity:
            It accepts an instance of :class:`SystemAssignedManagedIdentity`
            or :class:`UserAssignedManagedIdentity`.
            They are equivalent to a dict with a certain shape,
            which may be loaded from a JSON configuration file or an env var.

        :param http_client:
            An http client object. For example, you can use ``requests.Session()``,
            optionally with exponential backoff behavior demonstrated in this recipe::

                import msal, requests
                from requests.adapters import HTTPAdapter, Retry
                s = requests.Session()
                retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[
                    429, 500, 501, 502, 503, 504])
                s.mount('https://', HTTPAdapter(max_retries=retries))
                managed_identity = ...
                client = msal.ManagedIdentityClient(managed_identity, http_client=s)

        :param token_cache:
            Optional. It accepts a :class:`msal.TokenCache` instance to store tokens.
            It will use an in-memory token cache by default.

        :param http_cache:
            Optional. It has the same characteristics as the
            :paramref:`msal.ClientApplication.http_cache`.

        Recipe 1: Hard code a managed identity for your app::

            import msal, requests
            client = msal.ManagedIdentityClient(
                msal.UserAssignedManagedIdentity(client_id="foo"),
                http_client=requests.Session(),
                )
            token = client.acquire_token_for_client(resource="resource")

        Recipe 2: Write once, run everywhere.
        If you use different managed identity on different deployment,
        you may use an environment variable (such as MY_MANAGED_IDENTITY_CONFIG)
        to store a json blob like
        ``{"ManagedIdentityIdType": "ClientId", "Id": "foo"}`` or
        ``{"ManagedIdentityIdType": "SystemAssigned", "Id": null}``.
        The following app can load managed identity configuration dynamically::

            import json, os, msal, requests
            config = os.getenv("MY_MANAGED_IDENTITY_CONFIG")
            assert config, "An ENV VAR with value should exist"
            client = msal.ManagedIdentityClient(
                json.loads(config),
                http_client=requests.Session(),
                )
            token = client.acquire_token_for_client(resource="resource")
        """
        self._managed_identity = managed_identity
        self._http_client = _ThrottledHttpClient(
            # This class only throttles excess token acquisition requests.
            # It does not provide retry.
            # Retry is the http_client or caller's responsibility, not MSAL's.
            #
            # FWIW, here is the inconsistent retry recommendation.
            # 1. Only MI on VM defines exotic 404 and 410 retry recommendations
            #    ( https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling )
            #    (especially for 410 which was supposed to be a permanent failure).
            # 2. MI on Service Fabric specifically suggests to not retry on 404.
            #    ( https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-cluster-managed-identity-service-fabric-app-code#error-handling )
            http_client.http_client  # Patch the raw (unpatched) http client
                if isinstance(http_client, ThrottledHttpClientBase) else http_client,
            http_cache=http_cache,
        )
        self._token_cache = token_cache or TokenCache()

    def acquire_token_for_client(self, *, resource):  # We may support scope in the future
        """Acquire token for the managed identity.

        The result will be automatically cached.
        Subsequent calls will automatically search from cache first.

        :param resource:
            Keyword-only. The resource for which the token is requested.

        :return:
            A dict. On success it contains ``access_token``, ``token_type``
            and ``expires_in`` (and possibly ``refresh_on``).
            Otherwise it is the error dict produced by the back-end,
            typically containing ``error`` and ``error_description``.

        :raises Exception:
            Whatever the underlying http client raises, but only when
            there is no still-valid cached token to fall back on.

        .. note::

            Known issue: When an Azure VM has only one user-assigned managed identity,
            and your app specifies to use system-assigned managed identity,
            Azure VM may still return a token for your user-assigned identity.

            This is a service-side behavior that cannot be changed by this library.
            `Azure VM docs <https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http>`_
        """
        access_token_from_cache = None
        # NOTE: identities built by the classes in this module always carry
        # the "Id" key (None for system-assigned), so the default below only
        # applies to a plain dict that omits the key.
        client_id_in_cache = self._managed_identity.get(
            ManagedIdentity.ID, "SYSTEM_ASSIGNED_MANAGED_IDENTITY")
        now = time.time()

        # Always search the cache first. There is no "force_refresh" option,
        # because there would be built-in token cache in the service side anyway.
        matches = self._token_cache.find(
            self._token_cache.CredentialType.ACCESS_TOKEN,
            target=[resource],
            query=dict(
                client_id=client_id_in_cache,
                environment=self._instance,
                realm=self._tenant,
                home_account_id=None,
            ),
        )
        for entry in matches:
            expires_in = int(entry["expires_on"]) - now
            if expires_in < 5*60:  # Then consider it expired
                continue  # Removal is not necessary, it will be overwritten
            logger.debug("Cache hit an AT")
            access_token_from_cache = {  # Mimic a real response
                "access_token": entry["secret"],
                "token_type": entry.get("token_type", "Bearer"),
                "expires_in": int(expires_in),  # OAuth2 specs defines it as int
            }
            if "refresh_on" in entry:
                access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
                if int(entry["refresh_on"]) < now:  # aging
                    break  # With a fallback in hand, we break here to go refresh
            return access_token_from_cache  # It is still good as new

        try:
            result = _obtain_token(self._http_client, self._managed_identity, resource)
            if "access_token" in result:
                expires_in = result.get("expires_in", 3600)
                if "refresh_in" not in result and expires_in >= 7200:
                    # Long-lived tokens get proactively refreshed at half-life
                    result["refresh_in"] = int(expires_in / 2)
                self._token_cache.add(dict(
                    client_id=client_id_in_cache,
                    scope=[resource],
                    token_endpoint="https://{}/{}".format(self._instance, self._tenant),
                    response=result,
                    params={},
                    data={},
                ))
                if "refresh_in" in result:
                    result["refresh_on"] = int(now + result["refresh_in"])
            if (result and "error" not in result) or (not access_token_from_cache):
                return result
        except Exception:  # The exact HTTP exception is transportation-layer dependent
            # Typically network error. Potential AAD outage?
            # "Exception" rather than a bare "except", so that KeyboardInterrupt
            # and SystemExit are never swallowed by the fallback below.
            if not access_token_from_cache:  # It means there is no fall back option
                raise  # We choose to bubble up the exception
        return access_token_from_cache
304
305
def _scope_to_resource(scope):  # This is an experimental reasonable-effort approach
    """Derive a resource identifier from a scope string.

    :param scope: A scope, such as ``https://graph.microsoft.com/.default``.
    :return:
        ``"<scheme>://<netloc>"`` when the scope is a URL with both parts,
        otherwise the scope itself, unchanged.
    """
    u = urlparse(scope)
    # Both parts are required: a scheme-only input such as "urn:foo" has an
    # empty netloc and would otherwise be turned into the meaningless "urn://".
    if u.scheme and u.netloc:
        return "{}://{}".format(u.scheme, u.netloc)
    return scope  # There is not much else we can do here
311
312
# Opaque markers returned by get_managed_identity_source().
# Compare them by identity, e.g. ``source is APP_SERVICE``.
APP_SERVICE = object()
AZURE_ARC = object()
# In MSAL Python, token acquisition for Cloud Shell was done by
# PublicClientApplication(...).acquire_token_interactive(..., prompt="none")
CLOUD_SHELL = object()
MACHINE_LEARNING = object()
SERVICE_FABRIC = object()
# Unknown environment; default to VM; you may want to probe
DEFAULT_TO_VM = object()


def get_managed_identity_source():
    """Detect the current environment and return the likely identity source.

    The decision is based on which environment variables are present,
    checked from the most specific combination to the least specific one.

    When this function returns ``CLOUD_SHELL``, you should use
    :func:`msal.PublicClientApplication.acquire_token_interactive` with ``prompt="none"``
    to obtain a token.
    """
    env = os.environ

    def defined(*names):
        # True only when every named environment variable exists
        return all(name in env for name in names)

    # Service Fabric must be tested before App Service,
    # because its variables are a superset of App Service's.
    if defined(
            "IDENTITY_ENDPOINT", "IDENTITY_HEADER", "IDENTITY_SERVER_THUMBPRINT"):
        return SERVICE_FABRIC
    if defined("IDENTITY_ENDPOINT", "IDENTITY_HEADER"):
        return APP_SERVICE
    if defined("MSI_ENDPOINT", "MSI_SECRET"):
        return MACHINE_LEARNING
    if defined("IDENTITY_ENDPOINT", "IMDS_ENDPOINT"):
        return AZURE_ARC
    if _is_running_in_cloud_shell():
        return CLOUD_SHELL
    return DEFAULT_TO_VM
340
341
def _obtain_token(http_client, managed_identity, resource):
    """A unified low-level API that talks to different Managed Identity back-ends.

    The back-end is chosen by the environment variables that are present,
    checked in this order: Service Fabric, App Service, Machine Learning,
    Azure Arc, and finally Azure VM as the default.

    :param http_client: An http client object providing a ``get()`` method.
    :param managed_identity: The managed identity configuration (dict-like).
    :param resource: The resource for which a token is requested.
    :return: The dict returned by the chosen back-end specific helper.
    :raises ManagedIdentityError:
        When a user-assigned managed identity is requested on Azure Arc.
    """
    if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ
            and "IDENTITY_SERVER_THUMBPRINT" in os.environ
    ):
        if managed_identity:
            logger.debug(
                "Ignoring managed_identity parameter. "
                "Managed Identity in Service Fabric is configured in the cluster, "
                "not during runtime. See also "
                "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service")
        return _obtain_token_on_service_fabric(
            http_client,
            os.environ["IDENTITY_ENDPOINT"],
            os.environ["IDENTITY_HEADER"],
            os.environ["IDENTITY_SERVER_THUMBPRINT"],
            resource,
        )
    if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ:
        return _obtain_token_on_app_service(
            http_client,
            os.environ["IDENTITY_ENDPOINT"],
            os.environ["IDENTITY_HEADER"],
            managed_identity,
            resource,
        )
    if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ:
        # Back ported from https://github.com/Azure/azure-sdk-for-python/blob/azure-identity_1.15.0/sdk/identity/azure-identity/azure/identity/_credentials/azure_ml.py
        return _obtain_token_on_machine_learning(
            http_client,
            os.environ["MSI_ENDPOINT"],
            os.environ["MSI_SECRET"],
            managed_identity,
            resource,
        )
    if "IDENTITY_ENDPOINT" in os.environ and "IMDS_ENDPOINT" in os.environ:
        if ManagedIdentity.is_user_assigned(managed_identity):
            raise ManagedIdentityError(  # Note: Azure Identity for Python raised exception too
                "Invalid managed_identity parameter. "
                "Azure Arc supports only system-assigned managed identity, "
                "See also "
                # The link points to the Azure Arc documentation,
                # not to the Service Fabric one used for the branch above.
                "https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication")
        return _obtain_token_on_arc(
            http_client,
            os.environ["IDENTITY_ENDPOINT"],
            resource,
        )
    return _obtain_token_on_azure_vm(http_client, managed_identity, resource)
390
391
def _adjust_param(params, managed_identity):
    """Add the user-assigned identity, if any, into ``params`` in place.

    Nothing is added when the configured id type has no on-wire name,
    which is the case for a system-assigned identity.
    """
    configured_type = managed_identity.get(ManagedIdentity.ID_TYPE)
    wire_name = ManagedIdentity._types_mapping.get(configured_type)
    if not wire_name:
        return
    params[wire_name] = managed_identity[ManagedIdentity.ID]
398
def _obtain_token_on_azure_vm(http_client, managed_identity, resource):
    """Obtain a token from the metadata endpoint of an Azure VM.

    Based on https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http

    :return:
        A normalized OAuth2-style dict when the response carries both
        ``access_token`` and ``expires_in``; otherwise the decoded payload as-is.
    :raises json.decoder.JSONDecodeError: When the response body is not JSON.
    """
    logger.debug("Obtaining token via managed identity on Azure VM")
    query = {
        "api-version": "2018-02-01",
        "resource": resource,
    }
    _adjust_param(query, managed_identity)
    response = http_client.get(
        "http://169.254.169.254/metadata/identity/oauth2/token",
        params=query,
        headers={"Metadata": "true"},
    )
    try:
        payload = json.loads(response.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", response.text)
        raise
    if not (payload.get("access_token") and payload.get("expires_in")):
        return payload  # Typically an error, but it is undefined in the doc above
    return {  # Normalizing the payload into OAuth2 format
        "access_token": payload["access_token"],
        "expires_in": int(payload["expires_in"]),
        "resource": payload.get("resource"),
        "token_type": payload.get("token_type", "Bearer"),
    }
425
def _obtain_token_on_app_service(
    http_client, endpoint, identity_header, managed_identity, resource,
):
    """Obtains token for
    `App Service <https://learn.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp#rest-endpoint-reference>`_,
    Azure Functions, and Azure Automation.

    :return:
        A normalized OAuth2-style dict when the response carries both
        ``access_token`` and ``expires_on``; otherwise an ``invalid_scope`` error dict.
    :raises json.decoder.JSONDecodeError: When the response body is not JSON.
    """
    # Prerequisite: Create your app service https://docs.microsoft.com/en-us/azure/app-service/quickstart-python
    # Assign it a managed identity https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp
    # SSH into your container for testing https://docs.microsoft.com/en-us/azure/app-service/configure-linux-open-ssh-session
    logger.debug("Obtaining token via managed identity on Azure App Service")
    query = {
        "api-version": "2019-08-01",
        "resource": resource,
    }
    _adjust_param(query, managed_identity)
    request_headers = {
        "X-IDENTITY-HEADER": identity_header,
        # "Metadata" is unnecessary yet harmless for App Service,
        # it will be needed by Azure Automation
        # https://docs.microsoft.com/en-us/azure/automation/enable-managed-identity-for-automation#get-access-token-for-system-assigned-managed-identity-using-http-get
        "Metadata": "true",
    }
    response = http_client.get(endpoint, params=query, headers=request_headers)
    try:
        payload = json.loads(response.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", response.text)
        raise
    if payload.get("access_token") and payload.get("expires_on"):
        return {  # Normalizing the payload into OAuth2 format
            "access_token": payload["access_token"],
            # "expires_on" is an absolute timestamp; OAuth2 wants a duration
            "expires_in": int(payload["expires_on"]) - int(time.time()),
            "resource": payload.get("resource"),
            "token_type": payload.get("token_type", "Bearer"),
        }
    description = "{}, {}".format(
        payload.get("statusCode"), payload.get("message"))
    return {
        "error": "invalid_scope",  # Empirically, wrong resource ends up with a vague statusCode=500
        "error_description": description,
    }
469
def _obtain_token_on_machine_learning(
    http_client, endpoint, secret, managed_identity, resource,
):
    """Obtain a token via managed identity on Azure Machine Learning.

    Could not find protocol docs from https://docs.microsoft.com/en-us/azure/machine-learning
    The following implementation is back ported from Azure Identity 1.15.0

    :return:
        A normalized OAuth2-style dict when the response carries both
        ``access_token`` and ``expires_on``; otherwise an ``invalid_scope`` error dict.
    :raises json.decoder.JSONDecodeError: When the response body is not JSON.
    """
    logger.debug("Obtaining token via managed identity on Azure Machine Learning")
    query = {"api-version": "2017-09-01", "resource": resource}
    _adjust_param(query, managed_identity)
    uses_2017_api = query["api-version"] == "2017-09-01"
    if uses_2017_api and "client_id" in query:
        # Workaround for a known bug in Azure ML 2017 API
        query["clientid"] = query.pop("client_id")
    response = http_client.get(
        endpoint, params=query, headers={"secret": secret})
    try:
        payload = json.loads(response.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", response.text)
        raise
    if payload.get("access_token") and payload.get("expires_on"):
        return {  # Normalizing the payload into OAuth2 format
            "access_token": payload["access_token"],
            # "expires_on" is an absolute timestamp; OAuth2 wants a duration
            "expires_in": int(payload["expires_on"]) - int(time.time()),
            "resource": payload.get("resource"),
            "token_type": payload.get("token_type", "Bearer"),
        }
    return {
        "error": "invalid_scope",  # TODO: To be tested
        "error_description": "{}".format(payload),
    }
502
503
def _obtain_token_on_service_fabric(
    http_client, endpoint, identity_header, server_thumbprint, resource,
):
    """Obtains token for
    `Service Fabric <https://learn.microsoft.com/en-us/azure/service-fabric/>`_

    :param http_client: An http client object providing a ``get()`` method.
    :param endpoint: The token endpoint to send the GET request to.
    :param identity_header: Sent to the endpoint as the ``Secret`` header.
    :param server_thumbprint: Accepted for interface parity; not used here.
    :param resource: The resource for which a token is requested.
    :return:
        A normalized OAuth2-style dict when the response carries both
        ``access_token`` and ``expires_on``; otherwise an error dict whose
        ``error`` is an OAuth2 error code and whose ``error_description``
        is the raw response body.
    :raises json.decoder.JSONDecodeError: When the response body is not JSON.
    """
    # Deployment https://learn.microsoft.com/en-us/azure/service-fabric/service-fabric-get-started-containers-linux
    # See also https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/tests/managed-identity-live/service-fabric/service_fabric.md
    # Protocol https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#acquiring-an-access-token-using-rest-api
    logger.debug("Obtaining token via managed identity on Azure Service Fabric")
    resp = http_client.get(
        endpoint,
        params={"api-version": "2019-07-01-preview", "resource": resource},
        headers={"Secret": identity_header},
    )
    try:
        payload = json.loads(resp.text)
    except json.decoder.JSONDecodeError:
        logger.debug("IMDS emits unexpected payload: %s", resp.text)
        raise
    if payload.get("access_token") and payload.get("expires_on"):
        return {  # Normalizing the payload into OAuth2 format
            "access_token": payload["access_token"],
            "expires_in": int(  # Despite the example in docs shows an integer,
                payload["expires_on"]  # Azure SDK team's test obtained a string.
                ) - int(time.time()),
            "resource": payload.get("resource"),
            # Default to "Bearer" when absent, like the other back-ends do
            "token_type": payload.get("token_type", "Bearer"),
        }
    # https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#error-handling
    error = payload.get("error")
    # Tolerate a missing or unexpectedly-shaped "error" member: such a
    # response maps to the generic "invalid_request" instead of crashing.
    error_code = error.get("code") if isinstance(error, dict) else None
    error_mapping = {  # Map Service Fabric errors into OAuth2 errors https://www.rfc-editor.org/rfc/rfc6749#section-5.2
        "SecretHeaderNotFound": "unauthorized_client",
        "ManagedIdentityNotFound": "invalid_client",
        "ArgumentNullOrEmpty": "invalid_scope",
    }
    return {
        "error": error_mapping.get(error_code, "invalid_request"),
        "error_description": resp.text,
    }
543
544
# Maps a ``sys.platform`` value to the directory in which the Azure Arc
# helper in this module looks for the local ".key" secret file.
_supported_arc_platforms_and_their_prefixes = dict(
    linux="/var/opt/azcmagent/tokens",
    win32=os.path.expandvars(r"%ProgramData%\AzureConnectedMachineAgent\Tokens"),
)
549
class ArcPlatformNotSupportedError(ManagedIdentityError):
    """Raised when Azure Arc token acquisition is attempted on a platform
    that has no known location for the local key file.
    """
552
def _obtain_token_on_arc(http_client, endpoint, resource):
    """Obtain a token via managed identity on Azure Arc.

    https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication

    It takes two requests. The first one yields a ``WWW-Authenticate``
    challenge naming a local key file; the content of that file is then
    sent as a ``Basic`` credential in the second request.

    :return:
        A normalized OAuth2-style dict when the second response carries both
        ``access_token`` and ``expires_in``; otherwise an ``invalid_request``
        error dict whose description is the raw response body.
    :raises ManagedIdentityError:
        When the challenge header is unrecognizable,
        or the local key file is larger than 4KB.
    :raises ArcPlatformNotSupportedError:
        When ``sys.platform`` has no known key file location.
    """
    logger.debug("Obtaining token via managed identity on Azure Arc")
    query = {"api-version": "2020-06-01", "resource": resource}
    resp = http_client.get(
        endpoint,
        params=query,
        headers={"Metadata": "true"},
    )

    # Header names are case-insensitive, so compare them in lower case
    # https://datatracker.ietf.org/doc/html/rfc7230#section-3.2
    www_auth = "www-authenticate"
    header_value = ""
    for name, value in resp.headers.items():
        if name.lower() == www_auth:
            header_value = value  # The last matching header wins
    challenge = header_value.split("=")  # Output will be ["Basic realm", "content"]
    well_formed = (  # https://datatracker.ietf.org/doc/html/rfc7617#section-2
        len(challenge) == 2 and challenge[0].lower() == "basic realm")
    if not well_formed:
        raise ManagedIdentityError(
            "Unrecognizable WWW-Authenticate header: {}".format(resp.headers))

    if sys.platform not in _supported_arc_platforms_and_their_prefixes:
        raise ArcPlatformNotSupportedError(
            f"Platform {sys.platform} was undefined and unsupported")

    # This algorithm is documented in an internal doc https://msazure.visualstudio.com/One/_wiki/wikis/One.wiki/233012/VM-Extension-Authoring-for-Arc?anchor=2.-obtaining-tokens
    # Only the base name of the challenged path is used, and it is always
    # looked up inside the platform's well-known directory.
    key_directory = _supported_arc_platforms_and_their_prefixes[sys.platform]
    key_stem = os.path.splitext(os.path.basename(challenge[1]))[0]
    filename = os.path.join(key_directory, key_stem + ".key")
    if os.stat(filename).st_size > 4096:  # Check size BEFORE loading its content
        raise ManagedIdentityError("Local key file shall not be larger than 4KB")
    with open(filename) as f:
        secret = f.read()

    response = http_client.get(
        endpoint,
        params={"api-version": "2020-06-01", "resource": resource},
        headers={"Metadata": "true", "Authorization": "Basic {}".format(secret)},
    )
    try:
        payload = json.loads(response.text)
    except json.decoder.JSONDecodeError:
        payload = {}  # A non-JSON body falls through to the error result below
    if payload.get("access_token") and payload.get("expires_in"):
        # Example: https://learn.microsoft.com/en-us/azure/azure-arc/servers/media/managed-identity-authentication/bash-token-output-example.png
        return {
            "access_token": payload["access_token"],
            "expires_in": int(payload["expires_in"]),
            "token_type": payload.get("token_type", "Bearer"),
            "resource": payload.get("resource"),
        }
    return {
        "error": "invalid_request",
        "error_description": response.text,
    }
603