Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/application.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import functools
2import json
3import time
4import logging
5import sys
6import warnings
7from threading import Lock
8import os
10from .oauth2cli import Client, JwtAssertionCreator
11from .oauth2cli.oidc import decode_part
12from .authority import Authority, WORLD_WIDE
13from .mex import send_request as mex_send_request
14from .wstrust_request import send_request as wst_send_request
15from .wstrust_response import *
16from .token_cache import TokenCache, _get_username, _GRANT_TYPE_BROKER
17import msal.telemetry
18from .region import _detect_region
19from .throttled_http_client import ThrottledHttpClient
20from .cloudshell import _is_running_in_cloud_shell
23# The __init__.py will import this. Not the other way around.
24__version__ = "1.30.0" # When releasing, also check and bump our dependencies's versions if needed
26logger = logging.getLogger(__name__)
27_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL"
29def extract_certs(public_cert_content):
30 # Parses raw public certificate file contents and returns a list of strings
31 # Usage: headers = {"x5c": extract_certs(open("my_cert.pem").read())}
32 public_certificates = re.findall(
33 r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----',
34 public_cert_content, re.I)
35 if public_certificates:
36 return [cert.strip() for cert in public_certificates]
37 # The public cert tags are not found in the input,
38 # let's make best effort to exclude a private key pem file.
39 if "PRIVATE KEY" in public_cert_content:
40 raise ValueError(
41 "We expect your public key but detect a private key instead")
42 return [public_cert_content.strip()]
45def _merge_claims_challenge_and_capabilities(capabilities, claims_challenge):
46 # Represent capabilities as {"access_token": {"xms_cc": {"values": capabilities}}}
47 # and then merge/add it into incoming claims
48 if not capabilities:
49 return claims_challenge
50 claims_dict = json.loads(claims_challenge) if claims_challenge else {}
51 for key in ["access_token"]: # We could add "id_token" if we'd decide to
52 claims_dict.setdefault(key, {}).update(xms_cc={"values": capabilities})
53 return json.dumps(claims_dict)
56def _str2bytes(raw):
57 # A conversion based on duck-typing rather than six.text_type
58 try:
59 return raw.encode(encoding="utf-8")
60 except:
61 return raw
64def _parse_pfx(pfx_path, passphrase_bytes):
65 # Cert concepts https://security.stackexchange.com/a/226758/125264
66 from cryptography.hazmat.primitives import hashes, serialization
67 from cryptography.hazmat.primitives.serialization import pkcs12
68 with open(pfx_path, 'rb') as f:
69 private_key, cert, _ = pkcs12.load_key_and_certificates( # cryptography 2.5+
70 # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates
71 f.read(), passphrase_bytes)
72 if not (private_key and cert):
73 raise ValueError("Your PFX file shall contain both private key and cert")
74 cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM).decode() # cryptography 1.0+
75 x5c = [
76 '\n'.join(cert_pem.splitlines()[1:-1]) # Strip the "--- header ---" and "--- footer ---"
77 ]
78 sha256_thumbprint = cert.fingerprint(hashes.SHA256()).hex() # cryptography 0.7+
79 sha1_thumbprint = cert.fingerprint(hashes.SHA1()).hex() # cryptography 0.7+
80 # https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object
81 return private_key, sha256_thumbprint, sha1_thumbprint, x5c
84def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes):
85 from cryptography.hazmat.primitives import serialization
86 from cryptography.hazmat.backends import default_backend
87 return serialization.load_pem_private_key( # cryptography 0.6+
88 _str2bytes(private_key_pem_str),
89 passphrase_bytes,
90 backend=default_backend(), # It was a required param until 2020
91 )
94def _pii_less_home_account_id(home_account_id):
95 parts = home_account_id.split(".") # It could contain one or two parts
96 parts[0] = "********"
97 return ".".join(parts)
100def _clean_up(result):
101 if isinstance(result, dict):
102 if "_msalruntime_telemetry" in result or "_msal_python_telemetry" in result:
103 result["msal_telemetry"] = json.dumps({ # Telemetry as an opaque string
104 "msalruntime_telemetry": result.get("_msalruntime_telemetry"),
105 "msal_python_telemetry": result.get("_msal_python_telemetry"),
106 }, separators=(",", ":"))
107 return_value = {
108 k: result[k] for k in result
109 if k != "refresh_in" # MSAL handled refresh_in, customers need not
110 and not k.startswith('_') # Skim internal properties
111 }
112 if "refresh_in" in result: # To encourage proactive refresh
113 return_value["refresh_on"] = int(time.time() + result["refresh_in"])
114 return return_value
115 return result # It could be None
118def _preferred_browser():
119 """Register Edge and return a name suitable for subsequent webbrowser.get(...)
120 when appropriate. Otherwise return None.
121 """
122 # On Linux, only Edge will provide device-based Conditional Access support
123 if sys.platform != "linux": # On other platforms, we have no browser preference
124 return None
125 browser_path = "/usr/bin/microsoft-edge" # Use a full path owned by sys admin
126 # Note: /usr/bin/microsoft-edge, /usr/bin/microsoft-edge-stable, etc.
127 # are symlinks that point to the actual binaries which are found under
128 # /opt/microsoft/msedge/msedge or /opt/microsoft/msedge-beta/msedge.
129 # Either method can be used to detect an Edge installation.
130 user_has_no_preference = "BROWSER" not in os.environ
131 user_wont_mind_edge = "microsoft-edge" in os.environ.get("BROWSER", "") # Note:
132 # BROWSER could contain "microsoft-edge" or "/path/to/microsoft-edge".
133 # Python documentation (https://docs.python.org/3/library/webbrowser.html)
134 # does not document the name being implicitly register,
135 # so there is no public API to know whether the ENV VAR browser would work.
136 # Therefore, we would not bother examine the env var browser's type.
137 # We would just register our own Edge instance.
138 if (user_has_no_preference or user_wont_mind_edge) and os.path.exists(browser_path):
139 try:
140 import webbrowser # Lazy import. Some distro may not have this.
141 browser_name = "msal-edge" # Avoid popular name "microsoft-edge"
142 # otherwise `BROWSER="microsoft-edge"; webbrowser.get("microsoft-edge")`
143 # would return a GenericBrowser instance which won't work.
144 try:
145 registration_available = isinstance(
146 webbrowser.get(browser_name), webbrowser.BackgroundBrowser)
147 except webbrowser.Error:
148 registration_available = False
149 if not registration_available:
150 logger.debug("Register %s with %s", browser_name, browser_path)
151 # By registering our own browser instance with our own name,
152 # rather than populating a process-wide BROWSER enn var,
153 # this approach does not have side effect on non-MSAL code path.
154 webbrowser.register( # Even double-register happens to work fine
155 browser_name, None, webbrowser.BackgroundBrowser(browser_path))
156 return browser_name
157 except ImportError:
158 pass # We may still proceed
159 return None
162class _ClientWithCcsRoutingInfo(Client):
164 def initiate_auth_code_flow(self, **kwargs):
165 if kwargs.get("login_hint"): # eSTS could have utilized this as-is, but nope
166 kwargs["X-AnchorMailbox"] = "UPN:%s" % kwargs["login_hint"]
167 return super(_ClientWithCcsRoutingInfo, self).initiate_auth_code_flow(
168 client_info=1, # To be used as CSS Routing info
169 **kwargs)
171 def obtain_token_by_auth_code_flow(
172 self, auth_code_flow, auth_response, **kwargs):
173 # Note: the obtain_token_by_browser() is also covered by this
174 assert isinstance(auth_code_flow, dict) and isinstance(auth_response, dict)
175 headers = kwargs.pop("headers", {})
176 client_info = json.loads(
177 decode_part(auth_response["client_info"])
178 ) if auth_response.get("client_info") else {}
179 if "uid" in client_info and "utid" in client_info:
180 # Note: The value of X-AnchorMailbox is also case-insensitive
181 headers["X-AnchorMailbox"] = "Oid:{uid}@{utid}".format(**client_info)
182 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_auth_code_flow(
183 auth_code_flow, auth_response, headers=headers, **kwargs)
185 def obtain_token_by_username_password(self, username, password, **kwargs):
186 headers = kwargs.pop("headers", {})
187 headers["X-AnchorMailbox"] = "upn:{}".format(username)
188 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_username_password(
189 username, password, headers=headers, **kwargs)
192class ClientApplication(object):
193 """You do not usually directly use this class. Use its subclasses instead:
194 :class:`PublicClientApplication` and :class:`ConfidentialClientApplication`.
195 """
196 ACQUIRE_TOKEN_SILENT_ID = "84"
197 ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85"
198 ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301"
199 ACQUIRE_TOKEN_ON_BEHALF_OF_ID = "523"
200 ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID = "622"
201 ACQUIRE_TOKEN_FOR_CLIENT_ID = "730"
202 ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832"
203 ACQUIRE_TOKEN_INTERACTIVE = "169"
204 GET_ACCOUNTS_ID = "902"
205 REMOVE_ACCOUNT_ID = "903"
207 ATTEMPT_REGION_DISCOVERY = True # "TryAutoDetect"
208 _TOKEN_SOURCE = "token_source"
209 _TOKEN_SOURCE_IDP = "identity_provider"
210 _TOKEN_SOURCE_CACHE = "cache"
211 _TOKEN_SOURCE_BROKER = "broker"
213 _enable_broker = False
214 _AUTH_SCHEME_UNSUPPORTED = (
215 "auth_scheme is currently only available from broker. "
216 "You can enable broker by following these instructions. "
217 "https://msal-python.readthedocs.io/en/latest/#publicclientapplication")
219 def __init__(
220 self, client_id,
221 client_credential=None, authority=None, validate_authority=True,
222 token_cache=None,
223 http_client=None,
224 verify=True, proxies=None, timeout=None,
225 client_claims=None, app_name=None, app_version=None,
226 client_capabilities=None,
227 azure_region=None, # Note: We choose to add this param in this base class,
228 # despite it is currently only needed by ConfidentialClientApplication.
229 # This way, it holds the same positional param place for PCA,
230 # when we would eventually want to add this feature to PCA in future.
231 exclude_scopes=None,
232 http_cache=None,
233 instance_discovery=None,
234 allow_broker=None,
235 enable_pii_log=None,
236 oidc_authority=None,
237 ):
238 """Create an instance of application.
240 :param str client_id: Your app has a client_id after you register it on Microsoft Entra admin center.
242 :param client_credential:
243 For :class:`PublicClientApplication`, you use `None` here.
245 For :class:`ConfidentialClientApplication`,
246 it supports many different input formats for different scenarios.
248 .. admonition:: Support using a client secret.
250 Just feed in a string, such as ``"your client secret"``.
252 .. admonition:: Support using a certificate in X.509 (.pem) format
254 Feed in a dict in this form::
256 {
257 "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format",
258 "thumbprint": "A1B2C3D4E5F6...",
259 "passphrase": "Passphrase if the private_key is encrypted (Optional. Added in version 1.6.0)",
260 }
262 MSAL Python requires a "private_key" in PEM format.
263 If your cert is in PKCS12 (.pfx) format,
264 you can convert it to X.509 (.pem) format,
265 by ``openssl pkcs12 -in file.pfx -out file.pem -nodes``.
267 The thumbprint is available in your app's registration in Azure Portal.
268 Alternatively, you can `calculate the thumbprint <https://github.com/Azure/azure-sdk-for-python/blob/07d10639d7e47f4852eaeb74aef5d569db499d6e/sdk/identity/azure-identity/azure/identity/_credentials/certificate.py#L94-L97>`_.
270 .. admonition:: Support Subject Name/Issuer Auth with a cert in .pem
272 `Subject Name/Issuer Auth
273 <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_
274 is an approach to allow easier certificate rotation.
276 *Added in version 0.5.0*::
278 {
279 "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format",
280 "thumbprint": "A1B2C3D4E5F6...",
281 "public_certificate": "...-----BEGIN CERTIFICATE-----...",
282 "passphrase": "Passphrase if the private_key is encrypted (Optional. Added in version 1.6.0)",
283 }
285 ``public_certificate`` (optional) is public key certificate
286 which will be sent through 'x5c' JWT header only for
287 subject name and issuer authentication to support cert auto rolls.
289 Per `specs <https://tools.ietf.org/html/rfc7515#section-4.1.6>`_,
290 "the certificate containing
291 the public key corresponding to the key used to digitally sign the
292 JWS MUST be the first certificate. This MAY be followed by
293 additional certificates, with each subsequent certificate being the
294 one used to certify the previous one."
295 However, your certificate's issuer may use a different order.
296 So, if your attempt ends up with an error AADSTS700027 -
297 "The provided signature value did not match the expected signature value",
298 you may try use only the leaf cert (in PEM/str format) instead.
300 .. admonition:: Supporting raw assertion obtained from elsewhere
302 *Added in version 1.13.0*:
303 It can also be a completely pre-signed assertion that you've assembled yourself.
304 Simply pass a container containing only the key "client_assertion", like this::
306 {
307 "client_assertion": "...a JWT with claims aud, exp, iss, jti, nbf, and sub..."
308 }
310 .. admonition:: Supporting reading client cerficates from PFX files
312 *Added in version 1.29.0*:
313 Feed in a dictionary containing the path to a PFX file::
315 {
316 "private_key_pfx_path": "/path/to/your.pfx",
317 "passphrase": "Passphrase if the private_key is encrypted (Optional)",
318 }
320 The following command will generate a .pfx file from your .key and .pem file::
322 openssl pkcs12 -export -out certificate.pfx -inkey privateKey.key -in certificate.pem
324 .. admonition:: Support Subject Name/Issuer Auth with a cert in .pfx
326 *Added in version 1.30.0*:
327 If your .pfx file contains both the private key and public cert,
328 you can opt in for Subject Name/Issuer Auth like this::
330 {
331 "private_key_pfx_path": "/path/to/your.pfx",
332 "public_certificate": True,
333 "passphrase": "Passphrase if the private_key is encrypted (Optional)",
334 }
336 :type client_credential: Union[dict, str, None]
338 :param dict client_claims:
339 *Added in version 0.5.0*:
340 It is a dictionary of extra claims that would be signed by
341 by this :class:`ConfidentialClientApplication` 's private key.
342 For example, you can use {"client_ip": "x.x.x.x"}.
343 You may also override any of the following default claims::
345 {
346 "aud": the_token_endpoint,
347 "iss": self.client_id,
348 "sub": same_as_issuer,
349 "exp": now + 10_min,
350 "iat": now,
351 "jti": a_random_uuid
352 }
354 :param str authority:
355 A URL that identifies a token authority. It should be of the format
356 ``https://login.microsoftonline.com/your_tenant``
357 By default, we will use ``https://login.microsoftonline.com/common``
359 *Changed in version 1.17*: you can also use predefined constant
360 and a builder like this::
362 from msal.authority import (
363 AuthorityBuilder,
364 AZURE_US_GOVERNMENT, AZURE_CHINA, AZURE_PUBLIC)
365 my_authority = AuthorityBuilder(AZURE_PUBLIC, "contoso.onmicrosoft.com")
366 # Now you get an equivalent of
367 # "https://login.microsoftonline.com/contoso.onmicrosoft.com"
369 # You can feed such an authority to msal's ClientApplication
370 from msal import PublicClientApplication
371 app = PublicClientApplication("my_client_id", authority=my_authority, ...)
373 :param bool validate_authority: (optional) Turns authority validation
374 on or off. This parameter default to true.
375 :param TokenCache token_cache:
376 Sets the token cache used by this ClientApplication instance.
377 By default, an in-memory cache will be created and used.
378 :param http_client: (optional)
379 Your implementation of abstract class HttpClient <msal.oauth2cli.http.http_client>
380 Defaults to a requests session instance.
381 Since MSAL 1.11.0, the default session would be configured
382 to attempt one retry on connection error.
383 If you are providing your own http_client,
384 it will be your http_client's duty to decide whether to perform retry.
386 :param verify: (optional)
387 It will be passed to the
388 `verify parameter in the underlying requests library
389 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#ssl-cert-verification>`_
390 This does not apply if you have chosen to pass your own Http client
391 :param proxies: (optional)
392 It will be passed to the
393 `proxies parameter in the underlying requests library
394 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#proxies>`_
395 This does not apply if you have chosen to pass your own Http client
396 :param timeout: (optional)
397 It will be passed to the
398 `timeout parameter in the underlying requests library
399 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#timeouts>`_
400 This does not apply if you have chosen to pass your own Http client
401 :param app_name: (optional)
402 You can provide your application name for Microsoft telemetry purposes.
403 Default value is None, means it will not be passed to Microsoft.
404 :param app_version: (optional)
405 You can provide your application version for Microsoft telemetry purposes.
406 Default value is None, means it will not be passed to Microsoft.
407 :param list[str] client_capabilities: (optional)
408 Allows configuration of one or more client capabilities, e.g. ["CP1"].
410 Client capability is meant to inform the Microsoft identity platform
411 (STS) what this client is capable for,
412 so STS can decide to turn on certain features.
413 For example, if client is capable to handle *claims challenge*,
414 STS can then issue CAE access tokens to resources
415 knowing when the resource emits *claims challenge*
416 the client will be capable to handle.
418 Implementation details:
419 Client capability is implemented using "claims" parameter on the wire,
420 for now.
421 MSAL will combine them into
422 `claims parameter <https://openid.net/specs/openid-connect-core-1_0-final.html#ClaimsParameter>`_
423 which you will later provide via one of the acquire-token request.
425 :param str azure_region: (optional)
426 Instructs MSAL to use the Entra regional token service. This legacy feature is only available to
427 first-party applications. Only ``acquire_token_for_client()`` is supported.
429 Supports 3 values:
431 ``azure_region=None`` - meaning no region is used. This is the default value.
432 ``azure_region="some_region"`` - meaning the specified region is used.
433 ``azure_region=True`` - meaning MSAL will try to auto-detect the region. This is not recommended.
435 .. note::
436 Region auto-discovery has been tested on VMs and on Azure Functions. It is unreliable.
437 Applications using this option should configure a short timeout.
439 For more details and for the values of the region string
440 see https://learn.microsoft.com/entra/msal/dotnet/resources/region-discovery-troubleshooting
442 New in version 1.12.0.
444 :param list[str] exclude_scopes: (optional)
445 Historically MSAL hardcodes `offline_access` scope,
446 which would allow your app to have prolonged access to user's data.
447 If that is unnecessary or undesirable for your app,
448 now you can use this parameter to supply an exclusion list of scopes,
449 such as ``exclude_scopes = ["offline_access"]``.
451 :param dict http_cache:
452 MSAL has long been caching tokens in the ``token_cache``.
453 Recently, MSAL also introduced a concept of ``http_cache``,
454 by automatically caching some finite amount of non-token http responses,
455 so that *long-lived*
456 ``PublicClientApplication`` and ``ConfidentialClientApplication``
457 would be more performant and responsive in some situations.
459 This ``http_cache`` parameter accepts any dict-like object.
460 If not provided, MSAL will use an in-memory dict.
462 If your app is a command-line app (CLI),
463 you would want to persist your http_cache across different CLI runs.
464 The following recipe shows a way to do so::
466 # Just add the following lines at the beginning of your CLI script
467 import sys, atexit, pickle
468 http_cache_filename = sys.argv[0] + ".http_cache"
469 try:
470 with open(http_cache_filename, "rb") as f:
471 persisted_http_cache = pickle.load(f) # Take a snapshot
472 except (
473 FileNotFoundError, # Or IOError in Python 2
474 pickle.UnpicklingError, # A corrupted http cache file
475 ):
476 persisted_http_cache = {} # Recover by starting afresh
477 atexit.register(lambda: pickle.dump(
478 # When exit, flush it back to the file.
479 # It may occasionally overwrite another process's concurrent write,
480 # but that is fine. Subsequent runs will reach eventual consistency.
481 persisted_http_cache, open(http_cache_file, "wb")))
483 # And then you can implement your app as you normally would
484 app = msal.PublicClientApplication(
485 "your_client_id",
486 ...,
487 http_cache=persisted_http_cache, # Utilize persisted_http_cache
488 ...,
489 #token_cache=..., # You may combine the old token_cache trick
490 # Please refer to token_cache recipe at
491 # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
492 )
493 app.acquire_token_interactive(["your", "scope"], ...)
495 Content inside ``http_cache`` are cheap to obtain.
496 There is no need to share them among different apps.
498 Content inside ``http_cache`` will contain no tokens nor
499 Personally Identifiable Information (PII). Encryption is unnecessary.
501 New in version 1.16.0.
503 :param boolean instance_discovery:
504 Historically, MSAL would connect to a central endpoint located at
505 ``https://login.microsoftonline.com`` to acquire some metadata,
506 especially when using an unfamiliar authority.
507 This behavior is known as Instance Discovery.
509 This parameter defaults to None, which enables the Instance Discovery.
511 If you know some authorities which you allow MSAL to operate with as-is,
512 without involving any Instance Discovery, the recommended pattern is::
514 known_authorities = frozenset([ # Treat your known authorities as const
515 "https://contoso.com/adfs", "https://login.azs/foo"])
516 ...
517 authority = "https://contoso.com/adfs" # Assuming your app will use this
518 app1 = PublicClientApplication(
519 "client_id",
520 authority=authority,
521 # Conditionally disable Instance Discovery for known authorities
522 instance_discovery=authority not in known_authorities,
523 )
525 If you do not know some authorities beforehand,
526 yet still want MSAL to accept any authority that you will provide,
527 you can use a ``False`` to unconditionally disable Instance Discovery.
529 New in version 1.19.0.
531 :param boolean allow_broker:
532 Deprecated. Please use ``enable_broker_on_windows`` instead.
534 :param boolean enable_pii_log:
535 When enabled, logs may include PII (Personal Identifiable Information).
536 This can be useful in troubleshooting broker behaviors.
537 The default behavior is False.
539 New in version 1.24.0.
541 :param str oidc_authority:
542 *Added in version 1.28.0*:
543 It is a URL that identifies an OpenID Connect (OIDC) authority of
544 the format ``https://contoso.com/tenant``.
545 MSAL will append ".well-known/openid-configuration" to the authority
546 and retrieve the OIDC metadata from there, to figure out the endpoints.
548 Note: Broker will NOT be used for OIDC authority.
549 """
550 self.client_id = client_id
551 self.client_credential = client_credential
552 self.client_claims = client_claims
553 self._client_capabilities = client_capabilities
554 self._instance_discovery = instance_discovery
556 if exclude_scopes and not isinstance(exclude_scopes, list):
557 raise ValueError(
558 "Invalid exclude_scopes={}. It need to be a list of strings.".format(
559 repr(exclude_scopes)))
560 self._exclude_scopes = frozenset(exclude_scopes or [])
561 if "openid" in self._exclude_scopes:
562 raise ValueError(
563 'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format(
564 repr(exclude_scopes)))
566 if http_client:
567 self.http_client = http_client
568 else:
569 import requests # Lazy load
571 self.http_client = requests.Session()
572 self.http_client.verify = verify
573 self.http_client.proxies = proxies
574 # Requests, does not support session - wide timeout
575 # But you can patch that (https://github.com/psf/requests/issues/3341):
576 self.http_client.request = functools.partial(
577 self.http_client.request, timeout=timeout)
579 # Enable a minimal retry. Better than nothing.
580 # https://github.com/psf/requests/blob/v2.25.1/requests/adapters.py#L94-L108
581 a = requests.adapters.HTTPAdapter(max_retries=1)
582 self.http_client.mount("http://", a)
583 self.http_client.mount("https://", a)
584 self.http_client = ThrottledHttpClient(
585 self.http_client,
586 http_cache=http_cache,
587 default_throttle_time=60
588 # The default value 60 was recommended mainly for PCA at the end of
589 # https://identitydivision.visualstudio.com/devex/_git/AuthLibrariesApiReview?version=GBdev&path=%2FService%20protection%2FIntial%20set%20of%20protection%20measures.md&_a=preview
590 if isinstance(self, PublicClientApplication) else 5,
591 )
593 self.app_name = app_name
594 self.app_version = app_version
596 # Here the self.authority will not be the same type as authority in input
597 if oidc_authority and authority:
598 raise ValueError("You can not provide both authority and oidc_authority")
599 try:
600 authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE)
601 self.authority = Authority(
602 authority_to_use,
603 self.http_client,
604 validate_authority=validate_authority,
605 instance_discovery=self._instance_discovery,
606 oidc_authority_url=oidc_authority,
607 )
608 except ValueError: # Those are explicit authority validation errors
609 raise
610 except Exception: # The rest are typically connection errors
611 if validate_authority and azure_region and not oidc_authority:
612 # Since caller opts in to use region, here we tolerate connection
613 # errors happened during authority validation at non-region endpoint
614 self.authority = Authority(
615 authority_to_use,
616 self.http_client,
617 instance_discovery=False,
618 )
619 else:
620 raise
622 self._decide_broker(allow_broker, enable_pii_log)
623 self.token_cache = token_cache or TokenCache()
624 self._region_configured = azure_region
625 self._region_detected = None
626 self.client, self._regional_client = self._build_client(
627 client_credential, self.authority)
628 self.authority_groups = None
629 self._telemetry_buffer = {}
630 self._telemetry_lock = Lock()
632 def _decide_broker(self, allow_broker, enable_pii_log):
633 is_confidential_app = self.client_credential or isinstance(
634 self, ConfidentialClientApplication)
635 if is_confidential_app and allow_broker:
636 raise ValueError("allow_broker=True is only supported in PublicClientApplication")
637 # Historically, we chose to support ClientApplication("client_id", allow_broker=True)
638 if allow_broker:
639 warnings.warn(
640 "allow_broker is deprecated. "
641 "Please use PublicClientApplication(..., enable_broker_on_windows=True)",
642 DeprecationWarning)
643 self._enable_broker = self._enable_broker or (
644 # When we started the broker project on Windows platform,
645 # the allow_broker was meant to be cross-platform. Now we realize
646 # that other platforms have different redirect_uri requirements,
647 # so the old allow_broker is deprecated and will only for Windows.
648 allow_broker and sys.platform == "win32")
649 if (self._enable_broker and not is_confidential_app
650 and not self.authority.is_adfs and not self.authority._is_b2c):
651 try:
652 from . import broker # Trigger Broker's initialization
653 if enable_pii_log:
654 broker._enable_pii_log()
655 except RuntimeError:
656 self._enable_broker = False
657 logger.exception(
658 "Broker is unavailable on this platform. "
659 "We will fallback to non-broker.")
660 logger.debug("Broker enabled? %s", self._enable_broker)
662 def is_pop_supported(self):
663 """Returns True if this client supports Proof-of-Possession Access Token."""
664 return self._enable_broker
666 def _decorate_scope(
667 self, scopes,
668 reserved_scope=frozenset(['openid', 'profile', 'offline_access'])):
669 if not isinstance(scopes, (list, set, tuple)):
670 raise ValueError("The input scopes should be a list, tuple, or set")
671 scope_set = set(scopes) # Input scopes is typically a list. Copy it to a set.
672 if scope_set & reserved_scope:
673 # These scopes are reserved for the API to provide good experience.
674 # We could make the developer pass these and then if they do they will
675 # come back asking why they don't see refresh token or user information.
676 raise ValueError(
677 """You cannot use any scope value that is reserved.
678Your input: {}
679The reserved list: {}""".format(list(scope_set), list(reserved_scope)))
680 raise ValueError(
681 "You cannot use any scope value that is in this reserved list: {}".format(
682 list(reserved_scope)))
684 # client_id can also be used as a scope in B2C
685 decorated = scope_set | reserved_scope
686 decorated -= self._exclude_scopes
687 return list(decorated)
689 def _build_telemetry_context(
690 self, api_id, correlation_id=None, refresh_reason=None):
691 return msal.telemetry._TelemetryContext(
692 self._telemetry_buffer, self._telemetry_lock, api_id,
693 correlation_id=correlation_id, refresh_reason=refresh_reason)
695 def _get_regional_authority(self, central_authority):
696 if not self._region_configured: # User did not opt-in to ESTS-R
697 return None # Short circuit to completely bypass region detection
698 self._region_detected = self._region_detected or _detect_region(
699 self.http_client if self._region_configured is not None else None)
700 if (self._region_configured != self.ATTEMPT_REGION_DISCOVERY
701 and self._region_configured != self._region_detected):
702 logger.warning('Region configured ({}) != region detected ({})'.format(
703 repr(self._region_configured), repr(self._region_detected)))
704 region_to_use = (
705 self._region_detected
706 if self._region_configured == self.ATTEMPT_REGION_DISCOVERY
707 else self._region_configured) # It will retain the None i.e. opted out
708 logger.debug('Region to be used: {}'.format(repr(region_to_use)))
709 if region_to_use:
710 regional_host = ("{}.login.microsoft.com".format(region_to_use)
711 if central_authority.instance in (
712 # The list came from point 3 of the algorithm section in this internal doc
713 # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PinAuthToRegion/AAD%20SDK%20Proposal%20to%20Pin%20Auth%20to%20region.md&anchor=algorithm&_a=preview
714 "login.microsoftonline.com",
715 "login.microsoft.com",
716 "login.windows.net",
717 "sts.windows.net",
718 )
719 else "{}.{}".format(region_to_use, central_authority.instance))
720 return Authority( # The central_authority has already been validated
721 "https://{}/{}".format(regional_host, central_authority.tenant),
722 self.http_client,
723 instance_discovery=False,
724 )
725 return None
727 def _build_client(self, client_credential, authority, skip_regional_client=False):
728 client_assertion = None
729 client_assertion_type = None
730 default_headers = {
731 "x-client-sku": "MSAL.Python", "x-client-ver": __version__,
732 "x-client-os": sys.platform,
733 "x-ms-lib-capability": "retry-after, h429",
734 }
735 if self.app_name:
736 default_headers['x-app-name'] = self.app_name
737 if self.app_version:
738 default_headers['x-app-ver'] = self.app_version
739 default_body = {"client_info": 1}
740 if isinstance(client_credential, dict):
741 client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT
742 # Use client_credential.get("...") rather than "..." in client_credential
743 # so that we can ignore an empty string came from an empty ENV VAR.
744 if client_credential.get("client_assertion"):
745 client_assertion = client_credential['client_assertion']
746 else:
747 headers = {}
748 sha1_thumbprint = sha256_thumbprint = None
749 passphrase_bytes = _str2bytes(
750 client_credential["passphrase"]
751 ) if client_credential.get("passphrase") else None
752 if client_credential.get("private_key_pfx_path"):
753 private_key, sha256_thumbprint, sha1_thumbprint, x5c = _parse_pfx(
754 client_credential["private_key_pfx_path"],
755 passphrase_bytes)
756 if client_credential.get("public_certificate") is True and x5c:
757 headers["x5c"] = x5c
758 elif (
759 client_credential.get("private_key") # PEM blob
760 and client_credential.get("thumbprint")):
761 sha1_thumbprint = client_credential["thumbprint"]
762 if passphrase_bytes:
763 private_key = _load_private_key_from_pem_str(
764 client_credential['private_key'], passphrase_bytes)
765 else: # PEM without passphrase
766 private_key = client_credential['private_key']
767 else:
768 raise ValueError(
769 "client_credential needs to follow this format "
770 "https://msal-python.readthedocs.io/en/latest/#msal.ClientApplication.params.client_credential")
771 if ("x5c" not in headers # So the .pfx file contains no certificate
772 and isinstance(client_credential.get('public_certificate'), str)
773 ): # Then we treat the public_certificate value as PEM content
774 headers["x5c"] = extract_certs(client_credential['public_certificate'])
775 if sha256_thumbprint and not authority.is_adfs:
776 assertion_params = {
777 "algorithm": "PS256", "sha256_thumbprint": sha256_thumbprint,
778 }
779 else: # Fall back
780 if not sha1_thumbprint:
781 raise ValueError("You shall provide a thumbprint in SHA1.")
782 assertion_params = {
783 "algorithm": "RS256", "sha1_thumbprint": sha1_thumbprint,
784 }
785 assertion = JwtAssertionCreator(
786 private_key, headers=headers, **assertion_params)
787 client_assertion = assertion.create_regenerative_assertion(
788 audience=authority.token_endpoint, issuer=self.client_id,
789 additional_claims=self.client_claims or {})
790 else:
791 default_body['client_secret'] = client_credential
792 central_configuration = {
793 "authorization_endpoint": authority.authorization_endpoint,
794 "token_endpoint": authority.token_endpoint,
795 "device_authorization_endpoint": authority.device_authorization_endpoint,
796 }
797 central_client = _ClientWithCcsRoutingInfo(
798 central_configuration,
799 self.client_id,
800 http_client=self.http_client,
801 default_headers=default_headers,
802 default_body=default_body,
803 client_assertion=client_assertion,
804 client_assertion_type=client_assertion_type,
805 on_obtaining_tokens=lambda event: self.token_cache.add(dict(
806 event, environment=authority.instance)),
807 on_removing_rt=self.token_cache.remove_rt,
808 on_updating_rt=self.token_cache.update_rt)
810 regional_client = None
811 if (client_credential # Currently regional endpoint only serves some CCA flows
812 and not skip_regional_client):
813 regional_authority = self._get_regional_authority(authority)
814 if regional_authority:
815 regional_configuration = {
816 "authorization_endpoint": regional_authority.authorization_endpoint,
817 "token_endpoint": regional_authority.token_endpoint,
818 "device_authorization_endpoint":
819 regional_authority.device_authorization_endpoint,
820 }
821 regional_client = _ClientWithCcsRoutingInfo(
822 regional_configuration,
823 self.client_id,
824 http_client=self.http_client,
825 default_headers=default_headers,
826 default_body=default_body,
827 client_assertion=client_assertion,
828 client_assertion_type=client_assertion_type,
829 on_obtaining_tokens=lambda event: self.token_cache.add(dict(
830 event, environment=authority.instance)),
831 on_removing_rt=self.token_cache.remove_rt,
832 on_updating_rt=self.token_cache.update_rt)
833 return central_client, regional_client
835 def initiate_auth_code_flow(
836 self,
837 scopes, # type: list[str]
838 redirect_uri=None,
839 state=None, # Recommended by OAuth2 for CSRF protection
840 prompt=None,
841 login_hint=None, # type: Optional[str]
842 domain_hint=None, # type: Optional[str]
843 claims_challenge=None,
844 max_age=None,
845 response_mode=None, # type: Optional[str]
846 ):
847 """Initiate an auth code flow.
849 Later when the response reaches your redirect_uri,
850 you can use :func:`~acquire_token_by_auth_code_flow()`
851 to complete the authentication/authorization.
853 :param list scopes:
854 It is a list of case-sensitive strings.
855 :param str redirect_uri:
856 Optional. If not specified, server will use the pre-registered one.
857 :param str state:
858 An opaque value used by the client to
859 maintain state between the request and callback.
860 If absent, this library will automatically generate one internally.
861 :param str prompt:
862 By default, no prompt value will be sent, not even string ``"none"``.
863 You will have to specify a value explicitly.
864 Its valid values are the constants defined in
865 :class:`Prompt <msal.Prompt>`.
867 :param str login_hint:
868 Optional. Identifier of the user. Generally a User Principal Name (UPN).
869 :param domain_hint:
870 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
871 If included, it will skip the email-based discovery process that user goes
872 through on the sign-in page, leading to a slightly more streamlined user experience.
873 More information on possible values available in
874 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
875 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
877 :param int max_age:
878 OPTIONAL. Maximum Authentication Age.
879 Specifies the allowable elapsed time in seconds
880 since the last time the End-User was actively authenticated.
881 If the elapsed time is greater than this value,
882 Microsoft identity platform will actively re-authenticate the End-User.
884 MSAL Python will also automatically validate the auth_time in ID token.
886 New in version 1.15.
888 :param str response_mode:
889 OPTIONAL. Specifies the method with which response parameters should be returned.
890 The default value is equivalent to ``query``, which is still secure enough in MSAL Python
891 (because MSAL Python does not transfer tokens via query parameter in the first place).
892 For even better security, we recommend using the value ``form_post``.
893 In "form_post" mode, response parameters
894 will be encoded as HTML form values that are transmitted via the HTTP POST method and
895 encoded in the body using the application/x-www-form-urlencoded format.
896 Valid values can be either "form_post" for HTTP POST to callback URI or
897 "query" (the default) for HTTP GET with parameters encoded in query string.
898 More information on possible values
899 `here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>`
900 and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>`
902 :return:
903 The auth code flow. It is a dict in this form::
905 {
906 "auth_uri": "https://...", // Guide user to visit this
907 "state": "...", // You may choose to verify it by yourself,
908 // or just let acquire_token_by_auth_code_flow()
909 // do that for you.
910 "...": "...", // Everything else are reserved and internal
911 }
913 The caller is expected to:
915 1. somehow store this content, typically inside the current session,
916 2. guide the end user (i.e. resource owner) to visit that auth_uri,
917 3. and then relay this dict and subsequent auth response to
918 :func:`~acquire_token_by_auth_code_flow()`.
919 """
920 client = _ClientWithCcsRoutingInfo(
921 {"authorization_endpoint": self.authority.authorization_endpoint},
922 self.client_id,
923 http_client=self.http_client)
924 flow = client.initiate_auth_code_flow(
925 redirect_uri=redirect_uri, state=state, login_hint=login_hint,
926 prompt=prompt,
927 scope=self._decorate_scope(scopes),
928 domain_hint=domain_hint,
929 claims=_merge_claims_challenge_and_capabilities(
930 self._client_capabilities, claims_challenge),
931 max_age=max_age,
932 response_mode=response_mode,
933 )
934 flow["claims_challenge"] = claims_challenge
935 return flow
937 def get_authorization_request_url(
938 self,
939 scopes, # type: list[str]
940 login_hint=None, # type: Optional[str]
941 state=None, # Recommended by OAuth2 for CSRF protection
942 redirect_uri=None,
943 response_type="code", # Could be "token" if you use Implicit Grant
944 prompt=None,
945 nonce=None,
946 domain_hint=None, # type: Optional[str]
947 claims_challenge=None,
948 **kwargs):
949 """Constructs a URL for you to start a Authorization Code Grant.
951 :param list[str] scopes: (Required)
952 Scopes requested to access a protected API (a resource).
953 :param str state: Recommended by OAuth2 for CSRF protection.
954 :param str login_hint:
955 Identifier of the user. Generally a User Principal Name (UPN).
956 :param str redirect_uri:
957 Address to return to upon receiving a response from the authority.
958 :param str response_type:
959 Default value is "code" for an OAuth2 Authorization Code grant.
961 You could use other content such as "id_token" or "token",
962 which would trigger an Implicit Grant, but that is
963 `not recommended <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-implicit-grant-flow#is-the-implicit-grant-suitable-for-my-app>`_.
965 :param str prompt:
966 By default, no prompt value will be sent, not even string ``"none"``.
967 You will have to specify a value explicitly.
968 Its valid values are the constants defined in
969 :class:`Prompt <msal.Prompt>`.
970 :param nonce:
971 A cryptographically random value used to mitigate replay attacks. See also
972 `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
973 :param domain_hint:
974 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
975 If included, it will skip the email-based discovery process that user goes
976 through on the sign-in page, leading to a slightly more streamlined user experience.
977 More information on possible values available in
978 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
979 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
980 :param claims_challenge:
981 The claims_challenge parameter requests specific claims requested by the resource provider
982 in the form of a claims_challenge directive in the www-authenticate header to be
983 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
984 It is a string of a JSON object which contains lists of claims being requested from these locations.
986 :return: The authorization url as a string.
987 """
988 authority = kwargs.pop("authority", None) # Historically we support this
989 if authority:
990 warnings.warn(
991 "We haven't decided if this method will accept authority parameter")
992 # The previous implementation is, it will use self.authority by default.
993 # Multi-tenant app can use new authority on demand
994 the_authority = Authority(
995 authority,
996 self.http_client,
997 instance_discovery=self._instance_discovery,
998 ) if authority else self.authority
1000 client = _ClientWithCcsRoutingInfo(
1001 {"authorization_endpoint": the_authority.authorization_endpoint},
1002 self.client_id,
1003 http_client=self.http_client)
1004 warnings.warn(
1005 "Change your get_authorization_request_url() "
1006 "to initiate_auth_code_flow()", DeprecationWarning)
1007 with warnings.catch_warnings(record=True):
1008 return client.build_auth_request_uri(
1009 response_type=response_type,
1010 redirect_uri=redirect_uri, state=state, login_hint=login_hint,
1011 prompt=prompt,
1012 scope=self._decorate_scope(scopes),
1013 nonce=nonce,
1014 domain_hint=domain_hint,
1015 claims=_merge_claims_challenge_and_capabilities(
1016 self._client_capabilities, claims_challenge),
1017 )
1019 def acquire_token_by_auth_code_flow(
1020 self, auth_code_flow, auth_response, scopes=None, **kwargs):
1021 """Validate the auth response being redirected back, and obtain tokens.
1023 It automatically provides nonce protection.
1025 :param dict auth_code_flow:
1026 The same dict returned by :func:`~initiate_auth_code_flow()`.
1027 :param dict auth_response:
1028 A dict of the query string received from auth server.
1029 :param list[str] scopes:
1030 Scopes requested to access a protected API (a resource).
1032 Most of the time, you can leave it empty.
1034 If you requested user consent for multiple resources, here you will
1035 need to provide a subset of what you required in
1036 :func:`~initiate_auth_code_flow()`.
1038 OAuth2 was designed mostly for singleton services,
1039 where tokens are always meant for the same resource and the only
1040 changes are in the scopes.
1041 In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
1042 You can ask authorization code for multiple resources,
1043 but when you redeem it, the token is for only one intended
1044 recipient, called audience.
1045 So the developer need to specify a scope so that we can restrict the
1046 token to be issued for the corresponding audience.
1048 :return:
1049 * A dict containing "access_token" and/or "id_token", among others,
1050 depends on what scope was used.
1051 (See https://tools.ietf.org/html/rfc6749#section-5.1)
1052 * A dict containing "error", optionally "error_description", "error_uri".
1053 (It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_
1054 or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_)
1055 * Most client-side data error would result in ValueError exception.
1056 So the usage pattern could be without any protocol details::
1058 def authorize(): # A controller in a web app
1059 try:
1060 result = msal_app.acquire_token_by_auth_code_flow(
1061 session.get("flow", {}), request.args)
1062 if "error" in result:
1063 return render_template("error.html", result)
1064 use(result) # Token(s) are available in result and cache
1065 except ValueError: # Usually caused by CSRF
1066 pass # Simply ignore them
1067 return redirect(url_for("index"))
1068 """
1069 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1070 telemetry_context = self._build_telemetry_context(
1071 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
1072 response = _clean_up(self.client.obtain_token_by_auth_code_flow(
1073 auth_code_flow,
1074 auth_response,
1075 scope=self._decorate_scope(scopes) if scopes else None,
1076 headers=telemetry_context.generate_headers(),
1077 data=dict(
1078 kwargs.pop("data", {}),
1079 claims=_merge_claims_challenge_and_capabilities(
1080 self._client_capabilities,
1081 auth_code_flow.pop("claims_challenge", None))),
1082 **kwargs))
1083 if "access_token" in response:
1084 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
1085 telemetry_context.update_telemetry(response)
1086 return response
1088 def acquire_token_by_authorization_code(
1089 self,
1090 code,
1091 scopes, # Syntactically required. STS accepts empty value though.
1092 redirect_uri=None,
1093 # REQUIRED, if the "redirect_uri" parameter was included in the
1094 # authorization request as described in Section 4.1.1, and their
1095 # values MUST be identical.
1096 nonce=None,
1097 claims_challenge=None,
1098 **kwargs):
1099 """The second half of the Authorization Code Grant.
1101 :param code: The authorization code returned from Authorization Server.
1102 :param list[str] scopes: (Required)
1103 Scopes requested to access a protected API (a resource).
1105 If you requested user consent for multiple resources, here you will
1106 typically want to provide a subset of what you required in AuthCode.
1108 OAuth2 was designed mostly for singleton services,
1109 where tokens are always meant for the same resource and the only
1110 changes are in the scopes.
1111 In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
1112 You can ask authorization code for multiple resources,
1113 but when you redeem it, the token is for only one intended
1114 recipient, called audience.
1115 So the developer need to specify a scope so that we can restrict the
1116 token to be issued for the corresponding audience.
1118 :param nonce:
1119 If you provided a nonce when calling :func:`get_authorization_request_url`,
1120 same nonce should also be provided here, so that we'll validate it.
1121 An exception will be raised if the nonce in id token mismatches.
1123 :param claims_challenge:
1124 The claims_challenge parameter requests specific claims requested by the resource provider
1125 in the form of a claims_challenge directive in the www-authenticate header to be
1126 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1127 It is a string of a JSON object which contains lists of claims being requested from these locations.
1129 :return: A dict representing the json response from Microsoft Entra:
1131 - A successful response would contain "access_token" key,
1132 - an error response would contain "error" and usually "error_description".
1133 """
1134 # If scope is absent on the wire, STS will give you a token associated
1135 # to the FIRST scope sent during the authorization request.
1136 # So in theory, you can omit scope here when you were working with only
1137 # one scope. But, MSAL decorates your scope anyway, so they are never
1138 # really empty.
1139 assert isinstance(scopes, list), "Invalid parameter type"
1140 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1141 warnings.warn(
1142 "Change your acquire_token_by_authorization_code() "
1143 "to acquire_token_by_auth_code_flow()", DeprecationWarning)
1144 with warnings.catch_warnings(record=True):
1145 telemetry_context = self._build_telemetry_context(
1146 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
1147 response = _clean_up(self.client.obtain_token_by_authorization_code(
1148 code, redirect_uri=redirect_uri,
1149 scope=self._decorate_scope(scopes),
1150 headers=telemetry_context.generate_headers(),
1151 data=dict(
1152 kwargs.pop("data", {}),
1153 claims=_merge_claims_challenge_and_capabilities(
1154 self._client_capabilities, claims_challenge)),
1155 nonce=nonce,
1156 **kwargs))
1157 if "access_token" in response:
1158 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
1159 telemetry_context.update_telemetry(response)
1160 return response
1162 def get_accounts(self, username=None):
1163 """Get a list of accounts which previously signed in, i.e. exists in cache.
1165 An account can later be used in :func:`~acquire_token_silent`
1166 to find its tokens.
1168 :param username:
1169 Filter accounts with this username only. Case insensitive.
1170 :return: A list of account objects.
1171 Each account is a dict. For now, we only document its "username" field.
1172 Your app can choose to display those information to end user,
1173 and allow user to choose one of his/her accounts to proceed.
1174 """
1175 accounts = self._find_msal_accounts(environment=self.authority.instance)
1176 if not accounts: # Now try other aliases of this authority instance
1177 for alias in self._get_authority_aliases(self.authority.instance):
1178 accounts = self._find_msal_accounts(environment=alias)
1179 if accounts:
1180 break
1181 if username:
1182 # Federated account["username"] from AAD could contain mixed case
1183 lowercase_username = username.lower()
1184 accounts = [a for a in accounts
1185 if a["username"].lower() == lowercase_username]
1186 if not accounts:
1187 logger.debug(( # This would also happen when the cache is empty
1188 "get_accounts(username='{}') finds no account. "
1189 "If tokens were acquired without 'profile' scope, "
1190 "they would contain no username for filtering. "
1191 "Consider calling get_accounts(username=None) instead."
1192 ).format(username))
1193 # Does not further filter by existing RTs here. It probably won't matter.
1194 # Because in most cases Accounts and RTs co-exist.
1195 # Even in the rare case when an RT is revoked and then removed,
1196 # acquire_token_silent() would then yield no result,
1197 # apps would fall back to other acquire methods. This is the standard pattern.
1198 return accounts
1200 def _find_msal_accounts(self, environment):
1201 interested_authority_types = [
1202 TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS]
1203 if _is_running_in_cloud_shell():
1204 interested_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL)
1205 grouped_accounts = {
1206 a.get("home_account_id"): # Grouped by home tenant's id
1207 { # These are minimal amount of non-tenant-specific account info
1208 "home_account_id": a.get("home_account_id"),
1209 "environment": a.get("environment"),
1210 "username": a.get("username"),
1211 "account_source": a.get("account_source"),
1213 # The following fields for backward compatibility, for now
1214 "authority_type": a.get("authority_type"),
1215 "local_account_id": a.get("local_account_id"), # Tenant-specific
1216 "realm": a.get("realm"), # Tenant-specific
1217 }
1218 for a in self.token_cache.search(
1219 TokenCache.CredentialType.ACCOUNT,
1220 query={"environment": environment})
1221 if a["authority_type"] in interested_authority_types
1222 }
1223 return list(grouped_accounts.values())
1225 def _get_instance_metadata(self): # This exists so it can be mocked in unit test
1226 resp = self.http_client.get(
1227 "https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize", # TBD: We may extend this to use self._instance_discovery endpoint
1228 headers={'Accept': 'application/json'})
1229 resp.raise_for_status()
1230 return json.loads(resp.text)['metadata']
1232 def _get_authority_aliases(self, instance):
1233 if self._instance_discovery is False:
1234 return []
1235 if self.authority._is_known_to_developer:
1236 # Then it is an ADFS/B2C/known_authority_hosts situation
1237 # which may not reach the central endpoint, so we skip it.
1238 return []
1239 if not self.authority_groups:
1240 self.authority_groups = [
1241 set(group['aliases']) for group in self._get_instance_metadata()]
1242 for group in self.authority_groups:
1243 if instance in group:
1244 return [alias for alias in group if alias != instance]
1245 return []
1247 def remove_account(self, account):
1248 """Sign me out and forget me from token cache"""
1249 if self._enable_broker:
1250 from .broker import _signout_silently
1251 error = _signout_silently(self.client_id, account["local_account_id"])
1252 if error:
1253 logger.debug("_signout_silently() returns error: %s", error)
1254 # Broker sign-out has been attempted, even if the _forget_me() below throws.
1255 self._forget_me(account)
1257 def _sign_out(self, home_account):
1258 # Remove all relevant RTs and ATs from token cache
1259 owned_by_home_account = {
1260 "environment": home_account["environment"],
1261 "home_account_id": home_account["home_account_id"],} # realm-independent
1262 app_metadata = self._get_app_metadata(home_account["environment"])
1263 # Remove RTs/FRTs, and they are realm-independent
1264 for rt in [ # Remove RTs from a static list (rather than from a dynamic generator),
1265 # to avoid changing self.token_cache while it is being iterated
1266 rt for rt in self.token_cache.search(
1267 TokenCache.CredentialType.REFRESH_TOKEN, query=owned_by_home_account)
1268 # Do RT's app ownership check as a precaution, in case family apps
1269 # and 3rd-party apps share same token cache, although they should not.
1270 if rt["client_id"] == self.client_id or (
1271 app_metadata.get("family_id") # Now let's settle family business
1272 and rt.get("family_id") == app_metadata["family_id"])
1273 ]:
1274 self.token_cache.remove_rt(rt)
1275 for at in list(self.token_cache.search( # Remove ATs from a static list,
1276 # to avoid changing self.token_cache while it is being iterated
1277 TokenCache.CredentialType.ACCESS_TOKEN, query=owned_by_home_account,
1278 # Regardless of realm, b/c we've removed realm-independent RTs anyway
1279 )):
1280 # To avoid the complexity of locating sibling family app's AT,
1281 # we skip AT's app ownership check.
1282 # It means ATs for other apps will also be removed, it is OK because:
1283 # * non-family apps are not supposed to share token cache to begin with;
1284 # * Even if it happens, we keep other app's RT already, so SSO still works
1285 self.token_cache.remove_at(at)
1287 def _forget_me(self, home_account):
1288 # It implies signout, and then also remove all relevant accounts and IDTs
1289 self._sign_out(home_account)
1290 owned_by_home_account = {
1291 "environment": home_account["environment"],
1292 "home_account_id": home_account["home_account_id"],} # realm-independent
1293 for idt in list(self.token_cache.search( # Remove IDTs from a static list,
1294 # to avoid changing self.token_cache while it is being iterated
1295 TokenCache.CredentialType.ID_TOKEN, query=owned_by_home_account, # regardless of realm
1296 )):
1297 self.token_cache.remove_idt(idt)
1298 for a in list(self.token_cache.search( # Remove Accounts from a static list,
1299 # to avoid changing self.token_cache while it is being iterated
1300 TokenCache.CredentialType.ACCOUNT, query=owned_by_home_account, # regardless of realm
1301 )):
1302 self.token_cache.remove_account(a)
1304 def _acquire_token_by_cloud_shell(self, scopes, data=None):
1305 from .cloudshell import _obtain_token
1306 response = _obtain_token(
1307 self.http_client, scopes, client_id=self.client_id, data=data)
1308 if "error" not in response:
1309 self.token_cache.add(dict(
1310 client_id=self.client_id,
1311 scope=response["scope"].split() if "scope" in response else scopes,
1312 token_endpoint=self.authority.token_endpoint,
1313 response=response,
1314 data=data or {},
1315 authority_type=_AUTHORITY_TYPE_CLOUDSHELL,
1316 ))
1317 if "access_token" in response:
1318 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
1319 return response
1321 def acquire_token_silent(
1322 self,
1323 scopes, # type: List[str]
1324 account, # type: Optional[Account]
1325 authority=None, # See get_authorization_request_url()
1326 force_refresh=False, # type: Optional[boolean]
1327 claims_challenge=None,
1328 auth_scheme=None,
1329 **kwargs):
1330 """Acquire an access token for given account, without user interaction.
1332 It has same parameters as the :func:`~acquire_token_silent_with_error`.
1333 The difference is the behavior of the return value.
1334 This method will combine the cache empty and refresh error
1335 into one return value, `None`.
1336 If your app does not care about the exact token refresh error during
1337 token cache look-up, then this method is easier and recommended.
1339 :return:
1340 - A dict containing no "error" key,
1341 and typically contains an "access_token" key,
1342 if cache lookup succeeded.
1343 - None when cache lookup does not yield a token.
1344 """
1345 if not account:
1346 return None # A backward-compatible NO-OP to drop the account=None usage
1347 result = _clean_up(self._acquire_token_silent_with_error(
1348 scopes, account, authority=authority, force_refresh=force_refresh,
1349 claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs))
1350 return result if result and "error" not in result else None
1352 def acquire_token_silent_with_error(
1353 self,
1354 scopes, # type: List[str]
1355 account, # type: Optional[Account]
1356 authority=None, # See get_authorization_request_url()
1357 force_refresh=False, # type: Optional[boolean]
1358 claims_challenge=None,
1359 auth_scheme=None,
1360 **kwargs):
1361 """Acquire an access token for given account, without user interaction.
1363 It is done either by finding a valid access token from cache,
1364 or by finding a valid refresh token from cache and then automatically
1365 use it to redeem a new access token.
1367 This method will differentiate cache empty from token refresh error.
1368 If your app cares the exact token refresh error during
1369 token cache look-up, then this method is suitable.
1370 Otherwise, the other method :func:`~acquire_token_silent` is recommended.
1372 :param list[str] scopes: (Required)
1373 Scopes requested to access a protected API (a resource).
1374 :param account: (Required)
1375 One of the account object returned by :func:`~get_accounts`.
1376 Starting from MSAL Python 1.23,
1377 a ``None`` input will become a NO-OP and always return ``None``.
1378 :param force_refresh:
1379 If True, it will skip Access Token look-up,
1380 and try to find a Refresh Token to obtain a new Access Token.
1381 :param claims_challenge:
1382 The claims_challenge parameter requests specific claims requested by the resource provider
1383 in the form of a claims_challenge directive in the www-authenticate header to be
1384 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1385 It is a string of a JSON object which contains lists of claims being requested from these locations.
1386 :param object auth_scheme:
1387 You can provide an ``msal.auth_scheme.PopAuthScheme`` object
1388 so that MSAL will get a Proof-of-Possession (POP) token for you.
1390 New in version 1.26.0.
1392 :return:
1393 - A dict containing no "error" key,
1394 and typically contains an "access_token" key,
1395 if cache lookup succeeded.
1396 - None when there is simply no token in the cache.
1397 - A dict containing an "error" key, when token refresh failed.
1398 """
1399 if not account:
1400 return None # A backward-compatible NO-OP to drop the account=None usage
1401 return _clean_up(self._acquire_token_silent_with_error(
1402 scopes, account, authority=authority, force_refresh=force_refresh,
1403 claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs))
1405 def _acquire_token_silent_with_error(
1406 self,
1407 scopes, # type: List[str]
1408 account, # type: Optional[Account]
1409 authority=None, # See get_authorization_request_url()
1410 force_refresh=False, # type: Optional[boolean]
1411 claims_challenge=None,
1412 auth_scheme=None,
1413 **kwargs):
1414 assert isinstance(scopes, list), "Invalid parameter type"
1415 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1416 correlation_id = msal.telemetry._get_new_correlation_id()
1417 if authority:
1418 warnings.warn("We haven't decided how/if this method will accept authority parameter")
1419 # the_authority = Authority(
1420 # authority,
1421 # self.http_client,
1422 # instance_discovery=self._instance_discovery,
1423 # ) if authority else self.authority
1424 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it(
1425 scopes, account, self.authority, force_refresh=force_refresh,
1426 claims_challenge=claims_challenge,
1427 correlation_id=correlation_id,
1428 auth_scheme=auth_scheme,
1429 **kwargs)
1430 if result and "error" not in result:
1431 return result
1432 final_result = result
1433 for alias in self._get_authority_aliases(self.authority.instance):
1434 if not list(self.token_cache.search( # Need a list to test emptiness
1435 self.token_cache.CredentialType.REFRESH_TOKEN,
1436 # target=scopes, # MUST NOT filter by scopes, because:
1437 # 1. AAD RTs are scope-independent;
1438 # 2. therefore target is optional per schema;
1439 query={"environment": alias})):
1440 # Skip heavy weight logic when RT for this alias doesn't exist
1441 continue
1442 the_authority = Authority(
1443 "https://" + alias + "/" + self.authority.tenant,
1444 self.http_client,
1445 instance_discovery=False,
1446 )
1447 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it(
1448 scopes, account, the_authority, force_refresh=force_refresh,
1449 claims_challenge=claims_challenge,
1450 correlation_id=correlation_id,
1451 auth_scheme=auth_scheme,
1452 **kwargs)
1453 if result:
1454 if "error" not in result:
1455 return result
1456 final_result = result
1457 if final_result and final_result.get("suberror"):
1458 final_result["classification"] = { # Suppress these suberrors, per #57
1459 "bad_token": "",
1460 "token_expired": "",
1461 "protection_policy_required": "",
1462 "client_mismatch": "",
1463 "device_authentication_failed": "",
1464 }.get(final_result["suberror"], final_result["suberror"])
1465 return final_result
1467 def _acquire_token_silent_from_cache_and_possibly_refresh_it(
1468 self,
1469 scopes, # type: List[str]
1470 account, # type: Optional[Account]
1471 authority, # This can be different than self.authority
1472 force_refresh=False, # type: Optional[boolean]
1473 claims_challenge=None,
1474 correlation_id=None,
1475 http_exceptions=None,
1476 auth_scheme=None,
1477 **kwargs):
1478 # This internal method has two calling patterns:
1479 # it accepts a non-empty account to find token for a user,
1480 # and accepts account=None to find a token for the current app.
1481 access_token_from_cache = None
1482 if not (force_refresh or claims_challenge or auth_scheme): # Then attempt AT cache
1483 query={
1484 "client_id": self.client_id,
1485 "environment": authority.instance,
1486 "realm": authority.tenant,
1487 "home_account_id": (account or {}).get("home_account_id"),
1488 }
1489 key_id = kwargs.get("data", {}).get("key_id")
1490 if key_id: # Some token types (SSH-certs, POP) are bound to a key
1491 query["key_id"] = key_id
1492 now = time.time()
1493 refresh_reason = msal.telemetry.AT_ABSENT
1494 for entry in self.token_cache.search( # A generator allows us to
1495 # break early in cache-hit without finding a full list
1496 self.token_cache.CredentialType.ACCESS_TOKEN,
1497 target=scopes,
1498 query=query,
1499 ): # This loop is about token search, not about token deletion.
1500 # Note that search() holds a lock during this loop;
1501 # that is fine because this loop is fast
1502 expires_in = int(entry["expires_on"]) - now
1503 if expires_in < 5*60: # Then consider it expired
1504 refresh_reason = msal.telemetry.AT_EXPIRED
1505 continue # Removal is not necessary, it will be overwritten
1506 logger.debug("Cache hit an AT")
1507 access_token_from_cache = { # Mimic a real response
1508 "access_token": entry["secret"],
1509 "token_type": entry.get("token_type", "Bearer"),
1510 "expires_in": int(expires_in), # OAuth2 specs defines it as int
1511 self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
1512 }
1513 if "refresh_on" in entry:
1514 access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
1515 if int(entry["refresh_on"]) < now: # aging
1516 refresh_reason = msal.telemetry.AT_AGING
1517 break # With a fallback in hand, we break here to go refresh
1518 self._build_telemetry_context(-1).hit_an_access_token()
1519 return access_token_from_cache # It is still good as new
1520 else:
1521 refresh_reason = msal.telemetry.FORCE_REFRESH # TODO: It could also mean claims_challenge
1522 assert refresh_reason, "It should have been established at this point"
1523 if not http_exceptions: # It can be a tuple of exceptions
1524 # The exact HTTP exceptions are transportation-layer dependent
1525 from requests.exceptions import RequestException # Lazy load
1526 http_exceptions = (RequestException,)
1527 try:
1528 data = kwargs.get("data", {})
1529 if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL:
1530 if auth_scheme:
1531 raise ValueError("auth_scheme is not supported in Cloud Shell")
1532 return self._acquire_token_by_cloud_shell(scopes, data=data)
1534 if self._enable_broker and account and account.get("account_source") in (
1535 _GRANT_TYPE_BROKER, # Broker successfully established this account previously.
1536 None, # Unknown data from older MSAL. Broker might still work.
1537 ):
1538 from .broker import _acquire_token_silently
1539 response = _acquire_token_silently(
1540 "https://{}/{}".format(self.authority.instance, self.authority.tenant),
1541 self.client_id,
1542 account["local_account_id"],
1543 scopes,
1544 claims=_merge_claims_challenge_and_capabilities(
1545 self._client_capabilities, claims_challenge),
1546 correlation_id=correlation_id,
1547 auth_scheme=auth_scheme,
1548 **data)
1549 if response: # Broker provides a decisive outcome
1550 account_was_established_by_broker = account.get(
1551 "account_source") == _GRANT_TYPE_BROKER
1552 broker_attempt_succeeded_just_now = "error" not in response
1553 if account_was_established_by_broker or broker_attempt_succeeded_just_now:
1554 return self._process_broker_response(response, scopes, data)
1556 if auth_scheme:
1557 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
1558 if account:
1559 result = self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
1560 authority, self._decorate_scope(scopes), account,
1561 refresh_reason=refresh_reason, claims_challenge=claims_challenge,
1562 correlation_id=correlation_id,
1563 **kwargs)
1564 else: # The caller is acquire_token_for_client()
1565 result = self._acquire_token_for_client(
1566 scopes, refresh_reason, claims_challenge=claims_challenge,
1567 **kwargs)
1568 if result and "access_token" in result:
1569 result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
1570 if (result and "error" not in result) or (not access_token_from_cache):
1571 return result
1572 except http_exceptions:
1573 # Typically network error. Potential AAD outage?
1574 if not access_token_from_cache: # It means there is no fall back option
1575 raise # We choose to bubble up the exception
1576 return access_token_from_cache
1578 def _process_broker_response(self, response, scopes, data):
1579 if "error" not in response:
1580 self.token_cache.add(dict(
1581 client_id=self.client_id,
1582 scope=response["scope"].split() if "scope" in response else scopes,
1583 token_endpoint=self.authority.token_endpoint,
1584 response=response,
1585 data=data,
1586 _account_id=response["_account_id"],
1587 environment=self.authority.instance, # Be consistent with non-broker flows
1588 grant_type=_GRANT_TYPE_BROKER, # A pseudo grant type for TokenCache to mark account_source as broker
1589 ))
1590 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
1591 return _clean_up(response)
1593 def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
1594 self, authority, scopes, account, **kwargs):
1595 query = {
1596 "environment": authority.instance,
1597 "home_account_id": (account or {}).get("home_account_id"),
1598 # "realm": authority.tenant, # AAD RTs are tenant-independent
1599 }
1600 app_metadata = self._get_app_metadata(authority.instance)
1601 if not app_metadata: # Meaning this app is now used for the first time.
1602 # When/if we have a way to directly detect current app's family,
1603 # we'll rewrite this block, to support multiple families.
1604 # For now, we try existing RTs (*). If it works, we are in that family.
1605 # (*) RTs of a different app/family are not supposed to be
1606 # shared with or accessible by us in the first place.
1607 at = self._acquire_token_silent_by_finding_specific_refresh_token(
1608 authority, scopes,
1609 dict(query, family_id="1"), # A hack, we have only 1 family for now
1610 rt_remover=lambda rt_item: None, # NO-OP b/c RTs are likely not mine
1611 break_condition=lambda response: # Break loop when app not in family
1612 # Based on an AAD-only behavior mentioned in internal doc here
1613 # https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595
1614 "client_mismatch" in response.get("error_additional_info", []),
1615 **kwargs)
1616 if at and "error" not in at:
1617 return at
1618 last_resp = None
1619 if app_metadata.get("family_id"): # Meaning this app belongs to this family
1620 last_resp = at = self._acquire_token_silent_by_finding_specific_refresh_token(
1621 authority, scopes, dict(query, family_id=app_metadata["family_id"]),
1622 **kwargs)
1623 if at and "error" not in at:
1624 return at
1625 # Either this app is an orphan, so we will naturally use its own RT;
1626 # or all attempts above have failed, so we fall back to non-foci behavior.
1627 return self._acquire_token_silent_by_finding_specific_refresh_token(
1628 authority, scopes, dict(query, client_id=self.client_id),
1629 **kwargs) or last_resp
1631 def _get_app_metadata(self, environment):
1632 return self.token_cache._get_app_metadata(
1633 environment=environment, client_id=self.client_id, default={})
1635 def _acquire_token_silent_by_finding_specific_refresh_token(
1636 self, authority, scopes, query,
1637 rt_remover=None, break_condition=lambda response: False,
1638 refresh_reason=None, correlation_id=None, claims_challenge=None,
1639 **kwargs):
1640 matches = list(self.token_cache.search( # We want a list to test emptiness
1641 self.token_cache.CredentialType.REFRESH_TOKEN,
1642 # target=scopes, # AAD RTs are scope-independent
1643 query=query))
1644 logger.debug("Found %d RTs matching %s", len(matches), {
1645 k: _pii_less_home_account_id(v) if k == "home_account_id" and v else v
1646 for k, v in query.items()
1647 })
1649 response = None # A distinguishable value to mean cache is empty
1650 if not matches: # Then exit early to avoid expensive operations
1651 return response
1652 client, _ = self._build_client(
1653 # Potentially expensive if building regional client
1654 self.client_credential, authority, skip_regional_client=True)
1655 telemetry_context = self._build_telemetry_context(
1656 self.ACQUIRE_TOKEN_SILENT_ID,
1657 correlation_id=correlation_id, refresh_reason=refresh_reason)
1658 for entry in sorted( # Since unfit RTs would not be aggressively removed,
1659 # we start from newer RTs which are more likely fit.
1660 matches,
1661 key=lambda e: int(e.get("last_modification_time", "0")),
1662 reverse=True):
1663 logger.debug("Cache attempts an RT")
1664 headers = telemetry_context.generate_headers()
1665 if query.get("home_account_id"): # Then use it as CCS Routing info
1666 headers["X-AnchorMailbox"] = "Oid:{}".format( # case-insensitive value
1667 query["home_account_id"].replace(".", "@"))
1668 response = client.obtain_token_by_refresh_token(
1669 entry, rt_getter=lambda token_item: token_item["secret"],
1670 on_removing_rt=lambda rt_item: None, # Disable RT removal,
1671 # because an invalid_grant could be caused by new MFA policy,
1672 # the RT could still be useful for other MFA-less scope or tenant
1673 on_obtaining_tokens=lambda event: self.token_cache.add(dict(
1674 event,
1675 environment=authority.instance,
1676 skip_account_creation=True, # To honor a concurrent remove_account()
1677 )),
1678 scope=scopes,
1679 headers=headers,
1680 data=dict(
1681 kwargs.pop("data", {}),
1682 claims=_merge_claims_challenge_and_capabilities(
1683 self._client_capabilities, claims_challenge)),
1684 **kwargs)
1685 telemetry_context.update_telemetry(response)
1686 if "error" not in response:
1687 return response
1688 logger.debug("Refresh failed. {error}: {error_description}".format(
1689 error=response.get("error"),
1690 error_description=response.get("error_description"),
1691 ))
1692 if break_condition(response):
1693 break
1694 return response # Returns the latest error (if any), or just None
1696 def _validate_ssh_cert_input_data(self, data):
1697 if data.get("token_type") == "ssh-cert":
1698 if not data.get("req_cnf"):
1699 raise ValueError(
1700 "When requesting an SSH certificate, "
1701 "you must include a string parameter named 'req_cnf' "
1702 "containing the public key in JWK format "
1703 "(https://tools.ietf.org/html/rfc7517).")
1704 if not data.get("key_id"):
1705 raise ValueError(
1706 "When requesting an SSH certificate, "
1707 "you must include a string parameter named 'key_id' "
1708 "which identifies the key in the 'req_cnf' argument.")
1710 def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
1711 """Acquire token(s) based on a refresh token (RT) obtained from elsewhere.
1713 You use this method only when you have old RTs from elsewhere,
1714 and now you want to migrate them into MSAL.
1715 Calling this method results in new tokens automatically storing into MSAL.
1717 You do NOT need to use this method if you are already using MSAL.
1718 MSAL maintains RT automatically inside its token cache,
1719 and an access token can be retrieved
1720 when you call :func:`~acquire_token_silent`.
1722 :param str refresh_token: The old refresh token, as a string.
1724 :param list scopes:
1725 The scopes associate with this old RT.
1726 Each scope needs to be in the Microsoft identity platform (v2) format.
1727 See `Scopes not resources <https://docs.microsoft.com/en-us/azure/active-directory/develop/migrate-python-adal-msal#scopes-not-resources>`_.
1729 :return:
1730 * A dict contains "error" and some other keys, when error happened.
1731 * A dict contains no "error" key means migration was successful.
1732 """
1733 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1734 telemetry_context = self._build_telemetry_context(
1735 self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN,
1736 refresh_reason=msal.telemetry.FORCE_REFRESH)
1737 response = _clean_up(self.client.obtain_token_by_refresh_token(
1738 refresh_token,
1739 scope=self._decorate_scope(scopes),
1740 headers=telemetry_context.generate_headers(),
1741 rt_getter=lambda rt: rt,
1742 on_updating_rt=False,
1743 on_removing_rt=lambda rt_item: None, # No OP
1744 **kwargs))
1745 if "access_token" in response:
1746 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
1747 telemetry_context.update_telemetry(response)
1748 return response
1750 def acquire_token_by_username_password(
1751 self, username, password, scopes, claims_challenge=None,
1752 # Note: We shouldn't need to surface enable_msa_passthrough,
1753 # because this ROPC won't work with MSA account anyway.
1754 auth_scheme=None,
1755 **kwargs):
1756 """Gets a token for a given resource via user credentials.
1758 See this page for constraints of Username Password Flow.
1759 https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication
1761 :param str username: Typically a UPN in the form of an email address.
1762 :param str password: The password.
1763 :param list[str] scopes:
1764 Scopes requested to access a protected API (a resource).
1765 :param claims_challenge:
1766 The claims_challenge parameter requests specific claims requested by the resource provider
1767 in the form of a claims_challenge directive in the www-authenticate header to be
1768 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1769 It is a string of a JSON object which contains lists of claims being requested from these locations.
1771 :param object auth_scheme:
1772 You can provide an ``msal.auth_scheme.PopAuthScheme`` object
1773 so that MSAL will get a Proof-of-Possession (POP) token for you.
1775 New in version 1.26.0.
1777 :return: A dict representing the json response from Microsoft Entra:
1779 - A successful response would contain "access_token" key,
1780 - an error response would contain "error" and usually "error_description".
1781 """
1782 claims = _merge_claims_challenge_and_capabilities(
1783 self._client_capabilities, claims_challenge)
1784 if self._enable_broker:
1785 from .broker import _signin_silently
1786 response = _signin_silently(
1787 "https://{}/{}".format(self.authority.instance, self.authority.tenant),
1788 self.client_id,
1789 scopes, # Decorated scopes won't work due to offline_access
1790 MSALRuntime_Username=username,
1791 MSALRuntime_Password=password,
1792 validateAuthority="no" if (
1793 self.authority._is_known_to_developer
1794 or self._instance_discovery is False) else None,
1795 claims=claims,
1796 auth_scheme=auth_scheme,
1797 )
1798 return self._process_broker_response(response, scopes, kwargs.get("data", {}))
1800 if auth_scheme:
1801 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
1802 scopes = self._decorate_scope(scopes)
1803 telemetry_context = self._build_telemetry_context(
1804 self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
1805 headers = telemetry_context.generate_headers()
1806 data = dict(kwargs.pop("data", {}), claims=claims)
1807 response = None
1808 if not self.authority.is_adfs:
1809 user_realm_result = self.authority.user_realm_discovery(
1810 username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
1811 if user_realm_result.get("account_type") == "Federated":
1812 response = _clean_up(self._acquire_token_by_username_password_federated(
1813 user_realm_result, username, password, scopes=scopes,
1814 data=data,
1815 headers=headers, **kwargs))
1816 if response is None: # Either ADFS or not federated
1817 response = _clean_up(self.client.obtain_token_by_username_password(
1818 username, password, scope=scopes,
1819 headers=headers,
1820 data=data,
1821 **kwargs))
1822 if "access_token" in response:
1823 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
1824 telemetry_context.update_telemetry(response)
1825 return response
1827 def _acquire_token_by_username_password_federated(
1828 self, user_realm_result, username, password, scopes=None, **kwargs):
1829 wstrust_endpoint = {}
1830 if user_realm_result.get("federation_metadata_url"):
1831 wstrust_endpoint = mex_send_request(
1832 user_realm_result["federation_metadata_url"],
1833 self.http_client)
1834 if wstrust_endpoint is None:
1835 raise ValueError("Unable to find wstrust endpoint from MEX. "
1836 "This typically happens when attempting MSA accounts. "
1837 "More details available here. "
1838 "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication")
1839 logger.debug("wstrust_endpoint = %s", wstrust_endpoint)
1840 wstrust_result = wst_send_request(
1841 username, password,
1842 user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"),
1843 wstrust_endpoint.get("address",
1844 # Fallback to an AAD supplied endpoint
1845 user_realm_result.get("federation_active_auth_url")),
1846 wstrust_endpoint.get("action"), self.http_client)
1847 if not ("token" in wstrust_result and "type" in wstrust_result):
1848 raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result)
1849 GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
1850 grant_type = {
1851 SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1,
1852 SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2,
1853 WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1,
1854 WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2
1855 }.get(wstrust_result.get("type"))
1856 if not grant_type:
1857 raise RuntimeError(
1858 "RSTR returned unknown token type: %s", wstrust_result.get("type"))
1859 self.client.grant_assertion_encoders.setdefault( # Register a non-standard type
1860 grant_type, self.client.encode_saml_assertion)
1861 return self.client.obtain_token_by_assertion(
1862 wstrust_result["token"], grant_type, scope=scopes,
1863 on_obtaining_tokens=lambda event: self.token_cache.add(dict(
1864 event,
1865 environment=self.authority.instance,
1866 username=username, # Useful in case IDT contains no such info
1867 )),
1868 **kwargs)
1871class PublicClientApplication(ClientApplication): # browser app or mobile app
1873 DEVICE_FLOW_CORRELATION_ID = "_correlation_id"
1874 CONSOLE_WINDOW_HANDLE = object()
1876 def __init__(self, client_id, client_credential=None, **kwargs):
1877 """Same as :func:`ClientApplication.__init__`,
1878 except that ``client_credential`` parameter shall remain ``None``.
1880 .. note::
1882 You may set enable_broker_on_windows to True.
1884 **What is a broker, and why use it?**
1886 A broker is a component installed on your device.
1887 Broker implicitly gives your device an identity. By using a broker,
1888 your device becomes a factor that can satisfy MFA (Multi-factor authentication).
1889 This factor would become mandatory
1890 if a tenant's admin enables a corresponding Conditional Access (CA) policy.
1891 The broker's presence allows Microsoft identity platform
1892 to have higher confidence that the tokens are being issued to your device,
1893 and that is more secure.
1895 An additional benefit of broker is,
1896 it runs as a long-lived process with your device's OS,
1897 and maintains its own cache,
1898 so that your broker-enabled apps (even a CLI)
1899 could automatically SSO from a previously established signed-in session.
1901 **You shall only enable broker when your app:**
1903 1. is running on supported platforms,
1904 and already registered their corresponding redirect_uri
1906 * ``ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id``
1907 if your app is expected to run on Windows 10+
1909 2. installed broker dependency,
1910 e.g. ``pip install msal[broker]>=1.25,<2``.
1912 3. tested with ``acquire_token_interactive()`` and ``acquire_token_silent()``.
1914 **The fallback behaviors of MSAL Python's broker support**
1916 MSAL will either error out, or silently fallback to non-broker flows.
1918 1. MSAL will ignore the `enable_broker_...` and bypass broker
1919 on those auth flows that are known to be NOT supported by broker.
1920 This includes ADFS, B2C, etc..
1921 For other "could-use-broker" scenarios, please see below.
1922 2. MSAL errors out when app developer opted-in to use broker
1923 but a direct dependency "mid-tier" package is not installed.
1924 Error message guides app developer to declare the correct dependency
1925 ``msal[broker]``.
1926 We error out here because the error is actionable to app developers.
1927 3. MSAL silently "deactivates" the broker and fallback to non-broker,
1928 when opted-in, dependency installed yet failed to initialize.
1929 We anticipate this would happen on a device whose OS is too old
1930 or the underlying broker component is somehow unavailable.
1931 There is not much an app developer or the end user can do here.
1932 Eventually, the conditional access policy shall
1933 force the user to switch to a different device.
1934 4. MSAL errors out when broker is opted in, installed, initialized,
1935 but subsequent token request(s) failed.
1937 :param boolean enable_broker_on_windows:
1938 This setting is only effective if your app is running on Windows 10+.
1939 This parameter defaults to None, which means MSAL will not utilize a broker.
1941 New in MSAL Python 1.25.0.
1942 """
1943 if client_credential is not None:
1944 raise ValueError("Public Client should not possess credentials")
1945 # Using kwargs notation for now. We will switch to keyword-only arguments.
1946 enable_broker_on_windows = kwargs.pop("enable_broker_on_windows", False)
1947 self._enable_broker = enable_broker_on_windows and sys.platform == "win32"
1948 super(PublicClientApplication, self).__init__(
1949 client_id, client_credential=None, **kwargs)
1951 def acquire_token_interactive(
1952 self,
1953 scopes, # type: list[str]
1954 prompt=None,
1955 login_hint=None, # type: Optional[str]
1956 domain_hint=None, # type: Optional[str]
1957 claims_challenge=None,
1958 timeout=None,
1959 port=None,
1960 extra_scopes_to_consent=None,
1961 max_age=None,
1962 parent_window_handle=None,
1963 on_before_launching_ui=None,
1964 auth_scheme=None,
1965 **kwargs):
1966 """Acquire token interactively i.e. via a local browser.
1968 Prerequisite: In Azure Portal, configure the Redirect URI of your
1969 "Mobile and Desktop application" as ``http://localhost``.
1970 If you opts in to use broker during ``PublicClientApplication`` creation,
1971 your app also need this Redirect URI:
1972 ``ms-appx-web://Microsoft.AAD.BrokerPlugin/YOUR_CLIENT_ID``
1974 :param list scopes:
1975 It is a list of case-sensitive strings.
1976 :param str prompt:
1977 By default, no prompt value will be sent, not even string ``"none"``.
1978 You will have to specify a value explicitly.
1979 Its valid values are the constants defined in
1980 :class:`Prompt <msal.Prompt>`.
1981 :param str login_hint:
1982 Optional. Identifier of the user. Generally a User Principal Name (UPN).
1983 :param domain_hint:
1984 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
1985 If included, it will skip the email-based discovery process that user goes
1986 through on the sign-in page, leading to a slightly more streamlined user experience.
1987 More information on possible values available in
1988 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
1989 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
1991 :param claims_challenge:
1992 The claims_challenge parameter requests specific claims requested by the resource provider
1993 in the form of a claims_challenge directive in the www-authenticate header to be
1994 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1995 It is a string of a JSON object which contains lists of claims being requested from these locations.
1997 :param int timeout:
1998 This method will block the current thread.
1999 This parameter specifies the timeout value in seconds.
2000 Default value ``None`` means wait indefinitely.
2002 :param int port:
2003 The port to be used to listen to an incoming auth response.
2004 By default we will use a system-allocated port.
2005 (The rest of the redirect_uri is hard coded as ``http://localhost``.)
2007 :param list extra_scopes_to_consent:
2008 "Extra scopes to consent" is a concept only available in Microsoft Entra.
2009 It refers to other resources you might want to prompt to consent for,
2010 in the same interaction, but for which you won't get back a
2011 token for in this particular operation.
2013 :param int max_age:
2014 OPTIONAL. Maximum Authentication Age.
2015 Specifies the allowable elapsed time in seconds
2016 since the last time the End-User was actively authenticated.
2017 If the elapsed time is greater than this value,
2018 Microsoft identity platform will actively re-authenticate the End-User.
2020 MSAL Python will also automatically validate the auth_time in ID token.
2022 New in version 1.15.
2024 :param int parent_window_handle:
2025 Required if your app is running on Windows and opted in to use broker.
2027 If your app is a GUI app,
2028 you are recommended to also provide its window handle,
2029 so that the sign in UI window will properly pop up on top of your window.
2031 If your app is a console app (most Python scripts are console apps),
2032 you can use a placeholder value ``msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE``.
2034 New in version 1.20.0.
2036 :param function on_before_launching_ui:
2037 A callback with the form of
2038 ``lambda ui="xyz", **kwargs: print("A {} will be launched".format(ui))``,
2039 where ``ui`` will be either "browser" or "broker".
2040 You can use it to inform your end user to expect a pop-up window.
2042 New in version 1.20.0.
2044 :param object auth_scheme:
2045 You can provide an ``msal.auth_scheme.PopAuthScheme`` object
2046 so that MSAL will get a Proof-of-Possession (POP) token for you.
2048 New in version 1.26.0.
2050 :return:
2051 - A dict containing no "error" key,
2052 and typically contains an "access_token" key.
2053 - A dict containing an "error" key, when token refresh failed.
2054 """
2055 data = kwargs.pop("data", {})
2056 enable_msa_passthrough = kwargs.pop( # MUST remove it from kwargs
2057 "enable_msa_passthrough", # Keep it as a hidden param, for now.
2058 # OPTIONAL. MSA-Passthrough is a legacy configuration,
2059 # needed by a small amount of Microsoft first-party apps,
2060 # which would login MSA accounts via ".../organizations" authority.
2061 # If you app belongs to this category, AND you are enabling broker,
2062 # you would want to enable this flag. Default value is False.
2063 # More background of MSA-PT is available from this internal docs:
2064 # https://microsoft.sharepoint.com/:w:/t/Identity-DevEx/EatIUauX3c9Ctw1l7AQ6iM8B5CeBZxc58eoQCE0IuZ0VFw?e=tgc3jP&CID=39c853be-76ea-79d7-ee73-f1b2706ede05
2065 False
2066 ) and data.get("token_type") != "ssh-cert" # Work around a known issue as of PyMsalRuntime 0.8
2067 self._validate_ssh_cert_input_data(data)
2068 if not on_before_launching_ui:
2069 on_before_launching_ui = lambda **kwargs: None
2070 if _is_running_in_cloud_shell() and prompt == "none":
2071 # Note: _acquire_token_by_cloud_shell() is always silent,
2072 # so we would not fire on_before_launching_ui()
2073 return self._acquire_token_by_cloud_shell(scopes, data=data)
2074 claims = _merge_claims_challenge_and_capabilities(
2075 self._client_capabilities, claims_challenge)
2076 if self._enable_broker:
2077 if parent_window_handle is None:
2078 raise ValueError(
2079 "parent_window_handle is required when you opted into using broker. "
2080 "You need to provide the window handle of your GUI application, "
2081 "or use msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE "
2082 "when and only when your application is a console app.")
2083 if extra_scopes_to_consent:
2084 logger.warning(
2085 "Ignoring parameter extra_scopes_to_consent, "
2086 "which is not supported by broker")
2087 response = self._acquire_token_interactive_via_broker(
2088 scopes,
2089 parent_window_handle,
2090 enable_msa_passthrough,
2091 claims,
2092 data,
2093 on_before_launching_ui,
2094 auth_scheme,
2095 prompt=prompt,
2096 login_hint=login_hint,
2097 max_age=max_age,
2098 )
2099 return self._process_broker_response(response, scopes, data)
2101 if auth_scheme:
2102 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
2103 on_before_launching_ui(ui="browser")
2104 telemetry_context = self._build_telemetry_context(
2105 self.ACQUIRE_TOKEN_INTERACTIVE)
2106 response = _clean_up(self.client.obtain_token_by_browser(
2107 scope=self._decorate_scope(scopes) if scopes else None,
2108 extra_scope_to_consent=extra_scopes_to_consent,
2109 redirect_uri="http://localhost:{port}".format(
2110 # Hardcode the host, for now. AAD portal rejects 127.0.0.1 anyway
2111 port=port or 0),
2112 prompt=prompt,
2113 login_hint=login_hint,
2114 max_age=max_age,
2115 timeout=timeout,
2116 auth_params={
2117 "claims": claims,
2118 "domain_hint": domain_hint,
2119 },
2120 data=dict(data, claims=claims),
2121 headers=telemetry_context.generate_headers(),
2122 browser_name=_preferred_browser(),
2123 **kwargs))
2124 if "access_token" in response:
2125 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
2126 telemetry_context.update_telemetry(response)
2127 return response
2129 def _acquire_token_interactive_via_broker(
2130 self,
2131 scopes, # type: list[str]
2132 parent_window_handle, # type: int
2133 enable_msa_passthrough, # type: boolean
2134 claims, # type: str
2135 data, # type: dict
2136 on_before_launching_ui, # type: callable
2137 auth_scheme, # type: object
2138 prompt=None,
2139 login_hint=None, # type: Optional[str]
2140 max_age=None,
2141 **kwargs):
2142 from .broker import _signin_interactively, _signin_silently, _acquire_token_silently
2143 if "welcome_template" in kwargs:
2144 logger.debug(kwargs["welcome_template"]) # Experimental
2145 authority = "https://{}/{}".format(
2146 self.authority.instance, self.authority.tenant)
2147 validate_authority = "no" if (
2148 self.authority._is_known_to_developer
2149 or self._instance_discovery is False) else None
2150 # Calls different broker methods to mimic the OIDC behaviors
2151 if login_hint and prompt != "select_account": # OIDC prompts when the user did not sign in
2152 accounts = self.get_accounts(username=login_hint)
2153 if len(accounts) == 1: # Unambiguously proceed with this account
2154 logger.debug("Calling broker._acquire_token_silently()")
2155 response = _acquire_token_silently( # When it works, it bypasses prompt
2156 authority,
2157 self.client_id,
2158 accounts[0]["local_account_id"],
2159 scopes,
2160 claims=claims,
2161 auth_scheme=auth_scheme,
2162 **data)
2163 if response and "error" not in response:
2164 return response
2165 # login_hint undecisive or not exists
2166 if prompt == "none" or not prompt: # Must/Can attempt _signin_silently()
2167 logger.debug("Calling broker._signin_silently()")
2168 response = _signin_silently( # Unlike OIDC, it doesn't honor login_hint
2169 authority, self.client_id, scopes,
2170 validateAuthority=validate_authority,
2171 claims=claims,
2172 max_age=max_age,
2173 enable_msa_pt=enable_msa_passthrough,
2174 auth_scheme=auth_scheme,
2175 **data)
2176 is_wrong_account = bool(
2177 # _signin_silently() only gets tokens for default account,
2178 # but this seems to have been fixed in PyMsalRuntime 0.11.2
2179 "access_token" in response and login_hint
2180 and response.get("id_token_claims", {}) != login_hint)
2181 wrong_account_error_message = (
2182 'prompt="none" will not work for login_hint="non-default-user"')
2183 if is_wrong_account:
2184 logger.debug(wrong_account_error_message)
2185 if prompt == "none":
2186 return response if not is_wrong_account else {
2187 "error": "broker_error",
2188 "error_description": wrong_account_error_message,
2189 }
2190 else:
2191 assert bool(prompt) is False
2192 from pymsalruntime import Response_Status
2193 recoverable_errors = frozenset([
2194 Response_Status.Status_AccountUnusable,
2195 Response_Status.Status_InteractionRequired,
2196 ])
2197 if is_wrong_account or "error" in response and response.get(
2198 "_broker_status") in recoverable_errors:
2199 pass # It will fall back to the _signin_interactively()
2200 else:
2201 return response
2203 logger.debug("Falls back to broker._signin_interactively()")
2204 on_before_launching_ui(ui="broker")
2205 return _signin_interactively(
2206 authority, self.client_id, scopes,
2207 None if parent_window_handle is self.CONSOLE_WINDOW_HANDLE
2208 else parent_window_handle,
2209 validateAuthority=validate_authority,
2210 login_hint=login_hint,
2211 prompt=prompt,
2212 claims=claims,
2213 max_age=max_age,
2214 enable_msa_pt=enable_msa_passthrough,
2215 auth_scheme=auth_scheme,
2216 **data)
2218 def initiate_device_flow(self, scopes=None, **kwargs):
2219 """Initiate a Device Flow instance,
2220 which will be used in :func:`~acquire_token_by_device_flow`.
2222 :param list[str] scopes:
2223 Scopes requested to access a protected API (a resource).
2224 :return: A dict representing a newly created Device Flow object.
2226 - A successful response would contain "user_code" key, among others
2227 - an error response would contain some other readable key/value pairs.
2228 """
2229 correlation_id = msal.telemetry._get_new_correlation_id()
2230 flow = self.client.initiate_device_flow(
2231 scope=self._decorate_scope(scopes or []),
2232 headers={msal.telemetry.CLIENT_REQUEST_ID: correlation_id},
2233 **kwargs)
2234 flow[self.DEVICE_FLOW_CORRELATION_ID] = correlation_id
2235 return flow
2237 def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
2238 """Obtain token by a device flow object, with customizable polling effect.
2240 :param dict flow:
2241 A dict previously generated by :func:`~initiate_device_flow`.
2242 By default, this method's polling effect will block current thread.
2243 You can abort the polling loop at any time,
2244 by changing the value of the flow's "expires_at" key to 0.
2245 :param claims_challenge:
2246 The claims_challenge parameter requests specific claims requested by the resource provider
2247 in the form of a claims_challenge directive in the www-authenticate header to be
2248 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
2249 It is a string of a JSON object which contains lists of claims being requested from these locations.
2251 :return: A dict representing the json response from Microsoft Entra:
2253 - A successful response would contain "access_token" key,
2254 - an error response would contain "error" and usually "error_description".
2255 """
2256 telemetry_context = self._build_telemetry_context(
2257 self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID,
2258 correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID))
2259 response = _clean_up(self.client.obtain_token_by_device_flow(
2260 flow,
2261 data=dict(
2262 kwargs.pop("data", {}),
2263 code=flow["device_code"], # 2018-10-4 Hack:
2264 # during transition period,
2265 # service seemingly need both device_code and code parameter.
2266 claims=_merge_claims_challenge_and_capabilities(
2267 self._client_capabilities, claims_challenge),
2268 ),
2269 headers=telemetry_context.generate_headers(),
2270 **kwargs))
2271 if "access_token" in response:
2272 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
2273 telemetry_context.update_telemetry(response)
2274 return response
2277class ConfidentialClientApplication(ClientApplication): # server-side web app
2278 """Same as :func:`ClientApplication.__init__`,
2279 except that ``allow_broker`` parameter shall remain ``None``.
2280 """
2282 def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
2283 """Acquires token for the current confidential client, not for an end user.
2285 Since MSAL Python 1.23, it will automatically look for token from cache,
2286 and only send request to Identity Provider when cache misses.
2288 :param list[str] scopes: (Required)
2289 Scopes requested to access a protected API (a resource).
2290 :param claims_challenge:
2291 The claims_challenge parameter requests specific claims requested by the resource provider
2292 in the form of a claims_challenge directive in the www-authenticate header to be
2293 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
2294 It is a string of a JSON object which contains lists of claims being requested from these locations.
2296 :return: A dict representing the json response from Microsoft Entra:
2298 - A successful response would contain "access_token" key,
2299 - an error response would contain "error" and usually "error_description".
2300 """
2301 if kwargs.get("force_refresh"):
2302 raise ValueError( # We choose to disallow force_refresh
2303 "Historically, this method does not support force_refresh behavior. "
2304 )
2305 return _clean_up(self._acquire_token_silent_with_error(
2306 scopes, None, claims_challenge=claims_challenge, **kwargs))
2308 def _acquire_token_for_client(
2309 self,
2310 scopes,
2311 refresh_reason,
2312 claims_challenge=None,
2313 **kwargs
2314 ):
2315 if self.authority.tenant.lower() in ["common", "organizations"]:
2316 warnings.warn(
2317 "Using /common or /organizations authority "
2318 "in acquire_token_for_client() is unreliable. "
2319 "Please use a specific tenant instead.", DeprecationWarning)
2320 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
2321 telemetry_context = self._build_telemetry_context(
2322 self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason)
2323 client = self._regional_client or self.client
2324 response = client.obtain_token_for_client(
2325 scope=scopes, # This grant flow requires no scope decoration
2326 headers=telemetry_context.generate_headers(),
2327 data=dict(
2328 kwargs.pop("data", {}),
2329 claims=_merge_claims_challenge_and_capabilities(
2330 self._client_capabilities, claims_challenge)),
2331 **kwargs)
2332 telemetry_context.update_telemetry(response)
2333 return response
2335 def remove_tokens_for_client(self):
2336 """Remove all tokens that were previously acquired via
2337 :func:`~acquire_token_for_client()` for the current client."""
2338 for env in [self.authority.instance] + self._get_authority_aliases(
2339 self.authority.instance):
2340 for at in list(self.token_cache.search( # Remove ATs from a snapshot
2341 TokenCache.CredentialType.ACCESS_TOKEN, query={
2342 "client_id": self.client_id,
2343 "environment": env,
2344 "home_account_id": None, # These are mostly app-only tokens
2345 })):
2346 self.token_cache.remove_at(at)
2347 # acquire_token_for_client() obtains no RTs, so we have no RT to remove
2349 def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs):
2350 """Acquires token using on-behalf-of (OBO) flow.
2352 The current app is a middle-tier service which was called with a token
2353 representing an end user.
2354 The current app can use such token (a.k.a. a user assertion) to request
2355 another token to access downstream web API, on behalf of that user.
2356 See `detail docs here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow>`_ .
2358 The current middle-tier app has no user interaction to obtain consent.
2359 See how to gain consent upfront for your middle-tier app from this article.
2360 https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow#gaining-consent-for-the-middle-tier-application
2362 :param str user_assertion: The incoming token already received by this app
2363 :param list[str] scopes: Scopes required by downstream API (a resource).
2364 :param claims_challenge:
2365 The claims_challenge parameter requests specific claims requested by the resource provider
2366 in the form of a claims_challenge directive in the www-authenticate header to be
2367 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
2368 It is a string of a JSON object which contains lists of claims being requested from these locations.
2370 :return: A dict representing the json response from Microsoft Entra:
2372 - A successful response would contain "access_token" key,
2373 - an error response would contain "error" and usually "error_description".
2374 """
2375 telemetry_context = self._build_telemetry_context(
2376 self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID)
2377 # The implementation is NOT based on Token Exchange (RFC 8693)
2378 response = _clean_up(self.client.obtain_token_by_assertion( # bases on assertion RFC 7521
2379 user_assertion,
2380 self.client.GRANT_TYPE_JWT, # IDTs and AAD ATs are all JWTs
2381 scope=self._decorate_scope(scopes), # Decoration is used for:
2382 # 1. Explicitly requesting an RT, without relying on AAD default
2383 # behavior, even though it currently still issues an RT.
2384 # 2. Requesting an IDT (which would otherwise be unavailable)
2385 # so that the calling app could use id_token_claims to implement
2386 # their own cache mapping, which is likely needed in web apps.
2387 data=dict(
2388 kwargs.pop("data", {}),
2389 requested_token_use="on_behalf_of",
2390 claims=_merge_claims_challenge_and_capabilities(
2391 self._client_capabilities, claims_challenge)),
2392 headers=telemetry_context.generate_headers(),
2393 # TBD: Expose a login_hint (or ccs_routing_hint) param for web app
2394 **kwargs))
2395 if "access_token" in response:
2396 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
2397 telemetry_context.update_telemetry(response)
2398 return response