1# Copyright (c) Microsoft Corporation.
2# All rights reserved.
3#
4# This code is licensed under the MIT License.
5import json
6import logging
7import os
8import socket
9import sys
10import time
11from urllib.parse import urlparse # Python 3+
12from collections import UserDict # Python 3+
13from typing import Optional, Union # Needed in Python 3.7 & 3.8
14from .token_cache import TokenCache
15from .individual_cache import _IndividualCache as IndividualCache
16from .throttled_http_client import ThrottledHttpClientBase, RetryAfterParser
17from .cloudshell import _is_running_in_cloud_shell
18
19
20logger = logging.getLogger(__name__)
21
22
23class ManagedIdentityError(ValueError):
24 pass
25
26
27class ManagedIdentity(UserDict):
28 """Feed an instance of this class to :class:`msal.ManagedIdentityClient`
29 to acquire token for the specified managed identity.
30 """
31 # The key names used in config dict
32 ID_TYPE = "ManagedIdentityIdType" # Contains keyword ManagedIdentity so its json equivalent will be more readable
33 ID = "Id"
34
35 # Valid values for key ID_TYPE
36 CLIENT_ID = "ClientId"
37 RESOURCE_ID = "ResourceId"
38 OBJECT_ID = "ObjectId"
39 SYSTEM_ASSIGNED = "SystemAssigned"
40
41 _types_mapping = { # Maps type name in configuration to type name on wire
42 CLIENT_ID: "client_id",
43 RESOURCE_ID: "msi_res_id", # VM's IMDS prefers msi_res_id https://github.com/Azure/azure-rest-api-specs/blob/dba6ed1f03bda88ac6884c0a883246446cc72495/specification/imds/data-plane/Microsoft.InstanceMetadataService/stable/2018-10-01/imds.json#L233-L239
44 OBJECT_ID: "object_id",
45 }
46
47 @classmethod
48 def is_managed_identity(cls, unknown):
49 return (isinstance(unknown, ManagedIdentity)
50 or cls.is_system_assigned(unknown)
51 or cls.is_user_assigned(unknown))
52
53 @classmethod
54 def is_system_assigned(cls, unknown):
55 return isinstance(unknown, SystemAssignedManagedIdentity) or (
56 isinstance(unknown, dict)
57 and unknown.get(cls.ID_TYPE) == cls.SYSTEM_ASSIGNED)
58
59 @classmethod
60 def is_user_assigned(cls, unknown):
61 return isinstance(unknown, UserAssignedManagedIdentity) or (
62 isinstance(unknown, dict)
63 and unknown.get(cls.ID_TYPE) in cls._types_mapping
64 and unknown.get(cls.ID))
65
66 def __init__(self, identifier=None, id_type=None):
67 # Undocumented. Use subclasses instead.
68 super(ManagedIdentity, self).__init__({
69 self.ID_TYPE: id_type,
70 self.ID: identifier,
71 })
72
73
74class SystemAssignedManagedIdentity(ManagedIdentity):
75 """Represent a system-assigned managed identity.
76
77 It is equivalent to a Python dict of::
78
79 {"ManagedIdentityIdType": "SystemAssigned", "Id": None}
80
81 or a JSON blob of::
82
83 {"ManagedIdentityIdType": "SystemAssigned", "Id": null}
84 """
85 def __init__(self):
86 super(SystemAssignedManagedIdentity, self).__init__(id_type=self.SYSTEM_ASSIGNED)
87
88
89class UserAssignedManagedIdentity(ManagedIdentity):
90 """Represent a user-assigned managed identity.
91
92 Depends on the id you provided, the outcome is equivalent to one of the below::
93
94 {"ManagedIdentityIdType": "ClientId", "Id": "foo"}
95 {"ManagedIdentityIdType": "ResourceId", "Id": "foo"}
96 {"ManagedIdentityIdType": "ObjectId", "Id": "foo"}
97 """
98 def __init__(self, *, client_id=None, resource_id=None, object_id=None):
99 if client_id and not resource_id and not object_id:
100 super(UserAssignedManagedIdentity, self).__init__(
101 id_type=self.CLIENT_ID, identifier=client_id)
102 elif not client_id and resource_id and not object_id:
103 super(UserAssignedManagedIdentity, self).__init__(
104 id_type=self.RESOURCE_ID, identifier=resource_id)
105 elif not client_id and not resource_id and object_id:
106 super(UserAssignedManagedIdentity, self).__init__(
107 id_type=self.OBJECT_ID, identifier=object_id)
108 else:
109 raise ManagedIdentityError(
110 "You shall specify one of the three parameters: "
111 "client_id, resource_id, object_id")
112
113
114class _ThrottledHttpClient(ThrottledHttpClientBase):
115 def __init__(self, http_client, **kwargs):
116 super(_ThrottledHttpClient, self).__init__(http_client, **kwargs)
117 self.get = IndividualCache( # All MIs (except Cloud Shell) use GETs
118 mapping=self._expiring_mapping,
119 key_maker=lambda func, args, kwargs: "REQ {} hash={} 429/5xx/Retry-After".format(
120 args[0], # It is the endpoint, typically a constant per MI type
121 self._hash(
122 # Managed Identity flavors have inconsistent parameters.
123 # We simply choose to hash them all.
124 str(kwargs.get("params")) + str(kwargs.get("data"))),
125 ),
126 expires_in=RetryAfterParser(5).parse, # 5 seconds default for non-PCA
127 )(http_client.get)
128
129
130class ManagedIdentityClient(object):
131 """This API encapsulates multiple managed identity back-ends:
132 VM, App Service, Azure Automation (Runbooks), Azure Function, Service Fabric,
133 and Azure Arc.
134
135 It also provides token cache support.
136
137 .. note::
138
139 Cloud Shell support is NOT implemented in this class.
140 Since MSAL Python 1.18 in May 2022, it has been implemented in
141 :func:`PublicClientApplication.acquire_token_interactive` via calling pattern
142 ``PublicClientApplication(...).acquire_token_interactive(scopes=[...], prompt="none")``.
143 That is appropriate, because Cloud Shell yields a token with
144 delegated permissions for the end user who has signed in to the Azure Portal
145 (like what a ``PublicClientApplication`` does),
146 not a token with application permissions for an app.
147 """
148 __instance, _tenant = None, "managed_identity" # Placeholders
149 _TOKEN_SOURCE = "token_source"
150 _TOKEN_SOURCE_IDP = "identity_provider"
151 _TOKEN_SOURCE_CACHE = "cache"
152
153 def __init__(
154 self,
155 managed_identity: Union[
156 dict,
157 ManagedIdentity, # Could use Type[ManagedIdentity] but it is deprecated in Python 3.9+
158 SystemAssignedManagedIdentity,
159 UserAssignedManagedIdentity,
160 ],
161 *,
162 http_client,
163 token_cache=None,
164 http_cache=None,
165 ):
166 """Create a managed identity client.
167
168 :param managed_identity:
169 It accepts an instance of :class:`SystemAssignedManagedIdentity`
170 or :class:`UserAssignedManagedIdentity`.
171 They are equivalent to a dict with a certain shape,
172 which may be loaded from a JSON configuration file or an env var.
173
174 :param http_client:
175 An http client object. For example, you can use ``requests.Session()``,
176 optionally with exponential backoff behavior demonstrated in this recipe::
177
178 import msal, requests
179 from requests.adapters import HTTPAdapter, Retry
180 s = requests.Session()
181 retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[
182 429, 500, 501, 502, 503, 504])
183 s.mount('https://', HTTPAdapter(max_retries=retries))
184 managed_identity = ...
185 client = msal.ManagedIdentityClient(managed_identity, http_client=s)
186
187 :param token_cache:
188 Optional. It accepts a :class:`msal.TokenCache` instance to store tokens.
189 It will use an in-memory token cache by default.
190
191 :param http_cache:
192 Optional. It has the same characteristics as the
193 :paramref:`msal.ClientApplication.http_cache`.
194
195 Recipe 1: Hard code a managed identity for your app::
196
197 import msal, requests
198 client = msal.ManagedIdentityClient(
199 msal.UserAssignedManagedIdentity(client_id="foo"),
200 http_client=requests.Session(),
201 )
202 token = client.acquire_token_for_client("resource")
203
204 Recipe 2: Write once, run everywhere.
205 If you use different managed identity on different deployment,
206 you may use an environment variable (such as MY_MANAGED_IDENTITY_CONFIG)
207 to store a json blob like
208 ``{"ManagedIdentityIdType": "ClientId", "Id": "foo"}`` or
209 ``{"ManagedIdentityIdType": "SystemAssigned", "Id": null}``.
210 The following app can load managed identity configuration dynamically::
211
212 import json, os, msal, requests
213 config = os.getenv("MY_MANAGED_IDENTITY_CONFIG")
214 assert config, "An ENV VAR with value should exist"
215 client = msal.ManagedIdentityClient(
216 json.loads(config),
217 http_client=requests.Session(),
218 )
219 token = client.acquire_token_for_client("resource")
220 """
221 if not ManagedIdentity.is_managed_identity(managed_identity):
222 raise ManagedIdentityError(
223 f"Incorrect managed_identity: {managed_identity}")
224 self._managed_identity = managed_identity
225 self._http_client = _ThrottledHttpClient(
226 # This class only throttles excess token acquisition requests.
227 # It does not provide retry.
228 # Retry is the http_client or caller's responsibility, not MSAL's.
229 #
230 # FWIW, here is the inconsistent retry recommendation.
231 # 1. Only MI on VM defines exotic 404 and 410 retry recommendations
232 # ( https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling )
233 # (especially for 410 which was supposed to be a permanent failure).
234 # 2. MI on Service Fabric specifically suggests to not retry on 404.
235 # ( https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-cluster-managed-identity-service-fabric-app-code#error-handling )
236 http_client.http_client # Patch the raw (unpatched) http client
237 if isinstance(http_client, ThrottledHttpClientBase) else http_client,
238 http_cache=http_cache,
239 )
240 self._token_cache = token_cache or TokenCache()
241
242 def _get_instance(self):
243 if self.__instance is None:
244 self.__instance = socket.getfqdn() # Moved from class definition to here
245 return self.__instance
246
247 def acquire_token_for_client(
248 self,
249 *,
250 resource: str, # If/when we support scope, resource will become optional
251 claims_challenge: Optional[str] = None,
252 ):
253 """Acquire token for the managed identity.
254
255 The result will be automatically cached.
256 Subsequent calls will automatically search from cache first.
257
258 :param resource: The resource for which the token is acquired.
259
260 :param claims_challenge:
261 Optional.
262 It is a string representation of a JSON object
263 (which contains lists of claims being requested).
264
265 The tenant admin may choose to revoke all Managed Identity tokens,
266 and then a *claims challenge* will be returned by the target resource,
267 as a `claims_challenge` directive in the `www-authenticate` header,
268 even if the app developer did not opt in for the "CP1" client capability.
269 Upon receiving a `claims_challenge`, MSAL will skip a token cache read,
270 and will attempt to acquire a new token.
271
272 .. note::
273
274 Known issue: When an Azure VM has only one user-assigned managed identity,
275 and your app specifies to use system-assigned managed identity,
276 Azure VM may still return a token for your user-assigned identity.
277
278 This is a service-side behavior that cannot be changed by this library.
279 `Azure VM docs <https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http>`_
280 """
281 access_token_from_cache = None
282 client_id_in_cache = self._managed_identity.get(
283 ManagedIdentity.ID, "SYSTEM_ASSIGNED_MANAGED_IDENTITY")
284 now = time.time()
285 if not claims_challenge: # Then attempt token cache search
286 matches = self._token_cache.find(
287 self._token_cache.CredentialType.ACCESS_TOKEN,
288 target=[resource],
289 query=dict(
290 client_id=client_id_in_cache,
291 environment=self._get_instance(),
292 realm=self._tenant,
293 home_account_id=None,
294 ),
295 )
296 for entry in matches:
297 expires_in = int(entry["expires_on"]) - now
298 if expires_in < 5*60: # Then consider it expired
299 continue # Removal is not necessary, it will be overwritten
300 logger.debug("Cache hit an AT")
301 access_token_from_cache = { # Mimic a real response
302 "access_token": entry["secret"],
303 "token_type": entry.get("token_type", "Bearer"),
304 "expires_in": int(expires_in), # OAuth2 specs defines it as int
305 self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
306 }
307 if "refresh_on" in entry:
308 access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
309 if int(entry["refresh_on"]) < now: # aging
310 break # With a fallback in hand, we break here to go refresh
311 return access_token_from_cache # It is still good as new
312 try:
313 result = _obtain_token(self._http_client, self._managed_identity, resource)
314 if "access_token" in result:
315 expires_in = result.get("expires_in", 3600)
316 if "refresh_in" not in result and expires_in >= 7200:
317 result["refresh_in"] = int(expires_in / 2)
318 self._token_cache.add(dict(
319 client_id=client_id_in_cache,
320 scope=[resource],
321 token_endpoint="https://{}/{}".format(
322 self._get_instance(), self._tenant),
323 response=result,
324 params={},
325 data={},
326 ))
327 if "refresh_in" in result:
328 result["refresh_on"] = int(now + result["refresh_in"])
329 result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
330 if (result and "error" not in result) or (not access_token_from_cache):
331 return result
332 except: # The exact HTTP exception is transportation-layer dependent
333 # Typically network error. Potential AAD outage?
334 if not access_token_from_cache: # It means there is no fall back option
335 raise # We choose to bubble up the exception
336 return access_token_from_cache
337
338
339def _scope_to_resource(scope): # This is an experimental reasonable-effort approach
340 u = urlparse(scope)
341 if u.scheme:
342 return "{}://{}".format(u.scheme, u.netloc)
343 return scope # There is no much else we can do here
344
345
346def _get_arc_endpoint():
347 if "IDENTITY_ENDPOINT" in os.environ and "IMDS_ENDPOINT" in os.environ:
348 return os.environ["IDENTITY_ENDPOINT"]
349 if ( # Defined in https://eng.ms/docs/cloud-ai-platform/azure-core/azure-management-and-platforms/control-plane-bburns/hybrid-resource-provider/azure-arc-for-servers/specs/extension_authoring
350 sys.platform == "linux" and os.path.exists("/opt/azcmagent/bin/himds")
351 or sys.platform == "win32" and os.path.exists(os.path.expandvars(
352 # Avoid Windows-only "%EnvVar%" syntax so that tests can be run on Linux
353 r"${ProgramFiles}\AzureConnectedMachineAgent\himds.exe"
354 ))
355 ):
356 return "http://localhost:40342/metadata/identity/oauth2/token"
357
358
359APP_SERVICE = object()
360AZURE_ARC = object()
361CLOUD_SHELL = object() # In MSAL Python, token acquisition was done by
362 # PublicClientApplication(...).acquire_token_interactive(..., prompt="none")
363MACHINE_LEARNING = object()
364SERVICE_FABRIC = object()
365DEFAULT_TO_VM = object() # Unknown environment; default to VM; you may want to probe
366def get_managed_identity_source():
367 """Detect the current environment and return the likely identity source.
368
369 When this function returns ``CLOUD_SHELL``, you should use
370 :func:`msal.PublicClientApplication.acquire_token_interactive` with ``prompt="none"``
371 to obtain a token.
372 """
373 if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ
374 and "IDENTITY_SERVER_THUMBPRINT" in os.environ
375 ):
376 return SERVICE_FABRIC
377 if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ:
378 return APP_SERVICE
379 if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ:
380 return MACHINE_LEARNING
381 if _get_arc_endpoint():
382 return AZURE_ARC
383 if _is_running_in_cloud_shell():
384 return CLOUD_SHELL
385 return DEFAULT_TO_VM
386
387
388def _obtain_token(http_client, managed_identity, resource):
389 # A unified low-level API that talks to different Managed Identity
390 if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ
391 and "IDENTITY_SERVER_THUMBPRINT" in os.environ
392 ):
393 if managed_identity:
394 logger.debug(
395 "Ignoring managed_identity parameter. "
396 "Managed Identity in Service Fabric is configured in the cluster, "
397 "not during runtime. See also "
398 "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service")
399 return _obtain_token_on_service_fabric(
400 http_client,
401 os.environ["IDENTITY_ENDPOINT"],
402 os.environ["IDENTITY_HEADER"],
403 os.environ["IDENTITY_SERVER_THUMBPRINT"],
404 resource,
405 )
406 if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ:
407 return _obtain_token_on_app_service(
408 http_client,
409 os.environ["IDENTITY_ENDPOINT"],
410 os.environ["IDENTITY_HEADER"],
411 managed_identity,
412 resource,
413 )
414 if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ:
415 # Back ported from https://github.com/Azure/azure-sdk-for-python/blob/azure-identity_1.15.0/sdk/identity/azure-identity/azure/identity/_credentials/azure_ml.py
416 return _obtain_token_on_machine_learning(
417 http_client,
418 os.environ["MSI_ENDPOINT"],
419 os.environ["MSI_SECRET"],
420 managed_identity,
421 resource,
422 )
423 arc_endpoint = _get_arc_endpoint()
424 if arc_endpoint:
425 if ManagedIdentity.is_user_assigned(managed_identity):
426 raise ManagedIdentityError( # Note: Azure Identity for Python raised exception too
427 "Invalid managed_identity parameter. "
428 "Azure Arc supports only system-assigned managed identity, "
429 "See also "
430 "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service")
431 return _obtain_token_on_arc(http_client, arc_endpoint, resource)
432 return _obtain_token_on_azure_vm(http_client, managed_identity, resource)
433
434
435def _adjust_param(params, managed_identity, types_mapping=None):
436 # Modify the params dict in place
437 id_name = (types_mapping or ManagedIdentity._types_mapping).get(
438 managed_identity.get(ManagedIdentity.ID_TYPE))
439 if id_name:
440 params[id_name] = managed_identity[ManagedIdentity.ID]
441
442def _obtain_token_on_azure_vm(http_client, managed_identity, resource):
443 # Based on https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http
444 logger.debug("Obtaining token via managed identity on Azure VM")
445 params = {
446 "api-version": "2018-02-01",
447 "resource": resource,
448 }
449 _adjust_param(params, managed_identity)
450 resp = http_client.get(
451 "http://169.254.169.254/metadata/identity/oauth2/token",
452 params=params,
453 headers={"Metadata": "true"},
454 )
455 try:
456 payload = json.loads(resp.text)
457 if payload.get("access_token") and payload.get("expires_in"):
458 return { # Normalizing the payload into OAuth2 format
459 "access_token": payload["access_token"],
460 "expires_in": int(payload["expires_in"]),
461 "resource": payload.get("resource"),
462 "token_type": payload.get("token_type", "Bearer"),
463 }
464 return payload # It would be {"error": ..., "error_description": ...} according to https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling
465 except json.decoder.JSONDecodeError:
466 logger.debug("IMDS emits unexpected payload: %s", resp.text)
467 raise
468
469def _obtain_token_on_app_service(
470 http_client, endpoint, identity_header, managed_identity, resource,
471):
472 """Obtains token for
473 `App Service <https://learn.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp#rest-endpoint-reference>`_,
474 Azure Functions, and Azure Automation.
475 """
476 # Prerequisite: Create your app service https://docs.microsoft.com/en-us/azure/app-service/quickstart-python
477 # Assign it a managed identity https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp
478 # SSH into your container for testing https://docs.microsoft.com/en-us/azure/app-service/configure-linux-open-ssh-session
479 logger.debug("Obtaining token via managed identity on Azure App Service")
480 params = {
481 "api-version": "2019-08-01",
482 "resource": resource,
483 }
484 _adjust_param(params, managed_identity, types_mapping={
485 ManagedIdentity.CLIENT_ID: "client_id",
486 ManagedIdentity.RESOURCE_ID: "mi_res_id", # App Service's resource id uses "mi_res_id"
487 ManagedIdentity.OBJECT_ID: "object_id",
488 })
489
490 resp = http_client.get(
491 endpoint,
492 params=params,
493 headers={
494 "X-IDENTITY-HEADER": identity_header,
495 "Metadata": "true", # Unnecessary yet harmless for App Service,
496 # It will be needed by Azure Automation
497 # https://docs.microsoft.com/en-us/azure/automation/enable-managed-identity-for-automation#get-access-token-for-system-assigned-managed-identity-using-http-get
498 },
499 )
500 try:
501 payload = json.loads(resp.text)
502 if payload.get("access_token") and payload.get("expires_on"):
503 return { # Normalizing the payload into OAuth2 format
504 "access_token": payload["access_token"],
505 "expires_in": int(payload["expires_on"]) - int(time.time()),
506 "resource": payload.get("resource"),
507 "token_type": payload.get("token_type", "Bearer"),
508 }
509 return {
510 "error": "invalid_scope", # Empirically, wrong resource ends up with a vague statusCode=500
511 "error_description": "{}, {}".format(
512 payload.get("statusCode"), payload.get("message")),
513 }
514 except json.decoder.JSONDecodeError:
515 logger.debug("IMDS emits unexpected payload: %s", resp.text)
516 raise
517
518def _obtain_token_on_machine_learning(
519 http_client, endpoint, secret, managed_identity, resource,
520):
521 # Could not find protocol docs from https://docs.microsoft.com/en-us/azure/machine-learning
522 # The following implementation is back ported from Azure Identity 1.15.0
523 logger.debug("Obtaining token via managed identity on Azure Machine Learning")
524 params = {"api-version": "2017-09-01", "resource": resource}
525 _adjust_param(params, managed_identity)
526 if params["api-version"] == "2017-09-01" and "client_id" in params:
527 # Workaround for a known bug in Azure ML 2017 API
528 params["clientid"] = params.pop("client_id")
529 resp = http_client.get(
530 endpoint,
531 params=params,
532 headers={"secret": secret},
533 )
534 try:
535 payload = json.loads(resp.text)
536 if payload.get("access_token") and payload.get("expires_on"):
537 return { # Normalizing the payload into OAuth2 format
538 "access_token": payload["access_token"],
539 "expires_in": int(payload["expires_on"]) - int(time.time()),
540 "resource": payload.get("resource"),
541 "token_type": payload.get("token_type", "Bearer"),
542 }
543 return {
544 "error": "invalid_scope", # TODO: To be tested
545 "error_description": "{}".format(payload),
546 }
547 except json.decoder.JSONDecodeError:
548 logger.debug("IMDS emits unexpected payload: %s", resp.text)
549 raise
550
551
552def _obtain_token_on_service_fabric(
553 http_client, endpoint, identity_header, server_thumbprint, resource,
554):
555 """Obtains token for
556 `Service Fabric <https://learn.microsoft.com/en-us/azure/service-fabric/>`_
557 """
558 # Deployment https://learn.microsoft.com/en-us/azure/service-fabric/service-fabric-get-started-containers-linux
559 # See also https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/tests/managed-identity-live/service-fabric/service_fabric.md
560 # Protocol https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#acquiring-an-access-token-using-rest-api
561 logger.debug("Obtaining token via managed identity on Azure Service Fabric")
562 resp = http_client.get(
563 endpoint,
564 params={"api-version": "2019-07-01-preview", "resource": resource},
565 headers={"Secret": identity_header},
566 )
567 try:
568 payload = json.loads(resp.text)
569 if payload.get("access_token") and payload.get("expires_on"):
570 return { # Normalizing the payload into OAuth2 format
571 "access_token": payload["access_token"],
572 "expires_in": int( # Despite the example in docs shows an integer,
573 payload["expires_on"] # Azure SDK team's test obtained a string.
574 ) - int(time.time()),
575 "resource": payload.get("resource"),
576 "token_type": payload["token_type"],
577 }
578 error = payload.get("error", {}) # https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#error-handling
579 error_mapping = { # Map Service Fabric errors into OAuth2 errors https://www.rfc-editor.org/rfc/rfc6749#section-5.2
580 "SecretHeaderNotFound": "unauthorized_client",
581 "ManagedIdentityNotFound": "invalid_client",
582 "ArgumentNullOrEmpty": "invalid_scope",
583 }
584 return {
585 "error": error_mapping.get(payload["error"]["code"], "invalid_request"),
586 "error_description": resp.text,
587 }
588 except json.decoder.JSONDecodeError:
589 logger.debug("IMDS emits unexpected payload: %s", resp.text)
590 raise
591
592
593_supported_arc_platforms_and_their_prefixes = {
594 "linux": "/var/opt/azcmagent/tokens",
595 "win32": os.path.expandvars(r"%ProgramData%\AzureConnectedMachineAgent\Tokens"),
596}
597
598class ArcPlatformNotSupportedError(ManagedIdentityError):
599 pass
600
601def _obtain_token_on_arc(http_client, endpoint, resource):
602 # https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication
603 logger.debug("Obtaining token via managed identity on Azure Arc")
604 resp = http_client.get(
605 endpoint,
606 params={"api-version": "2020-06-01", "resource": resource},
607 headers={"Metadata": "true"},
608 )
609 www_auth = "www-authenticate" # Header in lower case
610 challenge = {
611 # Normalized to lowercase, because header names are case-insensitive
612 # https://datatracker.ietf.org/doc/html/rfc7230#section-3.2
613 k.lower(): v for k, v in resp.headers.items() if k.lower() == www_auth
614 }.get(www_auth, "").split("=") # Output will be ["Basic realm", "content"]
615 if not ( # https://datatracker.ietf.org/doc/html/rfc7617#section-2
616 len(challenge) == 2 and challenge[0].lower() == "basic realm"):
617 raise ManagedIdentityError(
618 "Unrecognizable WWW-Authenticate header: {}".format(resp.headers))
619 if sys.platform not in _supported_arc_platforms_and_their_prefixes:
620 raise ArcPlatformNotSupportedError(
621 f"Platform {sys.platform} was undefined and unsupported")
622 filename = os.path.join(
623 # This algorithm is documented in an internal doc https://msazure.visualstudio.com/One/_wiki/wikis/One.wiki/233012/VM-Extension-Authoring-for-Arc?anchor=2.-obtaining-tokens
624 _supported_arc_platforms_and_their_prefixes[sys.platform],
625 os.path.splitext(os.path.basename(challenge[1]))[0] + ".key")
626 if os.stat(filename).st_size > 4096: # Check size BEFORE loading its content
627 raise ManagedIdentityError("Local key file shall not be larger than 4KB")
628 with open(filename) as f:
629 secret = f.read()
630 response = http_client.get(
631 endpoint,
632 params={"api-version": "2020-06-01", "resource": resource},
633 headers={"Metadata": "true", "Authorization": "Basic {}".format(secret)},
634 )
635 try:
636 payload = json.loads(response.text)
637 if payload.get("access_token") and payload.get("expires_in"):
638 # Example: https://learn.microsoft.com/en-us/azure/azure-arc/servers/media/managed-identity-authentication/bash-token-output-example.png
639 return {
640 "access_token": payload["access_token"],
641 "expires_in": int(payload["expires_in"]),
642 "token_type": payload.get("token_type", "Bearer"),
643 "resource": payload.get("resource"),
644 }
645 except json.decoder.JSONDecodeError:
646 pass
647 return {
648 "error": "invalid_request",
649 "error_description": response.text,
650 }