1# Copyright (c) Microsoft Corporation.
2# All rights reserved.
3#
4# This code is licensed under the MIT License.
5import hashlib
6import json
7import logging
8import os
9import socket
10import sys
11import time
12from urllib.parse import urlparse # Python 3+
13from collections import UserDict # Python 3+
14from typing import List, Optional, Union # Needed in Python 3.7 & 3.8
15from .token_cache import TokenCache
16from .individual_cache import _IndividualCache as IndividualCache
17from .throttled_http_client import ThrottledHttpClientBase, RetryAfterParser
18from .cloudshell import _is_running_in_cloud_shell
19
20
21logger = logging.getLogger(__name__)
22
23
24class ManagedIdentityError(ValueError):
25 pass
26
27
28class ManagedIdentity(UserDict):
29 """Feed an instance of this class to :class:`msal.ManagedIdentityClient`
30 to acquire token for the specified managed identity.
31 """
32 # The key names used in config dict
33 ID_TYPE = "ManagedIdentityIdType" # Contains keyword ManagedIdentity so its json equivalent will be more readable
34 ID = "Id"
35
36 # Valid values for key ID_TYPE
37 CLIENT_ID = "ClientId"
38 RESOURCE_ID = "ResourceId"
39 OBJECT_ID = "ObjectId"
40 SYSTEM_ASSIGNED = "SystemAssigned"
41
42 _types_mapping = { # Maps type name in configuration to type name on wire
43 CLIENT_ID: "client_id",
44 RESOURCE_ID: "msi_res_id", # VM's IMDS prefers msi_res_id https://github.com/Azure/azure-rest-api-specs/blob/dba6ed1f03bda88ac6884c0a883246446cc72495/specification/imds/data-plane/Microsoft.InstanceMetadataService/stable/2018-10-01/imds.json#L233-L239
45 OBJECT_ID: "object_id",
46 }
47
48 @classmethod
49 def is_managed_identity(cls, unknown):
50 return (isinstance(unknown, ManagedIdentity)
51 or cls.is_system_assigned(unknown)
52 or cls.is_user_assigned(unknown))
53
54 @classmethod
55 def is_system_assigned(cls, unknown):
56 return isinstance(unknown, SystemAssignedManagedIdentity) or (
57 isinstance(unknown, dict)
58 and unknown.get(cls.ID_TYPE) == cls.SYSTEM_ASSIGNED)
59
60 @classmethod
61 def is_user_assigned(cls, unknown):
62 return isinstance(unknown, UserAssignedManagedIdentity) or (
63 isinstance(unknown, dict)
64 and unknown.get(cls.ID_TYPE) in cls._types_mapping
65 and unknown.get(cls.ID))
66
67 def __init__(self, identifier=None, id_type=None):
68 # Undocumented. Use subclasses instead.
69 super(ManagedIdentity, self).__init__({
70 self.ID_TYPE: id_type,
71 self.ID: identifier,
72 })
73
74
75class SystemAssignedManagedIdentity(ManagedIdentity):
76 """Represent a system-assigned managed identity.
77
78 It is equivalent to a Python dict of::
79
80 {"ManagedIdentityIdType": "SystemAssigned", "Id": None}
81
82 or a JSON blob of::
83
84 {"ManagedIdentityIdType": "SystemAssigned", "Id": null}
85 """
86 def __init__(self):
87 super(SystemAssignedManagedIdentity, self).__init__(id_type=self.SYSTEM_ASSIGNED)
88
89
90class UserAssignedManagedIdentity(ManagedIdentity):
91 """Represent a user-assigned managed identity.
92
93 Depends on the id you provided, the outcome is equivalent to one of the below::
94
95 {"ManagedIdentityIdType": "ClientId", "Id": "foo"}
96 {"ManagedIdentityIdType": "ResourceId", "Id": "foo"}
97 {"ManagedIdentityIdType": "ObjectId", "Id": "foo"}
98 """
99 def __init__(self, *, client_id=None, resource_id=None, object_id=None):
100 if client_id and not resource_id and not object_id:
101 super(UserAssignedManagedIdentity, self).__init__(
102 id_type=self.CLIENT_ID, identifier=client_id)
103 elif not client_id and resource_id and not object_id:
104 super(UserAssignedManagedIdentity, self).__init__(
105 id_type=self.RESOURCE_ID, identifier=resource_id)
106 elif not client_id and not resource_id and object_id:
107 super(UserAssignedManagedIdentity, self).__init__(
108 id_type=self.OBJECT_ID, identifier=object_id)
109 else:
110 raise ManagedIdentityError(
111 "You shall specify one of the three parameters: "
112 "client_id, resource_id, object_id")
113
114
115class _ThrottledHttpClient(ThrottledHttpClientBase):
116 def __init__(self, *args, **kwargs):
117 super(_ThrottledHttpClient, self).__init__(*args, **kwargs)
118 self.get = IndividualCache( # All MIs (except Cloud Shell) use GETs
119 mapping=self._expiring_mapping,
120 key_maker=lambda func, args, kwargs: "REQ {} hash={} 429/5xx/Retry-After".format(
121 args[0], # It is the endpoint, typically a constant per MI type
122 self._hash(
123 # Managed Identity flavors have inconsistent parameters.
124 # We simply choose to hash them all.
125 str(kwargs.get("params")) + str(kwargs.get("data"))),
126 ),
127 expires_in=RetryAfterParser(5).parse, # 5 seconds default for non-PCA
128 )(self.get) # Note: Decorate the parent get(), not the http_client.get()
129
130
131class ManagedIdentityClient(object):
132 """This API encapsulates multiple managed identity back-ends:
133 VM, App Service, Azure Automation (Runbooks), Azure Function, Service Fabric,
134 and Azure Arc.
135
136 It also provides token cache support.
137
138 .. note::
139
140 Cloud Shell support is NOT implemented in this class.
141 Since MSAL Python 1.18 in May 2022, it has been implemented in
142 :func:`PublicClientApplication.acquire_token_interactive` via calling pattern
143 ``PublicClientApplication(...).acquire_token_interactive(scopes=[...], prompt="none")``.
144 That is appropriate, because Cloud Shell yields a token with
145 delegated permissions for the end user who has signed in to the Azure Portal
146 (like what a ``PublicClientApplication`` does),
147 not a token with application permissions for an app.
148 """
149 __instance, _tenant = None, "managed_identity" # Placeholders
150 _TOKEN_SOURCE = "token_source"
151 _TOKEN_SOURCE_IDP = "identity_provider"
152 _TOKEN_SOURCE_CACHE = "cache"
153
154 def __init__(
155 self,
156 managed_identity: Union[
157 dict,
158 ManagedIdentity, # Could use Type[ManagedIdentity] but it is deprecated in Python 3.9+
159 SystemAssignedManagedIdentity,
160 UserAssignedManagedIdentity,
161 ],
162 *,
163 http_client,
164 token_cache=None,
165 http_cache=None,
166 client_capabilities: Optional[List[str]] = None,
167 ):
168 """Create a managed identity client.
169
170 :param managed_identity:
171 It accepts an instance of :class:`SystemAssignedManagedIdentity`
172 or :class:`UserAssignedManagedIdentity`.
173 They are equivalent to a dict with a certain shape,
174 which may be loaded from a JSON configuration file or an env var.
175
176 :param http_client:
177 An http client object. For example, you can use ``requests.Session()``,
178 optionally with exponential backoff behavior demonstrated in this recipe::
179
180 import msal, requests
181 from requests.adapters import HTTPAdapter, Retry
182 s = requests.Session()
183 retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[
184 429, 500, 501, 502, 503, 504])
185 s.mount('https://', HTTPAdapter(max_retries=retries))
186 managed_identity = ...
187 client = msal.ManagedIdentityClient(managed_identity, http_client=s)
188
189 :param token_cache:
190 Optional. It accepts a :class:`msal.TokenCache` instance to store tokens.
191 It will use an in-memory token cache by default.
192
193 :param http_cache:
194 Optional. It has the same characteristics as the
195 :paramref:`msal.ClientApplication.http_cache`.
196
197 :param list[str] client_capabilities: (optional)
198 Allows configuration of one or more client capabilities, e.g. ["CP1"].
199
200 Client capability is meant to inform the Microsoft identity platform
201 (STS) what this client is capable for,
202 so STS can decide to turn on certain features.
203
204 Implementation details:
205 Client capability in Managed Identity is relayed as-is
206 via ``xms_cc`` parameter on the wire.
207
208 Recipe 1: Hard code a managed identity for your app::
209
210 import msal, requests
211 client = msal.ManagedIdentityClient(
212 msal.UserAssignedManagedIdentity(client_id="foo"),
213 http_client=requests.Session(),
214 )
215 token = client.acquire_token_for_client("resource")
216
217 Recipe 2: Write once, run everywhere.
218 If you use different managed identity on different deployment,
219 you may use an environment variable (such as MY_MANAGED_IDENTITY_CONFIG)
220 to store a json blob like
221 ``{"ManagedIdentityIdType": "ClientId", "Id": "foo"}`` or
222 ``{"ManagedIdentityIdType": "SystemAssigned", "Id": null}``.
223 The following app can load managed identity configuration dynamically::
224
225 import json, os, msal, requests
226 config = os.getenv("MY_MANAGED_IDENTITY_CONFIG")
227 assert config, "An ENV VAR with value should exist"
228 client = msal.ManagedIdentityClient(
229 json.loads(config),
230 http_client=requests.Session(),
231 )
232 token = client.acquire_token_for_client("resource")
233 """
234 if not ManagedIdentity.is_managed_identity(managed_identity):
235 raise ManagedIdentityError(
236 f"Incorrect managed_identity: {managed_identity}")
237 self._managed_identity = managed_identity
238 self._http_client = _ThrottledHttpClient(
239 # This class only throttles excess token acquisition requests.
240 # It does not provide retry.
241 # Retry is the http_client or caller's responsibility, not MSAL's.
242 #
243 # FWIW, here is the inconsistent retry recommendation.
244 # 1. Only MI on VM defines exotic 404 and 410 retry recommendations
245 # ( https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling )
246 # (especially for 410 which was supposed to be a permanent failure).
247 # 2. MI on Service Fabric specifically suggests to not retry on 404.
248 # ( https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-cluster-managed-identity-service-fabric-app-code#error-handling )
249 http_client,
250 http_cache=http_cache,
251 )
252 self._token_cache = token_cache or TokenCache()
253 self._client_capabilities = client_capabilities
254
255 def _get_instance(self):
256 if self.__instance is None:
257 self.__instance = socket.getfqdn() # Moved from class definition to here
258 return self.__instance
259
260 def acquire_token_for_client(
261 self,
262 *,
263 resource: str, # If/when we support scope, resource will become optional
264 claims_challenge: Optional[str] = None,
265 ):
266 """Acquire token for the managed identity.
267
268 The result will be automatically cached.
269 Subsequent calls will automatically search from cache first.
270
271 :param resource: The resource for which the token is acquired.
272
273 :param claims_challenge:
274 Optional.
275 It is a string representation of a JSON object
276 (which contains lists of claims being requested).
277
278 The tenant admin may choose to revoke all Managed Identity tokens,
279 and then a *claims challenge* will be returned by the target resource,
280 as a `claims_challenge` directive in the `www-authenticate` header,
281 even if the app developer did not opt in for the "CP1" client capability.
282 Upon receiving a `claims_challenge`, MSAL will attempt to acquire a new token.
283
284 .. note::
285
286 Known issue: When an Azure VM has only one user-assigned managed identity,
287 and your app specifies to use system-assigned managed identity,
288 Azure VM may still return a token for your user-assigned identity.
289
290 This is a service-side behavior that cannot be changed by this library.
291 `Azure VM docs <https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http>`_
292 """
293 access_token_to_refresh = None # This could become a public parameter in the future
294 access_token_from_cache = None
295 client_id_in_cache = self._managed_identity.get(
296 ManagedIdentity.ID, "SYSTEM_ASSIGNED_MANAGED_IDENTITY")
297 now = time.time()
298 if True: # Attempt cache search even if receiving claims_challenge,
299 # because we want to locate the existing token (if any) and refresh it
300 matches = self._token_cache.search(
301 self._token_cache.CredentialType.ACCESS_TOKEN,
302 target=[resource],
303 query=dict(
304 client_id=client_id_in_cache,
305 environment=self._get_instance(),
306 realm=self._tenant,
307 home_account_id=None,
308 ),
309 )
310 for entry in matches:
311 expires_in = int(entry["expires_on"]) - now
312 if expires_in < 5*60: # Then consider it expired
313 continue # Removal is not necessary, it will be overwritten
314 if claims_challenge and not access_token_to_refresh:
315 # Since caller did not pinpoint the token causing claims challenge,
316 # we have to assume it is the first token we found in cache.
317 access_token_to_refresh = entry["secret"]
318 break
319 logger.debug("Cache hit an AT")
320 access_token_from_cache = { # Mimic a real response
321 "access_token": entry["secret"],
322 "token_type": entry.get("token_type", "Bearer"),
323 "expires_in": int(expires_in), # OAuth2 specs defines it as int
324 self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
325 }
326 if "refresh_on" in entry:
327 access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
328 if int(entry["refresh_on"]) < now: # aging
329 break # With a fallback in hand, we break here to go refresh
330 return access_token_from_cache # It is still good as new
331 try:
332 result = _obtain_token(
333 self._http_client, self._managed_identity, resource,
334 access_token_sha256_to_refresh=hashlib.sha256(
335 access_token_to_refresh.encode("utf-8")).hexdigest()
336 if access_token_to_refresh else None,
337 client_capabilities=self._client_capabilities,
338 )
339 if "access_token" in result:
340 expires_in = result.get("expires_in", 3600)
341 if "refresh_in" not in result and expires_in >= 7200:
342 result["refresh_in"] = int(expires_in / 2)
343 self._token_cache.add(dict(
344 client_id=client_id_in_cache,
345 scope=[resource],
346 token_endpoint="https://{}/{}".format(
347 self._get_instance(), self._tenant),
348 response=result,
349 params={},
350 data={},
351 ))
352 if "refresh_in" in result:
353 result["refresh_on"] = int(now + result["refresh_in"])
354 result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
355 if (result and "error" not in result) or (not access_token_from_cache):
356 return result
357 except: # The exact HTTP exception is transportation-layer dependent
358 # Typically network error. Potential AAD outage?
359 if not access_token_from_cache: # It means there is no fall back option
360 raise # We choose to bubble up the exception
361 return access_token_from_cache
362
363
364def _scope_to_resource(scope): # This is an experimental reasonable-effort approach
365 u = urlparse(scope)
366 if u.scheme:
367 return "{}://{}".format(u.scheme, u.netloc)
368 return scope # There is no much else we can do here
369
370
371def _get_arc_endpoint():
372 if "IDENTITY_ENDPOINT" in os.environ and "IMDS_ENDPOINT" in os.environ:
373 return os.environ["IDENTITY_ENDPOINT"]
374 if ( # Defined in https://eng.ms/docs/cloud-ai-platform/azure-core/azure-management-and-platforms/control-plane-bburns/hybrid-resource-provider/azure-arc-for-servers/specs/extension_authoring
375 sys.platform == "linux" and os.path.exists("/opt/azcmagent/bin/himds")
376 or sys.platform == "win32" and os.path.exists(os.path.expandvars(
377 # Avoid Windows-only "%EnvVar%" syntax so that tests can be run on Linux
378 r"${ProgramFiles}\AzureConnectedMachineAgent\himds.exe"
379 ))
380 ):
381 return "http://localhost:40342/metadata/identity/oauth2/token"
382
383
384APP_SERVICE = object()
385AZURE_ARC = object()
386CLOUD_SHELL = object() # In MSAL Python, token acquisition was done by
387 # PublicClientApplication(...).acquire_token_interactive(..., prompt="none")
388MACHINE_LEARNING = object()
389SERVICE_FABRIC = object()
390DEFAULT_TO_VM = object() # Unknown environment; default to VM; you may want to probe
391def get_managed_identity_source():
392 """Detect the current environment and return the likely identity source.
393
394 When this function returns ``CLOUD_SHELL``, you should use
395 :func:`msal.PublicClientApplication.acquire_token_interactive` with ``prompt="none"``
396 to obtain a token.
397 """
398 if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ
399 and "IDENTITY_SERVER_THUMBPRINT" in os.environ
400 ):
401 return SERVICE_FABRIC
402 if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ:
403 return APP_SERVICE
404 if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ:
405 return MACHINE_LEARNING
406 if _get_arc_endpoint():
407 return AZURE_ARC
408 if _is_running_in_cloud_shell():
409 return CLOUD_SHELL
410 return DEFAULT_TO_VM
411
412
413def _obtain_token(
414 http_client, managed_identity, resource,
415 *,
416 access_token_sha256_to_refresh: Optional[str] = None,
417 client_capabilities: Optional[List[str]] = None,
418):
419 if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ
420 and "IDENTITY_SERVER_THUMBPRINT" in os.environ
421 ):
422 if managed_identity:
423 logger.debug(
424 "Ignoring managed_identity parameter. "
425 "Managed Identity in Service Fabric is configured in the cluster, "
426 "not during runtime. See also "
427 "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service")
428 return _obtain_token_on_service_fabric(
429 http_client,
430 os.environ["IDENTITY_ENDPOINT"],
431 os.environ["IDENTITY_HEADER"],
432 os.environ["IDENTITY_SERVER_THUMBPRINT"],
433 resource,
434 access_token_sha256_to_refresh=access_token_sha256_to_refresh,
435 client_capabilities=client_capabilities,
436 )
437 if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ:
438 return _obtain_token_on_app_service(
439 http_client,
440 os.environ["IDENTITY_ENDPOINT"],
441 os.environ["IDENTITY_HEADER"],
442 managed_identity,
443 resource,
444 )
445 if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ:
446 # Back ported from https://github.com/Azure/azure-sdk-for-python/blob/azure-identity_1.15.0/sdk/identity/azure-identity/azure/identity/_credentials/azure_ml.py
447 return _obtain_token_on_machine_learning(
448 http_client,
449 os.environ["MSI_ENDPOINT"],
450 os.environ["MSI_SECRET"],
451 managed_identity,
452 resource,
453 )
454 arc_endpoint = _get_arc_endpoint()
455 if arc_endpoint:
456 if ManagedIdentity.is_user_assigned(managed_identity):
457 raise ManagedIdentityError( # Note: Azure Identity for Python raised exception too
458 "Invalid managed_identity parameter. "
459 "Azure Arc supports only system-assigned managed identity, "
460 "See also "
461 "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service")
462 return _obtain_token_on_arc(http_client, arc_endpoint, resource)
463 return _obtain_token_on_azure_vm(http_client, managed_identity, resource)
464
465
466def _adjust_param(params, managed_identity, types_mapping=None):
467 # Modify the params dict in place
468 id_name = (types_mapping or ManagedIdentity._types_mapping).get(
469 managed_identity.get(ManagedIdentity.ID_TYPE))
470 if id_name:
471 params[id_name] = managed_identity[ManagedIdentity.ID]
472
473def _obtain_token_on_azure_vm(http_client, managed_identity, resource):
474 # Based on https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http
475 logger.debug("Obtaining token via managed identity on Azure VM")
476 params = {
477 "api-version": "2018-02-01",
478 "resource": resource,
479 }
480 _adjust_param(params, managed_identity)
481 resp = http_client.get(
482 os.getenv(
483 "AZURE_POD_IDENTITY_AUTHORITY_HOST", "http://169.254.169.254"
484 ).strip("/") + "/metadata/identity/oauth2/token",
485 params=params,
486 headers={"Metadata": "true"},
487 )
488 try:
489 payload = json.loads(resp.text)
490 if payload.get("access_token") and payload.get("expires_in"):
491 return { # Normalizing the payload into OAuth2 format
492 "access_token": payload["access_token"],
493 "expires_in": int(payload["expires_in"]),
494 "resource": payload.get("resource"),
495 "token_type": payload.get("token_type", "Bearer"),
496 }
497 return payload # It would be {"error": ..., "error_description": ...} according to https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling
498 except json.decoder.JSONDecodeError:
499 logger.debug("IMDS emits unexpected payload: %s", resp.text)
500 raise
501
502def _obtain_token_on_app_service(
503 http_client, endpoint, identity_header, managed_identity, resource,
504):
505 """Obtains token for
506 `App Service <https://learn.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp#rest-endpoint-reference>`_,
507 Azure Functions, and Azure Automation.
508 """
509 # Prerequisite: Create your app service https://docs.microsoft.com/en-us/azure/app-service/quickstart-python
510 # Assign it a managed identity https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp
511 # SSH into your container for testing https://docs.microsoft.com/en-us/azure/app-service/configure-linux-open-ssh-session
512 logger.debug("Obtaining token via managed identity on Azure App Service")
513 params = {
514 "api-version": "2019-08-01",
515 "resource": resource,
516 }
517 _adjust_param(params, managed_identity, types_mapping={
518 ManagedIdentity.CLIENT_ID: "client_id",
519 ManagedIdentity.RESOURCE_ID: "mi_res_id", # App Service's resource id uses "mi_res_id"
520 ManagedIdentity.OBJECT_ID: "object_id",
521 })
522
523 resp = http_client.get(
524 endpoint,
525 params=params,
526 headers={
527 "X-IDENTITY-HEADER": identity_header,
528 "Metadata": "true", # Unnecessary yet harmless for App Service,
529 # It will be needed by Azure Automation
530 # https://docs.microsoft.com/en-us/azure/automation/enable-managed-identity-for-automation#get-access-token-for-system-assigned-managed-identity-using-http-get
531 },
532 )
533 try:
534 payload = json.loads(resp.text)
535 if payload.get("access_token") and payload.get("expires_on"):
536 return { # Normalizing the payload into OAuth2 format
537 "access_token": payload["access_token"],
538 "expires_in": int(payload["expires_on"]) - int(time.time()),
539 "resource": payload.get("resource"),
540 "token_type": payload.get("token_type", "Bearer"),
541 }
542 return {
543 "error": "invalid_scope", # Empirically, wrong resource ends up with a vague statusCode=500
544 "error_description": "{}, {}".format(
545 payload.get("statusCode"), payload.get("message")),
546 }
547 except json.decoder.JSONDecodeError:
548 logger.debug("IMDS emits unexpected payload: %s", resp.text)
549 raise
550
551def _obtain_token_on_machine_learning(
552 http_client, endpoint, secret, managed_identity, resource,
553):
554 # Could not find protocol docs from https://docs.microsoft.com/en-us/azure/machine-learning
555 # The following implementation is back ported from Azure Identity 1.15.0
556 logger.debug("Obtaining token via managed identity on Azure Machine Learning")
557 params = {"api-version": "2017-09-01", "resource": resource}
558 _adjust_param(params, managed_identity)
559 if params["api-version"] == "2017-09-01" and "client_id" in params:
560 # Workaround for a known bug in Azure ML 2017 API
561 params["clientid"] = params.pop("client_id")
562 resp = http_client.get(
563 endpoint,
564 params=params,
565 headers={"secret": secret},
566 )
567 try:
568 payload = json.loads(resp.text)
569 if payload.get("access_token") and payload.get("expires_on"):
570 return { # Normalizing the payload into OAuth2 format
571 "access_token": payload["access_token"],
572 "expires_in": int(payload["expires_on"]) - int(time.time()),
573 "resource": payload.get("resource"),
574 "token_type": payload.get("token_type", "Bearer"),
575 }
576 return {
577 "error": "invalid_scope", # TODO: To be tested
578 "error_description": "{}".format(payload),
579 }
580 except json.decoder.JSONDecodeError:
581 logger.debug("IMDS emits unexpected payload: %s", resp.text)
582 raise
583
584
585def _obtain_token_on_service_fabric(
586 http_client, endpoint, identity_header, server_thumbprint, resource,
587 *,
588 access_token_sha256_to_refresh: str = None,
589 client_capabilities: Optional[List[str]] = None,
590):
591 """Obtains token for
592 `Service Fabric <https://learn.microsoft.com/en-us/azure/service-fabric/>`_
593 """
594 # Deployment https://learn.microsoft.com/en-us/azure/service-fabric/service-fabric-get-started-containers-linux
595 # See also https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/tests/managed-identity-live/service-fabric/service_fabric.md
596 # Protocol https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#acquiring-an-access-token-using-rest-api
597 logger.debug("Obtaining token via managed identity on Azure Service Fabric")
598 resp = http_client.get(
599 endpoint,
600 params={k: v for k, v in {
601 "api-version": "2019-07-01-preview",
602 "resource": resource,
603 "token_sha256_to_refresh": access_token_sha256_to_refresh,
604 "xms_cc": ",".join(client_capabilities) if client_capabilities else None,
605 }.items() if v is not None},
606 headers={"Secret": identity_header},
607 )
608 try:
609 payload = json.loads(resp.text)
610 if payload.get("access_token") and payload.get("expires_on"):
611 return { # Normalizing the payload into OAuth2 format
612 "access_token": payload["access_token"],
613 "expires_in": int( # Despite the example in docs shows an integer,
614 payload["expires_on"] # Azure SDK team's test obtained a string.
615 ) - int(time.time()),
616 "resource": payload.get("resource"),
617 "token_type": payload["token_type"],
618 }
619 error = payload.get("error", {}) # https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#error-handling
620 error_mapping = { # Map Service Fabric errors into OAuth2 errors https://www.rfc-editor.org/rfc/rfc6749#section-5.2
621 "SecretHeaderNotFound": "unauthorized_client",
622 "ManagedIdentityNotFound": "invalid_client",
623 "ArgumentNullOrEmpty": "invalid_scope",
624 }
625 return {
626 "error": error_mapping.get(error.get("code"), "invalid_request"),
627 "error_description": resp.text,
628 }
629 except json.decoder.JSONDecodeError:
630 logger.debug("IMDS emits unexpected payload: %s", resp.text)
631 raise
632
633
634_supported_arc_platforms_and_their_prefixes = {
635 "linux": "/var/opt/azcmagent/tokens",
636 "win32": os.path.expandvars(r"%ProgramData%\AzureConnectedMachineAgent\Tokens"),
637}
638
639class ArcPlatformNotSupportedError(ManagedIdentityError):
640 pass
641
642def _obtain_token_on_arc(http_client, endpoint, resource):
643 # https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication
644 logger.debug("Obtaining token via managed identity on Azure Arc")
645 resp = http_client.get(
646 endpoint,
647 params={"api-version": "2020-06-01", "resource": resource},
648 headers={"Metadata": "true"},
649 )
650 www_auth = "www-authenticate" # Header in lower case
651 challenge = {
652 # Normalized to lowercase, because header names are case-insensitive
653 # https://datatracker.ietf.org/doc/html/rfc7230#section-3.2
654 k.lower(): v for k, v in resp.headers.items() if k.lower() == www_auth
655 }.get(www_auth, "").split("=") # Output will be ["Basic realm", "content"]
656 if not ( # https://datatracker.ietf.org/doc/html/rfc7617#section-2
657 len(challenge) == 2 and challenge[0].lower() == "basic realm"):
658 raise ManagedIdentityError(
659 "Unrecognizable WWW-Authenticate header: {}".format(resp.headers))
660 if sys.platform not in _supported_arc_platforms_and_their_prefixes:
661 raise ArcPlatformNotSupportedError(
662 f"Platform {sys.platform} was undefined and unsupported")
663 filename = os.path.join(
664 # This algorithm is documented in an internal doc https://msazure.visualstudio.com/One/_wiki/wikis/One.wiki/233012/VM-Extension-Authoring-for-Arc?anchor=2.-obtaining-tokens
665 _supported_arc_platforms_and_their_prefixes[sys.platform],
666 os.path.splitext(os.path.basename(challenge[1]))[0] + ".key")
667 if os.stat(filename).st_size > 4096: # Check size BEFORE loading its content
668 raise ManagedIdentityError("Local key file shall not be larger than 4KB")
669 with open(filename) as f:
670 secret = f.read()
671 response = http_client.get(
672 endpoint,
673 params={"api-version": "2020-06-01", "resource": resource},
674 headers={"Metadata": "true", "Authorization": "Basic {}".format(secret)},
675 )
676 try:
677 payload = json.loads(response.text)
678 if payload.get("access_token") and payload.get("expires_in"):
679 # Example: https://learn.microsoft.com/en-us/azure/azure-arc/servers/media/managed-identity-authentication/bash-token-output-example.png
680 return {
681 "access_token": payload["access_token"],
682 "expires_in": int(payload["expires_in"]),
683 "token_type": payload.get("token_type", "Bearer"),
684 "resource": payload.get("resource"),
685 }
686 except json.decoder.JSONDecodeError:
687 pass
688 return {
689 "error": "invalid_request",
690 "error_description": response.text,
691 }