1# Copyright (c) Microsoft Corporation.
2# All rights reserved.
3#
4# This code is licensed under the MIT License.
5import hashlib
6import json
7import logging
8import os
9import sys
10import time
11import uuid
12from urllib.parse import urlparse # Python 3+
13from collections import UserDict # Python 3+
14from typing import List, Optional, Union # Needed in Python 3.7 & 3.8
15from .token_cache import TokenCache
16from .individual_cache import _IndividualCache as IndividualCache
17from .throttled_http_client import ThrottledHttpClientBase, RetryAfterParser
18from .cloudshell import _is_running_in_cloud_shell
19from .sku import SKU, __version__
20
21
22logger = logging.getLogger(__name__)
23
24
25class ManagedIdentityError(ValueError):
26 pass
27
28
29class ManagedIdentity(UserDict):
30 """Feed an instance of this class to :class:`msal.ManagedIdentityClient`
31 to acquire token for the specified managed identity.
32 """
33 # The key names used in config dict
34 ID_TYPE = "ManagedIdentityIdType" # Contains keyword ManagedIdentity so its json equivalent will be more readable
35 ID = "Id"
36
37 # Valid values for key ID_TYPE
38 CLIENT_ID = "ClientId"
39 RESOURCE_ID = "ResourceId"
40 OBJECT_ID = "ObjectId"
41 SYSTEM_ASSIGNED = "SystemAssigned"
42
43 _types_mapping = { # Maps type name in configuration to type name on wire
44 CLIENT_ID: "client_id",
45 RESOURCE_ID: "msi_res_id", # VM's IMDS prefers msi_res_id https://github.com/Azure/azure-rest-api-specs/blob/dba6ed1f03bda88ac6884c0a883246446cc72495/specification/imds/data-plane/Microsoft.InstanceMetadataService/stable/2018-10-01/imds.json#L233-L239
46 OBJECT_ID: "object_id",
47 }
48
49 @classmethod
50 def is_managed_identity(cls, unknown):
51 return (isinstance(unknown, ManagedIdentity)
52 or cls.is_system_assigned(unknown)
53 or cls.is_user_assigned(unknown))
54
55 @classmethod
56 def is_system_assigned(cls, unknown):
57 return isinstance(unknown, SystemAssignedManagedIdentity) or (
58 isinstance(unknown, dict)
59 and unknown.get(cls.ID_TYPE) == cls.SYSTEM_ASSIGNED)
60
61 @classmethod
62 def is_user_assigned(cls, unknown):
63 return isinstance(unknown, UserAssignedManagedIdentity) or (
64 isinstance(unknown, dict)
65 and unknown.get(cls.ID_TYPE) in cls._types_mapping
66 and unknown.get(cls.ID))
67
68 def __init__(self, identifier=None, id_type=None):
69 # Undocumented. Use subclasses instead.
70 super(ManagedIdentity, self).__init__({
71 self.ID_TYPE: id_type,
72 self.ID: identifier,
73 })
74
75
76class SystemAssignedManagedIdentity(ManagedIdentity):
77 """Represent a system-assigned managed identity.
78
79 It is equivalent to a Python dict of::
80
81 {"ManagedIdentityIdType": "SystemAssigned", "Id": None}
82
83 or a JSON blob of::
84
85 {"ManagedIdentityIdType": "SystemAssigned", "Id": null}
86 """
87 def __init__(self):
88 super(SystemAssignedManagedIdentity, self).__init__(id_type=self.SYSTEM_ASSIGNED)
89
90
91class UserAssignedManagedIdentity(ManagedIdentity):
92 """Represent a user-assigned managed identity.
93
94 Depends on the id you provided, the outcome is equivalent to one of the below::
95
96 {"ManagedIdentityIdType": "ClientId", "Id": "foo"}
97 {"ManagedIdentityIdType": "ResourceId", "Id": "foo"}
98 {"ManagedIdentityIdType": "ObjectId", "Id": "foo"}
99 """
100 def __init__(self, *, client_id=None, resource_id=None, object_id=None):
101 if client_id and not resource_id and not object_id:
102 super(UserAssignedManagedIdentity, self).__init__(
103 id_type=self.CLIENT_ID, identifier=client_id)
104 elif not client_id and resource_id and not object_id:
105 super(UserAssignedManagedIdentity, self).__init__(
106 id_type=self.RESOURCE_ID, identifier=resource_id)
107 elif not client_id and not resource_id and object_id:
108 super(UserAssignedManagedIdentity, self).__init__(
109 id_type=self.OBJECT_ID, identifier=object_id)
110 else:
111 raise ManagedIdentityError(
112 "You shall specify one of the three parameters: "
113 "client_id, resource_id, object_id")
114
115
116class _ThrottledHttpClient(ThrottledHttpClientBase):
117 def __init__(self, *args, **kwargs):
118 super(_ThrottledHttpClient, self).__init__(*args, **kwargs)
119 self.get = IndividualCache( # All MIs (except Cloud Shell) use GETs
120 mapping=self._expiring_mapping,
121 key_maker=lambda func, args, kwargs: "REQ {} hash={} 429/5xx/Retry-After".format(
122 args[0], # It is the endpoint, typically a constant per MI type
123 self._hash(
124 # Managed Identity flavors have inconsistent parameters.
125 # We simply choose to hash them all.
126 str(kwargs.get("params")) + str(kwargs.get("data"))),
127 ),
128 expires_in=RetryAfterParser(5).parse, # 5 seconds default for non-PCA
129 )(self.get) # Note: Decorate the parent get(), not the http_client.get()
130
131
132class ManagedIdentityClient(object):
133 """This API encapsulates multiple managed identity back-ends:
134 VM, App Service, Azure Automation (Runbooks), Azure Function, Service Fabric,
135 and Azure Arc.
136
137 It also provides token cache support.
138
139 .. note::
140
141 Cloud Shell support is NOT implemented in this class.
142 Since MSAL Python 1.18 in May 2022, it has been implemented in
143 :func:`PublicClientApplication.acquire_token_interactive` via calling pattern
144 ``PublicClientApplication(...).acquire_token_interactive(scopes=[...], prompt="none")``.
145 That is appropriate, because Cloud Shell yields a token with
146 delegated permissions for the end user who has signed in to the Azure Portal
147 (like what a ``PublicClientApplication`` does),
148 not a token with application permissions for an app.
149 """
150 __instance = "localhost" # We used to get this value from socket.getfqdn()
151 # but it is unreliable because getfqdn() either hangs or returns empty value
152 # on some misconfigured machines
153 _tenant = "managed_identity"
154 _TOKEN_SOURCE = "token_source"
155 _TOKEN_SOURCE_IDP = "identity_provider"
156 _TOKEN_SOURCE_CACHE = "cache"
157
158 def __init__(
159 self,
160 managed_identity: Union[
161 dict,
162 ManagedIdentity, # Could use Type[ManagedIdentity] but it is deprecated in Python 3.9+
163 SystemAssignedManagedIdentity,
164 UserAssignedManagedIdentity,
165 ],
166 *,
167 http_client,
168 token_cache=None,
169 http_cache=None,
170 client_capabilities: Optional[List[str]] = None,
171 ):
172 """Create a managed identity client.
173
174 :param managed_identity:
175 It accepts an instance of :class:`SystemAssignedManagedIdentity`
176 or :class:`UserAssignedManagedIdentity`.
177 They are equivalent to a dict with a certain shape,
178 which may be loaded from a JSON configuration file or an env var.
179
180 :param http_client:
181 An http client object. For example, you can use ``requests.Session()``,
182 optionally with exponential backoff behavior demonstrated in this recipe::
183
184 import msal, requests
185 from requests.adapters import HTTPAdapter, Retry
186 s = requests.Session()
187 retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[
188 429, 500, 501, 502, 503, 504])
189 s.mount('https://', HTTPAdapter(max_retries=retries))
190 managed_identity = ...
191 client = msal.ManagedIdentityClient(managed_identity, http_client=s)
192
193 :param token_cache:
194 Optional. It accepts a :class:`msal.TokenCache` instance to store tokens.
195 It will use an in-memory token cache by default.
196
197 :param http_cache:
198 Optional. It has the same characteristics as the
199 :paramref:`msal.ClientApplication.http_cache`.
200
201 :param list[str] client_capabilities: (optional)
202 Allows configuration of one or more client capabilities, e.g. ["CP1"].
203
204 Client capability is meant to inform the Microsoft identity platform
205 (STS) what this client is capable for,
206 so STS can decide to turn on certain features.
207
208 Implementation details:
209 Client capability in Managed Identity is relayed as-is
210 via ``xms_cc`` parameter on the wire.
211
212 Recipe 1: Hard code a managed identity for your app::
213
214 import msal, requests
215 client = msal.ManagedIdentityClient(
216 msal.UserAssignedManagedIdentity(client_id="foo"),
217 http_client=requests.Session(),
218 )
219 token = client.acquire_token_for_client("resource")
220
221 Recipe 2: Write once, run everywhere.
222 If you use different managed identity on different deployment,
223 you may use an environment variable (such as MY_MANAGED_IDENTITY_CONFIG)
224 to store a json blob like
225 ``{"ManagedIdentityIdType": "ClientId", "Id": "foo"}`` or
226 ``{"ManagedIdentityIdType": "SystemAssigned", "Id": null}``.
227 The following app can load managed identity configuration dynamically::
228
229 import json, os, msal, requests
230 config = os.getenv("MY_MANAGED_IDENTITY_CONFIG")
231 assert config, "An ENV VAR with value should exist"
232 client = msal.ManagedIdentityClient(
233 json.loads(config),
234 http_client=requests.Session(),
235 )
236 token = client.acquire_token_for_client("resource")
237 """
238 if not ManagedIdentity.is_managed_identity(managed_identity):
239 raise ManagedIdentityError(
240 f"Incorrect managed_identity: {managed_identity}")
241 self._managed_identity = managed_identity
242 self._http_client = _ThrottledHttpClient(
243 # This class only throttles excess token acquisition requests.
244 # It does not provide retry.
245 # Retry is the http_client or caller's responsibility, not MSAL's.
246 #
247 # FWIW, here is the inconsistent retry recommendation.
248 # 1. Only MI on VM defines exotic 404 and 410 retry recommendations
249 # ( https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling )
250 # (especially for 410 which was supposed to be a permanent failure).
251 # 2. MI on Service Fabric specifically suggests to not retry on 404.
252 # ( https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-cluster-managed-identity-service-fabric-app-code#error-handling )
253 http_client,
254 http_cache=http_cache,
255 )
256 self._token_cache = token_cache or TokenCache()
257 self._client_capabilities = client_capabilities
258
259 def acquire_token_for_client(
260 self,
261 *,
262 resource: str, # If/when we support scope, resource will become optional
263 claims_challenge: Optional[str] = None,
264 ):
265 """Acquire token for the managed identity.
266
267 The result will be automatically cached.
268 Subsequent calls will automatically search from cache first.
269
270 :param resource: The resource for which the token is acquired.
271
272 :param claims_challenge:
273 Optional.
274 It is a string representation of a JSON object
275 (which contains lists of claims being requested).
276
277 The tenant admin may choose to revoke all Managed Identity tokens,
278 and then a *claims challenge* will be returned by the target resource,
279 as a `claims_challenge` directive in the `www-authenticate` header,
280 even if the app developer did not opt in for the "CP1" client capability.
281 Upon receiving a `claims_challenge`, MSAL will attempt to acquire a new token.
282
283 .. note::
284
285 Known issue: When an Azure VM has only one user-assigned managed identity,
286 and your app specifies to use system-assigned managed identity,
287 Azure VM may still return a token for your user-assigned identity.
288
289 This is a service-side behavior that cannot be changed by this library.
290 `Azure VM docs <https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http>`_
291 """
292 access_token_to_refresh = None # This could become a public parameter in the future
293 access_token_from_cache = None
294 client_id_in_cache = self._managed_identity.get(
295 ManagedIdentity.ID, "SYSTEM_ASSIGNED_MANAGED_IDENTITY")
296 now = time.time()
297 if True: # Attempt cache search even if receiving claims_challenge,
298 # because we want to locate the existing token (if any) and refresh it
299 matches = self._token_cache.search(
300 self._token_cache.CredentialType.ACCESS_TOKEN,
301 target=[resource],
302 query=dict(
303 client_id=client_id_in_cache,
304 environment=self.__instance,
305 realm=self._tenant,
306 home_account_id=None,
307 ),
308 )
309 for entry in matches:
310 expires_in = int(entry["expires_on"]) - now
311 if expires_in < 5*60: # Then consider it expired
312 continue # Removal is not necessary, it will be overwritten
313 if claims_challenge and not access_token_to_refresh:
314 # Since caller did not pinpoint the token causing claims challenge,
315 # we have to assume it is the first token we found in cache.
316 access_token_to_refresh = entry["secret"]
317 break
318 logger.debug("Cache hit an AT")
319 access_token_from_cache = { # Mimic a real response
320 "access_token": entry["secret"],
321 "token_type": entry.get("token_type", "Bearer"),
322 "expires_in": int(expires_in), # OAuth2 specs defines it as int
323 self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
324 }
325 if "refresh_on" in entry:
326 access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
327 if int(entry["refresh_on"]) < now: # aging
328 break # With a fallback in hand, we break here to go refresh
329 return access_token_from_cache # It is still good as new
330 try:
331 result = _obtain_token(
332 self._http_client, self._managed_identity, resource,
333 access_token_sha256_to_refresh=hashlib.sha256(
334 access_token_to_refresh.encode("utf-8")).hexdigest()
335 if access_token_to_refresh else None,
336 client_capabilities=self._client_capabilities,
337 )
338 if "access_token" in result:
339 expires_in = result.get("expires_in", 3600)
340 if "refresh_in" not in result and expires_in >= 7200:
341 result["refresh_in"] = int(expires_in / 2)
342 self._token_cache.add(dict(
343 client_id=client_id_in_cache,
344 scope=[resource],
345 token_endpoint="https://{}/{}".format(
346 self.__instance, self._tenant),
347 response=result,
348 params={},
349 data={},
350 ))
351 if "refresh_in" in result:
352 result["refresh_on"] = int(now + result["refresh_in"])
353 result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
354 if (result and "error" not in result) or (not access_token_from_cache):
355 return result
356 except: # The exact HTTP exception is transportation-layer dependent
357 # Typically network error. Potential AAD outage?
358 if not access_token_from_cache: # It means there is no fall back option
359 raise # We choose to bubble up the exception
360 return access_token_from_cache
361
362
363def _scope_to_resource(scope): # This is an experimental reasonable-effort approach
364 u = urlparse(scope)
365 if u.scheme:
366 return "{}://{}".format(u.scheme, u.netloc)
367 return scope # There is no much else we can do here
368
369
370def _get_arc_endpoint():
371 if "IDENTITY_ENDPOINT" in os.environ and "IMDS_ENDPOINT" in os.environ:
372 return os.environ["IDENTITY_ENDPOINT"]
373 if ( # Defined in https://eng.ms/docs/cloud-ai-platform/azure-core/azure-management-and-platforms/control-plane-bburns/hybrid-resource-provider/azure-arc-for-servers/specs/extension_authoring
374 sys.platform == "linux" and os.path.exists("/opt/azcmagent/bin/himds")
375 or sys.platform == "win32" and os.path.exists(os.path.expandvars(
376 # Avoid Windows-only "%EnvVar%" syntax so that tests can be run on Linux
377 r"${ProgramFiles}\AzureConnectedMachineAgent\himds.exe"
378 ))
379 ):
380 return "http://localhost:40342/metadata/identity/oauth2/token"
381
382
383APP_SERVICE = object()
384AZURE_ARC = object()
385CLOUD_SHELL = object() # In MSAL Python, token acquisition was done by
386 # PublicClientApplication(...).acquire_token_interactive(..., prompt="none")
387MACHINE_LEARNING = object()
388SERVICE_FABRIC = object()
389DEFAULT_TO_VM = object() # Unknown environment; default to VM; you may want to probe
390def get_managed_identity_source():
391 """Detect the current environment and return the likely identity source.
392
393 When this function returns ``CLOUD_SHELL``, you should use
394 :func:`msal.PublicClientApplication.acquire_token_interactive` with ``prompt="none"``
395 to obtain a token.
396 """
397 if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ
398 and "IDENTITY_SERVER_THUMBPRINT" in os.environ
399 ):
400 return SERVICE_FABRIC
401 if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ:
402 return APP_SERVICE
403 if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ:
404 return MACHINE_LEARNING
405 if _get_arc_endpoint():
406 return AZURE_ARC
407 if _is_running_in_cloud_shell():
408 return CLOUD_SHELL
409 return DEFAULT_TO_VM
410
411
412def _obtain_token(
413 http_client, managed_identity, resource,
414 *,
415 access_token_sha256_to_refresh: Optional[str] = None,
416 client_capabilities: Optional[List[str]] = None,
417):
418 if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ
419 and "IDENTITY_SERVER_THUMBPRINT" in os.environ
420 ):
421 if managed_identity:
422 logger.debug(
423 "Ignoring managed_identity parameter. "
424 "Managed Identity in Service Fabric is configured in the cluster, "
425 "not during runtime. See also "
426 "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service")
427 return _obtain_token_on_service_fabric(
428 http_client,
429 os.environ["IDENTITY_ENDPOINT"],
430 os.environ["IDENTITY_HEADER"],
431 os.environ["IDENTITY_SERVER_THUMBPRINT"],
432 resource,
433 access_token_sha256_to_refresh=access_token_sha256_to_refresh,
434 client_capabilities=client_capabilities,
435 )
436 if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ:
437 return _obtain_token_on_app_service(
438 http_client,
439 os.environ["IDENTITY_ENDPOINT"],
440 os.environ["IDENTITY_HEADER"],
441 managed_identity,
442 resource,
443 )
444 if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ:
445 # Back ported from https://github.com/Azure/azure-sdk-for-python/blob/azure-identity_1.15.0/sdk/identity/azure-identity/azure/identity/_credentials/azure_ml.py
446 return _obtain_token_on_machine_learning(
447 http_client,
448 os.environ["MSI_ENDPOINT"],
449 os.environ["MSI_SECRET"],
450 managed_identity,
451 resource,
452 )
453 arc_endpoint = _get_arc_endpoint()
454 if arc_endpoint:
455 if ManagedIdentity.is_user_assigned(managed_identity):
456 raise ManagedIdentityError( # Note: Azure Identity for Python raised exception too
457 "Invalid managed_identity parameter. "
458 "Azure Arc supports only system-assigned managed identity, "
459 "See also "
460 "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service")
461 return _obtain_token_on_arc(http_client, arc_endpoint, resource)
462 return _obtain_token_on_azure_vm(http_client, managed_identity, resource)
463
464
465def _adjust_param(params, managed_identity, types_mapping=None):
466 # Modify the params dict in place
467 id_name = (types_mapping or ManagedIdentity._types_mapping).get(
468 managed_identity.get(ManagedIdentity.ID_TYPE))
469 if id_name:
470 params[id_name] = managed_identity[ManagedIdentity.ID]
471
472def _obtain_token_on_azure_vm(http_client, managed_identity, resource):
473 # Based on https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http
474 logger.debug("Obtaining token via managed identity on Azure VM")
475 params = {
476 "api-version": "2018-02-01",
477 "resource": resource,
478 }
479 _adjust_param(params, managed_identity)
480 resp = http_client.get(
481 os.getenv(
482 "AZURE_POD_IDENTITY_AUTHORITY_HOST", "http://169.254.169.254"
483 ).strip("/") + "/metadata/identity/oauth2/token",
484 params=params,
485 headers={
486 "Metadata": "true",
487 "x-client-SKU": SKU,
488 "x-client-Ver": __version__,
489 "x-ms-client-request-id": str(uuid.uuid4()),
490 },
491 )
492 try:
493 payload = json.loads(resp.text)
494 if payload.get("access_token") and payload.get("expires_in"):
495 return { # Normalizing the payload into OAuth2 format
496 "access_token": payload["access_token"],
497 "expires_in": int(payload["expires_in"]),
498 "resource": payload.get("resource"),
499 "token_type": payload.get("token_type", "Bearer"),
500 }
501 return payload # It would be {"error": ..., "error_description": ...} according to https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling
502 except json.decoder.JSONDecodeError:
503 logger.debug("IMDS emits unexpected payload: %s", resp.text)
504 raise
505
506def _obtain_token_on_app_service(
507 http_client, endpoint, identity_header, managed_identity, resource,
508):
509 """Obtains token for
510 `App Service <https://learn.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp#rest-endpoint-reference>`_,
511 Azure Functions, and Azure Automation.
512 """
513 # Prerequisite: Create your app service https://docs.microsoft.com/en-us/azure/app-service/quickstart-python
514 # Assign it a managed identity https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp
515 # SSH into your container for testing https://docs.microsoft.com/en-us/azure/app-service/configure-linux-open-ssh-session
516 logger.debug("Obtaining token via managed identity on Azure App Service")
517 params = {
518 "api-version": "2019-08-01",
519 "resource": resource,
520 }
521 _adjust_param(params, managed_identity, types_mapping={
522 ManagedIdentity.CLIENT_ID: "client_id",
523 ManagedIdentity.RESOURCE_ID: "mi_res_id", # App Service's resource id uses "mi_res_id"
524 ManagedIdentity.OBJECT_ID: "object_id",
525 })
526
527 resp = http_client.get(
528 endpoint,
529 params=params,
530 headers={
531 "X-IDENTITY-HEADER": identity_header,
532 "Metadata": "true", # Unnecessary yet harmless for App Service,
533 # It will be needed by Azure Automation
534 # https://docs.microsoft.com/en-us/azure/automation/enable-managed-identity-for-automation#get-access-token-for-system-assigned-managed-identity-using-http-get
535 },
536 )
537 try:
538 payload = json.loads(resp.text)
539 if payload.get("access_token") and payload.get("expires_on"):
540 return { # Normalizing the payload into OAuth2 format
541 "access_token": payload["access_token"],
542 "expires_in": int(payload["expires_on"]) - int(time.time()),
543 "resource": payload.get("resource"),
544 "token_type": payload.get("token_type", "Bearer"),
545 }
546 return {
547 "error": "invalid_scope", # Empirically, wrong resource ends up with a vague statusCode=500
548 "error_description": "{}, {}".format(
549 payload.get("statusCode"), payload.get("message")),
550 }
551 except json.decoder.JSONDecodeError:
552 logger.debug("IMDS emits unexpected payload: %s", resp.text)
553 raise
554
555def _obtain_token_on_machine_learning(
556 http_client, endpoint, secret, managed_identity, resource,
557):
558 # Could not find protocol docs from https://docs.microsoft.com/en-us/azure/machine-learning
559 # The following implementation is back ported from Azure Identity 1.15.0
560 logger.debug("Obtaining token via managed identity on Azure Machine Learning")
561 params = {"api-version": "2017-09-01", "resource": resource}
562 _adjust_param(params, managed_identity)
563 if params["api-version"] == "2017-09-01" and "client_id" in params:
564 # Workaround for a known bug in Azure ML 2017 API
565 params["clientid"] = params.pop("client_id")
566 resp = http_client.get(
567 endpoint,
568 params=params,
569 headers={"secret": secret},
570 )
571 try:
572 payload = json.loads(resp.text)
573 if payload.get("access_token") and payload.get("expires_on"):
574 return { # Normalizing the payload into OAuth2 format
575 "access_token": payload["access_token"],
576 "expires_in": int(payload["expires_on"]) - int(time.time()),
577 "resource": payload.get("resource"),
578 "token_type": payload.get("token_type", "Bearer"),
579 }
580 return {
581 "error": "invalid_scope", # TODO: To be tested
582 "error_description": "{}".format(payload),
583 }
584 except json.decoder.JSONDecodeError:
585 logger.debug("IMDS emits unexpected payload: %s", resp.text)
586 raise
587
588
589def _obtain_token_on_service_fabric(
590 http_client, endpoint, identity_header, server_thumbprint, resource,
591 *,
592 access_token_sha256_to_refresh: str = None,
593 client_capabilities: Optional[List[str]] = None,
594):
595 """Obtains token for
596 `Service Fabric <https://learn.microsoft.com/en-us/azure/service-fabric/>`_
597 """
598 # Deployment https://learn.microsoft.com/en-us/azure/service-fabric/service-fabric-get-started-containers-linux
599 # See also https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/tests/managed-identity-live/service-fabric/service_fabric.md
600 # Protocol https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#acquiring-an-access-token-using-rest-api
601 logger.debug("Obtaining token via managed identity on Azure Service Fabric")
602 resp = http_client.get(
603 endpoint,
604 params={k: v for k, v in {
605 "api-version": "2019-07-01-preview",
606 "resource": resource,
607 "token_sha256_to_refresh": access_token_sha256_to_refresh,
608 "xms_cc": ",".join(client_capabilities) if client_capabilities else None,
609 }.items() if v is not None},
610 headers={"Secret": identity_header},
611 )
612 try:
613 payload = json.loads(resp.text)
614 if payload.get("access_token") and payload.get("expires_on"):
615 return { # Normalizing the payload into OAuth2 format
616 "access_token": payload["access_token"],
617 "expires_in": int( # Despite the example in docs shows an integer,
618 payload["expires_on"] # Azure SDK team's test obtained a string.
619 ) - int(time.time()),
620 "resource": payload.get("resource"),
621 "token_type": payload["token_type"],
622 }
623 error = payload.get("error", {}) # https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#error-handling
624 error_mapping = { # Map Service Fabric errors into OAuth2 errors https://www.rfc-editor.org/rfc/rfc6749#section-5.2
625 "SecretHeaderNotFound": "unauthorized_client",
626 "ManagedIdentityNotFound": "invalid_client",
627 "ArgumentNullOrEmpty": "invalid_scope",
628 }
629 return {
630 "error": error_mapping.get(error.get("code"), "invalid_request"),
631 "error_description": resp.text,
632 }
633 except json.decoder.JSONDecodeError:
634 logger.debug("IMDS emits unexpected payload: %s", resp.text)
635 raise
636
637
638_supported_arc_platforms_and_their_prefixes = {
639 "linux": "/var/opt/azcmagent/tokens",
640 "win32": os.path.expandvars(r"%ProgramData%\AzureConnectedMachineAgent\Tokens"),
641}
642
643class ArcPlatformNotSupportedError(ManagedIdentityError):
644 pass
645
646def _obtain_token_on_arc(http_client, endpoint, resource):
647 # https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication
648 logger.debug("Obtaining token via managed identity on Azure Arc")
649 resp = http_client.get(
650 endpoint,
651 params={"api-version": "2020-06-01", "resource": resource},
652 headers={"Metadata": "true"},
653 )
654 www_auth = "www-authenticate" # Header in lower case
655 challenge = {
656 # Normalized to lowercase, because header names are case-insensitive
657 # https://datatracker.ietf.org/doc/html/rfc7230#section-3.2
658 k.lower(): v for k, v in resp.headers.items() if k.lower() == www_auth
659 }.get(www_auth, "").split("=") # Output will be ["Basic realm", "content"]
660 if not ( # https://datatracker.ietf.org/doc/html/rfc7617#section-2
661 len(challenge) == 2 and challenge[0].lower() == "basic realm"):
662 raise ManagedIdentityError(
663 "Unrecognizable WWW-Authenticate header: {}".format(resp.headers))
664 if sys.platform not in _supported_arc_platforms_and_their_prefixes:
665 raise ArcPlatformNotSupportedError(
666 f"Platform {sys.platform} was undefined and unsupported")
667 filename = os.path.join(
668 # This algorithm is documented in an internal doc https://msazure.visualstudio.com/One/_wiki/wikis/One.wiki/233012/VM-Extension-Authoring-for-Arc?anchor=2.-obtaining-tokens
669 _supported_arc_platforms_and_their_prefixes[sys.platform],
670 os.path.splitext(os.path.basename(challenge[1]))[0] + ".key")
671 if os.stat(filename).st_size > 4096: # Check size BEFORE loading its content
672 raise ManagedIdentityError("Local key file shall not be larger than 4KB")
673 with open(filename) as f:
674 secret = f.read()
675 response = http_client.get(
676 endpoint,
677 params={"api-version": "2020-06-01", "resource": resource},
678 headers={"Metadata": "true", "Authorization": "Basic {}".format(secret)},
679 )
680 try:
681 payload = json.loads(response.text)
682 if payload.get("access_token") and payload.get("expires_in"):
683 # Example: https://learn.microsoft.com/en-us/azure/azure-arc/servers/media/managed-identity-authentication/bash-token-output-example.png
684 return {
685 "access_token": payload["access_token"],
686 "expires_in": int(payload["expires_in"]),
687 "token_type": payload.get("token_type", "Bearer"),
688 "resource": payload.get("resource"),
689 }
690 except json.decoder.JSONDecodeError:
691 pass
692 return {
693 "error": "invalid_request",
694 "error_description": response.text,
695 }