1# Copyright (c) Microsoft Corporation.
2# All rights reserved.
3#
4# This code is licensed under the MIT License.
5import hashlib
6import json
7import logging
8import os
9import sys
10import time
11from urllib.parse import urlparse # Python 3+
12from collections import UserDict # Python 3+
13from typing import List, Optional, Union # Needed in Python 3.7 & 3.8
14from .token_cache import TokenCache
15from .individual_cache import _IndividualCache as IndividualCache
16from .throttled_http_client import ThrottledHttpClientBase, RetryAfterParser
17from .cloudshell import _is_running_in_cloud_shell
18
19
20logger = logging.getLogger(__name__)
21
22
23class ManagedIdentityError(ValueError):
24 pass
25
26
27class ManagedIdentity(UserDict):
28 """Feed an instance of this class to :class:`msal.ManagedIdentityClient`
29 to acquire token for the specified managed identity.
30 """
31 # The key names used in config dict
32 ID_TYPE = "ManagedIdentityIdType" # Contains keyword ManagedIdentity so its json equivalent will be more readable
33 ID = "Id"
34
35 # Valid values for key ID_TYPE
36 CLIENT_ID = "ClientId"
37 RESOURCE_ID = "ResourceId"
38 OBJECT_ID = "ObjectId"
39 SYSTEM_ASSIGNED = "SystemAssigned"
40
41 _types_mapping = { # Maps type name in configuration to type name on wire
42 CLIENT_ID: "client_id",
43 RESOURCE_ID: "msi_res_id", # VM's IMDS prefers msi_res_id https://github.com/Azure/azure-rest-api-specs/blob/dba6ed1f03bda88ac6884c0a883246446cc72495/specification/imds/data-plane/Microsoft.InstanceMetadataService/stable/2018-10-01/imds.json#L233-L239
44 OBJECT_ID: "object_id",
45 }
46
47 @classmethod
48 def is_managed_identity(cls, unknown):
49 return (isinstance(unknown, ManagedIdentity)
50 or cls.is_system_assigned(unknown)
51 or cls.is_user_assigned(unknown))
52
53 @classmethod
54 def is_system_assigned(cls, unknown):
55 return isinstance(unknown, SystemAssignedManagedIdentity) or (
56 isinstance(unknown, dict)
57 and unknown.get(cls.ID_TYPE) == cls.SYSTEM_ASSIGNED)
58
59 @classmethod
60 def is_user_assigned(cls, unknown):
61 return isinstance(unknown, UserAssignedManagedIdentity) or (
62 isinstance(unknown, dict)
63 and unknown.get(cls.ID_TYPE) in cls._types_mapping
64 and unknown.get(cls.ID))
65
66 def __init__(self, identifier=None, id_type=None):
67 # Undocumented. Use subclasses instead.
68 super(ManagedIdentity, self).__init__({
69 self.ID_TYPE: id_type,
70 self.ID: identifier,
71 })
72
73
74class SystemAssignedManagedIdentity(ManagedIdentity):
75 """Represent a system-assigned managed identity.
76
77 It is equivalent to a Python dict of::
78
79 {"ManagedIdentityIdType": "SystemAssigned", "Id": None}
80
81 or a JSON blob of::
82
83 {"ManagedIdentityIdType": "SystemAssigned", "Id": null}
84 """
85 def __init__(self):
86 super(SystemAssignedManagedIdentity, self).__init__(id_type=self.SYSTEM_ASSIGNED)
87
88
89class UserAssignedManagedIdentity(ManagedIdentity):
90 """Represent a user-assigned managed identity.
91
92 Depends on the id you provided, the outcome is equivalent to one of the below::
93
94 {"ManagedIdentityIdType": "ClientId", "Id": "foo"}
95 {"ManagedIdentityIdType": "ResourceId", "Id": "foo"}
96 {"ManagedIdentityIdType": "ObjectId", "Id": "foo"}
97 """
98 def __init__(self, *, client_id=None, resource_id=None, object_id=None):
99 if client_id and not resource_id and not object_id:
100 super(UserAssignedManagedIdentity, self).__init__(
101 id_type=self.CLIENT_ID, identifier=client_id)
102 elif not client_id and resource_id and not object_id:
103 super(UserAssignedManagedIdentity, self).__init__(
104 id_type=self.RESOURCE_ID, identifier=resource_id)
105 elif not client_id and not resource_id and object_id:
106 super(UserAssignedManagedIdentity, self).__init__(
107 id_type=self.OBJECT_ID, identifier=object_id)
108 else:
109 raise ManagedIdentityError(
110 "You shall specify one of the three parameters: "
111 "client_id, resource_id, object_id")
112
113
114class _ThrottledHttpClient(ThrottledHttpClientBase):
115 def __init__(self, *args, **kwargs):
116 super(_ThrottledHttpClient, self).__init__(*args, **kwargs)
117 self.get = IndividualCache( # All MIs (except Cloud Shell) use GETs
118 mapping=self._expiring_mapping,
119 key_maker=lambda func, args, kwargs: "REQ {} hash={} 429/5xx/Retry-After".format(
120 args[0], # It is the endpoint, typically a constant per MI type
121 self._hash(
122 # Managed Identity flavors have inconsistent parameters.
123 # We simply choose to hash them all.
124 str(kwargs.get("params")) + str(kwargs.get("data"))),
125 ),
126 expires_in=RetryAfterParser(5).parse, # 5 seconds default for non-PCA
127 )(self.get) # Note: Decorate the parent get(), not the http_client.get()
128
129
130class ManagedIdentityClient(object):
131 """This API encapsulates multiple managed identity back-ends:
132 VM, App Service, Azure Automation (Runbooks), Azure Function, Service Fabric,
133 and Azure Arc.
134
135 It also provides token cache support.
136
137 .. note::
138
139 Cloud Shell support is NOT implemented in this class.
140 Since MSAL Python 1.18 in May 2022, it has been implemented in
141 :func:`PublicClientApplication.acquire_token_interactive` via calling pattern
142 ``PublicClientApplication(...).acquire_token_interactive(scopes=[...], prompt="none")``.
143 That is appropriate, because Cloud Shell yields a token with
144 delegated permissions for the end user who has signed in to the Azure Portal
145 (like what a ``PublicClientApplication`` does),
146 not a token with application permissions for an app.
147 """
148 __instance = "localhost" # We used to get this value from socket.getfqdn()
149 # but it is unreliable because getfqdn() either hangs or returns empty value
150 # on some misconfigured machines
151 _tenant = "managed_identity"
152 _TOKEN_SOURCE = "token_source"
153 _TOKEN_SOURCE_IDP = "identity_provider"
154 _TOKEN_SOURCE_CACHE = "cache"
155
156 def __init__(
157 self,
158 managed_identity: Union[
159 dict,
160 ManagedIdentity, # Could use Type[ManagedIdentity] but it is deprecated in Python 3.9+
161 SystemAssignedManagedIdentity,
162 UserAssignedManagedIdentity,
163 ],
164 *,
165 http_client,
166 token_cache=None,
167 http_cache=None,
168 client_capabilities: Optional[List[str]] = None,
169 ):
170 """Create a managed identity client.
171
172 :param managed_identity:
173 It accepts an instance of :class:`SystemAssignedManagedIdentity`
174 or :class:`UserAssignedManagedIdentity`.
175 They are equivalent to a dict with a certain shape,
176 which may be loaded from a JSON configuration file or an env var.
177
178 :param http_client:
179 An http client object. For example, you can use ``requests.Session()``,
180 optionally with exponential backoff behavior demonstrated in this recipe::
181
182 import msal, requests
183 from requests.adapters import HTTPAdapter, Retry
184 s = requests.Session()
185 retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[
186 429, 500, 501, 502, 503, 504])
187 s.mount('https://', HTTPAdapter(max_retries=retries))
188 managed_identity = ...
189 client = msal.ManagedIdentityClient(managed_identity, http_client=s)
190
191 :param token_cache:
192 Optional. It accepts a :class:`msal.TokenCache` instance to store tokens.
193 It will use an in-memory token cache by default.
194
195 :param http_cache:
196 Optional. It has the same characteristics as the
197 :paramref:`msal.ClientApplication.http_cache`.
198
199 :param list[str] client_capabilities: (optional)
200 Allows configuration of one or more client capabilities, e.g. ["CP1"].
201
202 Client capability is meant to inform the Microsoft identity platform
203 (STS) what this client is capable for,
204 so STS can decide to turn on certain features.
205
206 Implementation details:
207 Client capability in Managed Identity is relayed as-is
208 via ``xms_cc`` parameter on the wire.
209
210 Recipe 1: Hard code a managed identity for your app::
211
212 import msal, requests
213 client = msal.ManagedIdentityClient(
214 msal.UserAssignedManagedIdentity(client_id="foo"),
215 http_client=requests.Session(),
216 )
217 token = client.acquire_token_for_client("resource")
218
219 Recipe 2: Write once, run everywhere.
220 If you use different managed identity on different deployment,
221 you may use an environment variable (such as MY_MANAGED_IDENTITY_CONFIG)
222 to store a json blob like
223 ``{"ManagedIdentityIdType": "ClientId", "Id": "foo"}`` or
224 ``{"ManagedIdentityIdType": "SystemAssigned", "Id": null}``.
225 The following app can load managed identity configuration dynamically::
226
227 import json, os, msal, requests
228 config = os.getenv("MY_MANAGED_IDENTITY_CONFIG")
229 assert config, "An ENV VAR with value should exist"
230 client = msal.ManagedIdentityClient(
231 json.loads(config),
232 http_client=requests.Session(),
233 )
234 token = client.acquire_token_for_client("resource")
235 """
236 if not ManagedIdentity.is_managed_identity(managed_identity):
237 raise ManagedIdentityError(
238 f"Incorrect managed_identity: {managed_identity}")
239 self._managed_identity = managed_identity
240 self._http_client = _ThrottledHttpClient(
241 # This class only throttles excess token acquisition requests.
242 # It does not provide retry.
243 # Retry is the http_client or caller's responsibility, not MSAL's.
244 #
245 # FWIW, here is the inconsistent retry recommendation.
246 # 1. Only MI on VM defines exotic 404 and 410 retry recommendations
247 # ( https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling )
248 # (especially for 410 which was supposed to be a permanent failure).
249 # 2. MI on Service Fabric specifically suggests to not retry on 404.
250 # ( https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-cluster-managed-identity-service-fabric-app-code#error-handling )
251 http_client,
252 http_cache=http_cache,
253 )
254 self._token_cache = token_cache or TokenCache()
255 self._client_capabilities = client_capabilities
256
257 def acquire_token_for_client(
258 self,
259 *,
260 resource: str, # If/when we support scope, resource will become optional
261 claims_challenge: Optional[str] = None,
262 ):
263 """Acquire token for the managed identity.
264
265 The result will be automatically cached.
266 Subsequent calls will automatically search from cache first.
267
268 :param resource: The resource for which the token is acquired.
269
270 :param claims_challenge:
271 Optional.
272 It is a string representation of a JSON object
273 (which contains lists of claims being requested).
274
275 The tenant admin may choose to revoke all Managed Identity tokens,
276 and then a *claims challenge* will be returned by the target resource,
277 as a `claims_challenge` directive in the `www-authenticate` header,
278 even if the app developer did not opt in for the "CP1" client capability.
279 Upon receiving a `claims_challenge`, MSAL will attempt to acquire a new token.
280
281 .. note::
282
283 Known issue: When an Azure VM has only one user-assigned managed identity,
284 and your app specifies to use system-assigned managed identity,
285 Azure VM may still return a token for your user-assigned identity.
286
287 This is a service-side behavior that cannot be changed by this library.
288 `Azure VM docs <https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http>`_
289 """
290 access_token_to_refresh = None # This could become a public parameter in the future
291 access_token_from_cache = None
292 client_id_in_cache = self._managed_identity.get(
293 ManagedIdentity.ID, "SYSTEM_ASSIGNED_MANAGED_IDENTITY")
294 now = time.time()
295 if True: # Attempt cache search even if receiving claims_challenge,
296 # because we want to locate the existing token (if any) and refresh it
297 matches = self._token_cache.search(
298 self._token_cache.CredentialType.ACCESS_TOKEN,
299 target=[resource],
300 query=dict(
301 client_id=client_id_in_cache,
302 environment=self.__instance,
303 realm=self._tenant,
304 home_account_id=None,
305 ),
306 )
307 for entry in matches:
308 expires_in = int(entry["expires_on"]) - now
309 if expires_in < 5*60: # Then consider it expired
310 continue # Removal is not necessary, it will be overwritten
311 if claims_challenge and not access_token_to_refresh:
312 # Since caller did not pinpoint the token causing claims challenge,
313 # we have to assume it is the first token we found in cache.
314 access_token_to_refresh = entry["secret"]
315 break
316 logger.debug("Cache hit an AT")
317 access_token_from_cache = { # Mimic a real response
318 "access_token": entry["secret"],
319 "token_type": entry.get("token_type", "Bearer"),
320 "expires_in": int(expires_in), # OAuth2 specs defines it as int
321 self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
322 }
323 if "refresh_on" in entry:
324 access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
325 if int(entry["refresh_on"]) < now: # aging
326 break # With a fallback in hand, we break here to go refresh
327 return access_token_from_cache # It is still good as new
328 try:
329 result = _obtain_token(
330 self._http_client, self._managed_identity, resource,
331 access_token_sha256_to_refresh=hashlib.sha256(
332 access_token_to_refresh.encode("utf-8")).hexdigest()
333 if access_token_to_refresh else None,
334 client_capabilities=self._client_capabilities,
335 )
336 if "access_token" in result:
337 expires_in = result.get("expires_in", 3600)
338 if "refresh_in" not in result and expires_in >= 7200:
339 result["refresh_in"] = int(expires_in / 2)
340 self._token_cache.add(dict(
341 client_id=client_id_in_cache,
342 scope=[resource],
343 token_endpoint="https://{}/{}".format(
344 self.__instance, self._tenant),
345 response=result,
346 params={},
347 data={},
348 ))
349 if "refresh_in" in result:
350 result["refresh_on"] = int(now + result["refresh_in"])
351 result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
352 if (result and "error" not in result) or (not access_token_from_cache):
353 return result
354 except: # The exact HTTP exception is transportation-layer dependent
355 # Typically network error. Potential AAD outage?
356 if not access_token_from_cache: # It means there is no fall back option
357 raise # We choose to bubble up the exception
358 return access_token_from_cache
359
360
361def _scope_to_resource(scope): # This is an experimental reasonable-effort approach
362 u = urlparse(scope)
363 if u.scheme:
364 return "{}://{}".format(u.scheme, u.netloc)
365 return scope # There is no much else we can do here
366
367
368def _get_arc_endpoint():
369 if "IDENTITY_ENDPOINT" in os.environ and "IMDS_ENDPOINT" in os.environ:
370 return os.environ["IDENTITY_ENDPOINT"]
371 if ( # Defined in https://eng.ms/docs/cloud-ai-platform/azure-core/azure-management-and-platforms/control-plane-bburns/hybrid-resource-provider/azure-arc-for-servers/specs/extension_authoring
372 sys.platform == "linux" and os.path.exists("/opt/azcmagent/bin/himds")
373 or sys.platform == "win32" and os.path.exists(os.path.expandvars(
374 # Avoid Windows-only "%EnvVar%" syntax so that tests can be run on Linux
375 r"${ProgramFiles}\AzureConnectedMachineAgent\himds.exe"
376 ))
377 ):
378 return "http://localhost:40342/metadata/identity/oauth2/token"
379
380
381APP_SERVICE = object()
382AZURE_ARC = object()
383CLOUD_SHELL = object() # In MSAL Python, token acquisition was done by
384 # PublicClientApplication(...).acquire_token_interactive(..., prompt="none")
385MACHINE_LEARNING = object()
386SERVICE_FABRIC = object()
387DEFAULT_TO_VM = object() # Unknown environment; default to VM; you may want to probe
388def get_managed_identity_source():
389 """Detect the current environment and return the likely identity source.
390
391 When this function returns ``CLOUD_SHELL``, you should use
392 :func:`msal.PublicClientApplication.acquire_token_interactive` with ``prompt="none"``
393 to obtain a token.
394 """
395 if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ
396 and "IDENTITY_SERVER_THUMBPRINT" in os.environ
397 ):
398 return SERVICE_FABRIC
399 if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ:
400 return APP_SERVICE
401 if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ:
402 return MACHINE_LEARNING
403 if _get_arc_endpoint():
404 return AZURE_ARC
405 if _is_running_in_cloud_shell():
406 return CLOUD_SHELL
407 return DEFAULT_TO_VM
408
409
410def _obtain_token(
411 http_client, managed_identity, resource,
412 *,
413 access_token_sha256_to_refresh: Optional[str] = None,
414 client_capabilities: Optional[List[str]] = None,
415):
416 if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ
417 and "IDENTITY_SERVER_THUMBPRINT" in os.environ
418 ):
419 if managed_identity:
420 logger.debug(
421 "Ignoring managed_identity parameter. "
422 "Managed Identity in Service Fabric is configured in the cluster, "
423 "not during runtime. See also "
424 "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service")
425 return _obtain_token_on_service_fabric(
426 http_client,
427 os.environ["IDENTITY_ENDPOINT"],
428 os.environ["IDENTITY_HEADER"],
429 os.environ["IDENTITY_SERVER_THUMBPRINT"],
430 resource,
431 access_token_sha256_to_refresh=access_token_sha256_to_refresh,
432 client_capabilities=client_capabilities,
433 )
434 if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ:
435 return _obtain_token_on_app_service(
436 http_client,
437 os.environ["IDENTITY_ENDPOINT"],
438 os.environ["IDENTITY_HEADER"],
439 managed_identity,
440 resource,
441 )
442 if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ:
443 # Back ported from https://github.com/Azure/azure-sdk-for-python/blob/azure-identity_1.15.0/sdk/identity/azure-identity/azure/identity/_credentials/azure_ml.py
444 return _obtain_token_on_machine_learning(
445 http_client,
446 os.environ["MSI_ENDPOINT"],
447 os.environ["MSI_SECRET"],
448 managed_identity,
449 resource,
450 )
451 arc_endpoint = _get_arc_endpoint()
452 if arc_endpoint:
453 if ManagedIdentity.is_user_assigned(managed_identity):
454 raise ManagedIdentityError( # Note: Azure Identity for Python raised exception too
455 "Invalid managed_identity parameter. "
456 "Azure Arc supports only system-assigned managed identity, "
457 "See also "
458 "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service")
459 return _obtain_token_on_arc(http_client, arc_endpoint, resource)
460 return _obtain_token_on_azure_vm(http_client, managed_identity, resource)
461
462
463def _adjust_param(params, managed_identity, types_mapping=None):
464 # Modify the params dict in place
465 id_name = (types_mapping or ManagedIdentity._types_mapping).get(
466 managed_identity.get(ManagedIdentity.ID_TYPE))
467 if id_name:
468 params[id_name] = managed_identity[ManagedIdentity.ID]
469
470def _obtain_token_on_azure_vm(http_client, managed_identity, resource):
471 # Based on https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http
472 logger.debug("Obtaining token via managed identity on Azure VM")
473 params = {
474 "api-version": "2018-02-01",
475 "resource": resource,
476 }
477 _adjust_param(params, managed_identity)
478 resp = http_client.get(
479 os.getenv(
480 "AZURE_POD_IDENTITY_AUTHORITY_HOST", "http://169.254.169.254"
481 ).strip("/") + "/metadata/identity/oauth2/token",
482 params=params,
483 headers={"Metadata": "true"},
484 )
485 try:
486 payload = json.loads(resp.text)
487 if payload.get("access_token") and payload.get("expires_in"):
488 return { # Normalizing the payload into OAuth2 format
489 "access_token": payload["access_token"],
490 "expires_in": int(payload["expires_in"]),
491 "resource": payload.get("resource"),
492 "token_type": payload.get("token_type", "Bearer"),
493 }
494 return payload # It would be {"error": ..., "error_description": ...} according to https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling
495 except json.decoder.JSONDecodeError:
496 logger.debug("IMDS emits unexpected payload: %s", resp.text)
497 raise
498
499def _obtain_token_on_app_service(
500 http_client, endpoint, identity_header, managed_identity, resource,
501):
502 """Obtains token for
503 `App Service <https://learn.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp#rest-endpoint-reference>`_,
504 Azure Functions, and Azure Automation.
505 """
506 # Prerequisite: Create your app service https://docs.microsoft.com/en-us/azure/app-service/quickstart-python
507 # Assign it a managed identity https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp
508 # SSH into your container for testing https://docs.microsoft.com/en-us/azure/app-service/configure-linux-open-ssh-session
509 logger.debug("Obtaining token via managed identity on Azure App Service")
510 params = {
511 "api-version": "2019-08-01",
512 "resource": resource,
513 }
514 _adjust_param(params, managed_identity, types_mapping={
515 ManagedIdentity.CLIENT_ID: "client_id",
516 ManagedIdentity.RESOURCE_ID: "mi_res_id", # App Service's resource id uses "mi_res_id"
517 ManagedIdentity.OBJECT_ID: "object_id",
518 })
519
520 resp = http_client.get(
521 endpoint,
522 params=params,
523 headers={
524 "X-IDENTITY-HEADER": identity_header,
525 "Metadata": "true", # Unnecessary yet harmless for App Service,
526 # It will be needed by Azure Automation
527 # https://docs.microsoft.com/en-us/azure/automation/enable-managed-identity-for-automation#get-access-token-for-system-assigned-managed-identity-using-http-get
528 },
529 )
530 try:
531 payload = json.loads(resp.text)
532 if payload.get("access_token") and payload.get("expires_on"):
533 return { # Normalizing the payload into OAuth2 format
534 "access_token": payload["access_token"],
535 "expires_in": int(payload["expires_on"]) - int(time.time()),
536 "resource": payload.get("resource"),
537 "token_type": payload.get("token_type", "Bearer"),
538 }
539 return {
540 "error": "invalid_scope", # Empirically, wrong resource ends up with a vague statusCode=500
541 "error_description": "{}, {}".format(
542 payload.get("statusCode"), payload.get("message")),
543 }
544 except json.decoder.JSONDecodeError:
545 logger.debug("IMDS emits unexpected payload: %s", resp.text)
546 raise
547
548def _obtain_token_on_machine_learning(
549 http_client, endpoint, secret, managed_identity, resource,
550):
551 # Could not find protocol docs from https://docs.microsoft.com/en-us/azure/machine-learning
552 # The following implementation is back ported from Azure Identity 1.15.0
553 logger.debug("Obtaining token via managed identity on Azure Machine Learning")
554 params = {"api-version": "2017-09-01", "resource": resource}
555 _adjust_param(params, managed_identity)
556 if params["api-version"] == "2017-09-01" and "client_id" in params:
557 # Workaround for a known bug in Azure ML 2017 API
558 params["clientid"] = params.pop("client_id")
559 resp = http_client.get(
560 endpoint,
561 params=params,
562 headers={"secret": secret},
563 )
564 try:
565 payload = json.loads(resp.text)
566 if payload.get("access_token") and payload.get("expires_on"):
567 return { # Normalizing the payload into OAuth2 format
568 "access_token": payload["access_token"],
569 "expires_in": int(payload["expires_on"]) - int(time.time()),
570 "resource": payload.get("resource"),
571 "token_type": payload.get("token_type", "Bearer"),
572 }
573 return {
574 "error": "invalid_scope", # TODO: To be tested
575 "error_description": "{}".format(payload),
576 }
577 except json.decoder.JSONDecodeError:
578 logger.debug("IMDS emits unexpected payload: %s", resp.text)
579 raise
580
581
582def _obtain_token_on_service_fabric(
583 http_client, endpoint, identity_header, server_thumbprint, resource,
584 *,
585 access_token_sha256_to_refresh: str = None,
586 client_capabilities: Optional[List[str]] = None,
587):
588 """Obtains token for
589 `Service Fabric <https://learn.microsoft.com/en-us/azure/service-fabric/>`_
590 """
591 # Deployment https://learn.microsoft.com/en-us/azure/service-fabric/service-fabric-get-started-containers-linux
592 # See also https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/tests/managed-identity-live/service-fabric/service_fabric.md
593 # Protocol https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#acquiring-an-access-token-using-rest-api
594 logger.debug("Obtaining token via managed identity on Azure Service Fabric")
595 resp = http_client.get(
596 endpoint,
597 params={k: v for k, v in {
598 "api-version": "2019-07-01-preview",
599 "resource": resource,
600 "token_sha256_to_refresh": access_token_sha256_to_refresh,
601 "xms_cc": ",".join(client_capabilities) if client_capabilities else None,
602 }.items() if v is not None},
603 headers={"Secret": identity_header},
604 )
605 try:
606 payload = json.loads(resp.text)
607 if payload.get("access_token") and payload.get("expires_on"):
608 return { # Normalizing the payload into OAuth2 format
609 "access_token": payload["access_token"],
610 "expires_in": int( # Despite the example in docs shows an integer,
611 payload["expires_on"] # Azure SDK team's test obtained a string.
612 ) - int(time.time()),
613 "resource": payload.get("resource"),
614 "token_type": payload["token_type"],
615 }
616 error = payload.get("error", {}) # https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#error-handling
617 error_mapping = { # Map Service Fabric errors into OAuth2 errors https://www.rfc-editor.org/rfc/rfc6749#section-5.2
618 "SecretHeaderNotFound": "unauthorized_client",
619 "ManagedIdentityNotFound": "invalid_client",
620 "ArgumentNullOrEmpty": "invalid_scope",
621 }
622 return {
623 "error": error_mapping.get(error.get("code"), "invalid_request"),
624 "error_description": resp.text,
625 }
626 except json.decoder.JSONDecodeError:
627 logger.debug("IMDS emits unexpected payload: %s", resp.text)
628 raise
629
630
631_supported_arc_platforms_and_their_prefixes = {
632 "linux": "/var/opt/azcmagent/tokens",
633 "win32": os.path.expandvars(r"%ProgramData%\AzureConnectedMachineAgent\Tokens"),
634}
635
636class ArcPlatformNotSupportedError(ManagedIdentityError):
637 pass
638
639def _obtain_token_on_arc(http_client, endpoint, resource):
640 # https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication
641 logger.debug("Obtaining token via managed identity on Azure Arc")
642 resp = http_client.get(
643 endpoint,
644 params={"api-version": "2020-06-01", "resource": resource},
645 headers={"Metadata": "true"},
646 )
647 www_auth = "www-authenticate" # Header in lower case
648 challenge = {
649 # Normalized to lowercase, because header names are case-insensitive
650 # https://datatracker.ietf.org/doc/html/rfc7230#section-3.2
651 k.lower(): v for k, v in resp.headers.items() if k.lower() == www_auth
652 }.get(www_auth, "").split("=") # Output will be ["Basic realm", "content"]
653 if not ( # https://datatracker.ietf.org/doc/html/rfc7617#section-2
654 len(challenge) == 2 and challenge[0].lower() == "basic realm"):
655 raise ManagedIdentityError(
656 "Unrecognizable WWW-Authenticate header: {}".format(resp.headers))
657 if sys.platform not in _supported_arc_platforms_and_their_prefixes:
658 raise ArcPlatformNotSupportedError(
659 f"Platform {sys.platform} was undefined and unsupported")
660 filename = os.path.join(
661 # This algorithm is documented in an internal doc https://msazure.visualstudio.com/One/_wiki/wikis/One.wiki/233012/VM-Extension-Authoring-for-Arc?anchor=2.-obtaining-tokens
662 _supported_arc_platforms_and_their_prefixes[sys.platform],
663 os.path.splitext(os.path.basename(challenge[1]))[0] + ".key")
664 if os.stat(filename).st_size > 4096: # Check size BEFORE loading its content
665 raise ManagedIdentityError("Local key file shall not be larger than 4KB")
666 with open(filename) as f:
667 secret = f.read()
668 response = http_client.get(
669 endpoint,
670 params={"api-version": "2020-06-01", "resource": resource},
671 headers={"Metadata": "true", "Authorization": "Basic {}".format(secret)},
672 )
673 try:
674 payload = json.loads(response.text)
675 if payload.get("access_token") and payload.get("expires_in"):
676 # Example: https://learn.microsoft.com/en-us/azure/azure-arc/servers/media/managed-identity-authentication/bash-token-output-example.png
677 return {
678 "access_token": payload["access_token"],
679 "expires_in": int(payload["expires_in"]),
680 "token_type": payload.get("token_type", "Bearer"),
681 "resource": payload.get("resource"),
682 }
683 except json.decoder.JSONDecodeError:
684 pass
685 return {
686 "error": "invalid_request",
687 "error_description": response.text,
688 }