Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/application.py: 16%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import functools
2import json
3import time
4import logging
5import sys
6import warnings
7from threading import Lock
8from typing import Optional # Needed in Python 3.7 & 3.8
9from urllib.parse import urlparse
10import os
12from .oauth2cli import Client, JwtAssertionCreator
13from .oauth2cli.oidc import decode_part
14from .authority import (
15 Authority,
16 WORLD_WIDE,
17 _get_instance_discovery_endpoint,
18 _get_instance_discovery_host,
19)
20from .mex import send_request as mex_send_request
21from .wstrust_request import send_request as wst_send_request
22from .wstrust_response import *
23from .token_cache import TokenCache, _get_username, _GRANT_TYPE_BROKER, _compute_ext_cache_key
24import msal.telemetry
25from .region import _detect_region
26from .throttled_http_client import ThrottledHttpClient
27from .cloudshell import _is_running_in_cloud_shell
28from .sku import SKU, __version__
29from .oauth2cli.authcode import is_wsl
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# NOTE(review): presumably tags an authority as Azure Cloud Shell; its usage is
# outside the visible part of this file -- confirm before relying on this.
_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL"
def _init_broker(enable_pii_log):  # Kept as a function so that it can be mocked
    """Import the broker module on demand and optionally turn on its PII logging.

    :param enable_pii_log: When truthy, ``broker._enable_pii_log()`` is called
        right after the import.
    """
    from . import broker  # Lazy import; importing it triggers the broker's initialization
    if not enable_pii_log:
        return
    broker._enable_pii_log()
def extract_certs(public_cert_content):
    """Parse raw public certificate file contents and return a list of strings.

    Usage: ``headers = {"x5c": extract_certs(open("my_cert.pem").read())}``

    :param str public_cert_content: The text content of a public certificate file.
    :return:
        One stripped string per ``BEGIN CERTIFICATE``/``END CERTIFICATE`` section.
        When no such section exists, the whole (stripped) input is returned as
        the only element of the list.
    :rtype: list[str]
    :raises ValueError:
        If no certificate section is found and the input contains "PRIVATE KEY".
    """
    # Import locally so that this function does not depend on ``re`` happening
    # to be brought into this module's namespace by some other import.
    import re

    public_certificates = re.findall(
        r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----',
        public_cert_content, re.I)
    if public_certificates:
        return [cert.strip() for cert in public_certificates]
    # The public cert tags are not found in the input,
    # let's make best effort to exclude a private key pem file.
    if "PRIVATE KEY" in public_cert_content:
        raise ValueError(
            "We expect your public key but detect a private key instead")
    return [public_cert_content.strip()]
56def _merge_claims_challenge_and_capabilities(capabilities, claims_challenge):
57 # Represent capabilities as {"access_token": {"xms_cc": {"values": capabilities}}}
58 # and then merge/add it into incoming claims
59 if not capabilities:
60 return claims_challenge
61 claims_dict = json.loads(claims_challenge) if claims_challenge else {}
62 for key in ["access_token"]: # We could add "id_token" if we'd decide to
63 claims_dict.setdefault(key, {}).update(xms_cc={"values": capabilities})
64 return json.dumps(claims_dict)
67def _str2bytes(raw):
68 # A conversion based on duck-typing rather than six.text_type
69 try:
70 return raw.encode(encoding="utf-8")
71 except:
72 return raw
def _extract_cert_and_thumbprints(cert):
    """Return ``(sha256_thumbprint, sha1_thumbprint, x5c)`` for a certificate object.

    Cert concepts: https://security.stackexchange.com/a/226758/125264

    :param cert: An object offering ``public_bytes(encoding=...)`` and
        ``fingerprint(algorithm)``, as ``cryptography`` certificates do.
    :return: Both thumbprints as hex strings, and ``x5c`` as a one-element list
        holding the PEM body with its first and last lines removed.
    """
    from cryptography.hazmat.primitives import hashes, serialization

    pem_text = cert.public_bytes(  # Requires cryptography 1.0+
        encoding=serialization.Encoding.PEM).decode()
    pem_lines = pem_text.splitlines()
    # Strip the "--- header ---" and "--- footer ---" lines
    pem_body = '\n'.join(pem_lines[1:-1])
    # https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object - Requires cryptography 0.7+
    sha256_hex = cert.fingerprint(hashes.SHA256()).hex()
    sha1_hex = cert.fingerprint(hashes.SHA1()).hex()  # CodeQL [SM02167] for legacy support such as ADFS
    return sha256_hex, sha1_hex, [pem_body]
def _parse_pfx(pfx_path, passphrase_bytes):
    """Load a PFX (PKCS12) file and return its private key plus cert details.

    Cert concepts: https://security.stackexchange.com/a/226758/125264

    :param pfx_path: Path of the PFX file, which is opened in binary mode.
    :param passphrase_bytes: Passed as-is to ``pkcs12.load_key_and_certificates()``.
    :return: ``(private_key, sha256_thumbprint, sha1_thumbprint, x5c)``
    :raises ValueError: If the file lacks either the private key or the cert.
    """
    from cryptography.hazmat.primitives.serialization import pkcs12

    with open(pfx_path, 'rb') as pfx_file:
        pfx_content = pfx_file.read()
    # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates
    private_key, cert, _ = pkcs12.load_key_and_certificates(  # cryptography 2.5+
        pfx_content, passphrase_bytes)
    if not private_key or not cert:
        raise ValueError("Your PFX file shall contain both private key and cert")
    thumbprints_and_x5c = _extract_cert_and_thumbprints(cert)
    sha256_thumbprint, sha1_thumbprint, x5c = thumbprints_and_x5c
    return private_key, sha256_thumbprint, sha1_thumbprint, x5c
def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes):
    """Load a private key from its PEM text via ``cryptography``.

    :param private_key_pem_str: The PEM content; it goes through ``_str2bytes()``
        before being handed to ``load_pem_private_key()``.
    :param passphrase_bytes: Passed through as the key's password argument.
    :return: Whatever ``serialization.load_pem_private_key()`` returns.
    """
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.serialization import load_pem_private_key

    pem_bytes = _str2bytes(private_key_pem_str)
    backend = default_backend()  # It was a required param until 2020
    return load_pem_private_key(  # cryptography 0.6+
        pem_bytes, passphrase_bytes, backend=backend)
113def _pii_less_home_account_id(home_account_id):
114 parts = home_account_id.split(".") # It could contain one or two parts
115 parts[0] = "********"
116 return ".".join(parts)
119def _clean_up(result):
120 if isinstance(result, dict):
121 if "_msalruntime_telemetry" in result or "_msal_python_telemetry" in result:
122 result["msal_telemetry"] = json.dumps({ # Telemetry as an opaque string
123 "msalruntime_telemetry": result.get("_msalruntime_telemetry"),
124 "msal_python_telemetry": result.get("_msal_python_telemetry"),
125 }, separators=(",", ":"))
126 return_value = {
127 k: result[k] for k in result
128 if k != "refresh_in" # MSAL handled refresh_in, customers need not
129 and not k.startswith('_') # Skim internal properties
130 }
131 if "refresh_in" in result: # To encourage proactive refresh
132 return_value["refresh_on"] = int(time.time() + result["refresh_in"])
133 return return_value
134 return result # It could be None
137def _preferred_browser():
138 """Register Edge and return a name suitable for subsequent webbrowser.get(...)
139 when appropriate. Otherwise return None.
140 """
141 # On Linux, only Edge will provide device-based Conditional Access support
142 if sys.platform != "linux": # On other platforms, we have no browser preference
143 return None
144 browser_path = "/usr/bin/microsoft-edge" # Use a full path owned by sys admin
145 # Note: /usr/bin/microsoft-edge, /usr/bin/microsoft-edge-stable, etc.
146 # are symlinks that point to the actual binaries which are found under
147 # /opt/microsoft/msedge/msedge or /opt/microsoft/msedge-beta/msedge.
148 # Either method can be used to detect an Edge installation.
149 user_has_no_preference = "BROWSER" not in os.environ
150 user_wont_mind_edge = "microsoft-edge" in os.environ.get("BROWSER", "") # Note:
151 # BROWSER could contain "microsoft-edge" or "/path/to/microsoft-edge".
152 # Python documentation (https://docs.python.org/3/library/webbrowser.html)
153 # does not document the name being implicitly register,
154 # so there is no public API to know whether the ENV VAR browser would work.
155 # Therefore, we would not bother examine the env var browser's type.
156 # We would just register our own Edge instance.
157 if (user_has_no_preference or user_wont_mind_edge) and os.path.exists(browser_path):
158 try:
159 import webbrowser # Lazy import. Some distro may not have this.
160 browser_name = "msal-edge" # Avoid popular name "microsoft-edge"
161 # otherwise `BROWSER="microsoft-edge"; webbrowser.get("microsoft-edge")`
162 # would return a GenericBrowser instance which won't work.
163 try:
164 registration_available = isinstance(
165 webbrowser.get(browser_name), webbrowser.BackgroundBrowser)
166 except webbrowser.Error:
167 registration_available = False
168 if not registration_available:
169 logger.debug("Register %s with %s", browser_name, browser_path)
170 # By registering our own browser instance with our own name,
171 # rather than populating a process-wide BROWSER enn var,
172 # this approach does not have side effect on non-MSAL code path.
173 webbrowser.register( # Even double-register happens to work fine
174 browser_name, None, webbrowser.BackgroundBrowser(browser_path))
175 return browser_name
176 except ImportError:
177 pass # We may still proceed
178 return None
180def _is_ssh_cert_or_pop_request(token_type, auth_scheme) -> bool:
181 return token_type == "ssh-cert" or token_type == "pop" or isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme)
class _ClientWithCcsRoutingInfo(Client):
    """A ``Client`` that adds ``X-AnchorMailbox`` routing info to its requests."""

    def initiate_auth_code_flow(self, **kwargs):
        """Start an auth code flow, always requesting ``client_info=1``.

        A truthy ``login_hint`` is additionally relayed as an
        ``X-AnchorMailbox`` keyword argument.
        """
        login_hint = kwargs.get("login_hint")
        if login_hint:  # eSTS could have utilized this as-is, but nope
            kwargs["X-AnchorMailbox"] = "UPN:%s" % login_hint
        # client_info=1 is to be used as CCS Routing info
        return super().initiate_auth_code_flow(client_info=1, **kwargs)

    def obtain_token_by_auth_code_flow(
            self, auth_code_flow, auth_response, **kwargs):
        """Redeem an auth code, routing by the ``client_info`` of the auth response.

        Note: the obtain_token_by_browser() is also covered by this.
        """
        assert isinstance(auth_code_flow, dict) and isinstance(auth_response, dict)
        headers = kwargs.pop("headers", {})
        client_info = {}
        if auth_response.get("client_info"):
            client_info = json.loads(decode_part(auth_response["client_info"]))
        if "uid" in client_info and "utid" in client_info:
            # Note: The value of X-AnchorMailbox is also case-insensitive
            headers["X-AnchorMailbox"] = "Oid:{uid}@{utid}".format(**client_info)
        return super().obtain_token_by_auth_code_flow(
            auth_code_flow, auth_response, headers=headers, **kwargs)

    def obtain_token_by_username_password(self, username, password, **kwargs):
        """Run the username password flow, routing by the username."""
        headers = kwargs.pop("headers", {})
        headers["X-AnchorMailbox"] = "upn:{}".format(username)
        return super().obtain_token_by_username_password(
            username, password, headers=headers, **kwargs)
213def _msal_extension_check():
214 # Can't run this in module or class level otherwise you'll get circular import error
215 try:
216 from msal_extensions import __version__ as v
217 major, minor, _ = v.split(".", maxsplit=3)
218 if not (int(major) >= 1 and int(minor) >= 2):
219 warnings.warn(
220 "Please upgrade msal-extensions. "
221 "Only msal-extensions 1.2+ can work with msal 1.30+")
222 except ImportError:
223 pass # The optional msal_extensions is not installed. Business as usual.
224 except ValueError:
225 logger.exception(f"msal_extensions version {v} not in major.minor.patch format")
226 except:
227 logger.exception(
228 "Unable to import msal_extensions during an optional check. "
229 "This exception can be safely ignored."
230 )
class ClientApplication(object):
    """You do not usually directly use this class. Use its subclasses instead:
    :class:`PublicClientApplication` and :class:`ConfidentialClientApplication`.
    """
    # One string identifier per operation.
    # NOTE(review): presumably these are the API ids reported in telemetry;
    # their usage is outside the visible part of this file -- confirm.
    ACQUIRE_TOKEN_SILENT_ID = "84"
    ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85"
    ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301"
    ACQUIRE_TOKEN_ON_BEHALF_OF_ID = "523"
    ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID = "622"
    ACQUIRE_TOKEN_FOR_CLIENT_ID = "730"
    ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832"
    ACQUIRE_TOKEN_INTERACTIVE = "169"
    ACQUIRE_TOKEN_BY_USER_FIC_ID = "950"
    GET_ACCOUNTS_ID = "902"
    REMOVE_ACCOUNT_ID = "903"

    ATTEMPT_REGION_DISCOVERY = True  # "TryAutoDetect"
    DISABLE_MSAL_FORCE_REGION = False  # Used in azure_region to disable MSAL_FORCE_REGION behavior
    # NOTE(review): presumably the key, and its possible values, that label
    # where a returned token came from -- confirm against their usage.
    _TOKEN_SOURCE = "token_source"
    _TOKEN_SOURCE_IDP = "identity_provider"
    _TOKEN_SOURCE_CACHE = "cache"
    _TOKEN_SOURCE_BROKER = "broker"

    # Class-level default. _decide_broker() reads it as the opt-in flag
    # and then stores the final decision into the instance under the same name.
    _enable_broker = False
    # NOTE(review): presumably the message used when auth_scheme is given
    # without a broker; its usage is outside the visible part of this file.
    _AUTH_SCHEME_UNSUPPORTED = (
        "auth_scheme is currently only available from broker. "
        "You can enable broker by following these instructions. "
        "https://msal-python.readthedocs.io/en/latest/#publicclientapplication")
    def __init__(
            self, client_id,
            client_credential=None, authority=None, validate_authority=True,
            token_cache=None,
            http_client=None,
            verify=True, proxies=None, timeout=None,
            client_claims=None, app_name=None, app_version=None,
            client_capabilities=None,
            azure_region=None,  # Note: We choose to add this param in this base class,
                # despite it is currently only needed by ConfidentialClientApplication.
                # This way, it holds the same positional param place for PCA,
                # when we would eventually want to add this feature to PCA in future.
            exclude_scopes=None,
            http_cache=None,
            instance_discovery=None,
            allow_broker=None,
            enable_pii_log=None,
            oidc_authority=None,
            ):
        """Create an instance of application.

        :param str client_id: Your app has a client_id after you register it on Microsoft Entra admin center.

        :param client_credential:
            For :class:`PublicClientApplication`, you use `None` here.

            For :class:`ConfidentialClientApplication`,
            it supports many different input formats for different scenarios.

            .. admonition:: Support using a client secret.

                Just feed in a string, such as ``"your client secret"``.

            .. admonition:: Support using a certificate in X.509 (.pem) format

                Deprecated because it uses SHA-1 thumbprint,
                unless you are still using ADFS which supports SHA-1 thumbprint only.
                Please use the .pfx option documented later in this page.

                Feed in a dict in this form::

                    {
                        "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format",
                        "thumbprint": "An SHA-1 thumbprint such as A1B2C3D4E5F6... "
                                      "Changed in version 1.35.0, if thumbprint is absent "
                                      "and a public_certificate is present, MSAL will "
                                      "automatically calculate an SHA-256 thumbprint instead.",
                        "passphrase": "Needed if the private_key is encrypted (Added in version 1.6.0)",
                        "public_certificate": "...-----BEGIN CERTIFICATE-----...",  # Needed if you use Subject Name/Issuer auth. Added in version 0.5.0.
                    }

                MSAL Python requires a "private_key" in PEM format.
                If your cert is in PKCS12 (.pfx) format,
                you can convert it to X.509 (.pem) format,
                by ``openssl pkcs12 -in file.pfx -out file.pem -nodes``.

                The thumbprint is available in your app's registration in Azure Portal.
                Alternatively, you can `calculate the thumbprint <https://github.com/Azure/azure-sdk-for-python/blob/07d10639d7e47f4852eaeb74aef5d569db499d6e/sdk/identity/azure-identity/azure/identity/_credentials/certificate.py#L94-L97>`_.

                ``public_certificate`` (optional) is public key certificate
                which will be sent through 'x5c' JWT header.
                This is useful when you use `Subject Name/Issuer Authentication
                <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_
                which is an approach to allow easier certificate rotation.
                Per `specs <https://tools.ietf.org/html/rfc7515#section-4.1.6>`_,
                "the certificate containing
                the public key corresponding to the key used to digitally sign the
                JWS MUST be the first certificate. This MAY be followed by
                additional certificates, with each subsequent certificate being the
                one used to certify the previous one."
                However, your certificate's issuer may use a different order.
                So, if your attempt ends up with an error AADSTS700027 -
                "The provided signature value did not match the expected signature value",
                you may try use only the leaf cert (in PEM/str format) instead.

            .. admonition:: Supporting raw assertion obtained from elsewhere

                *Added in version 1.13.0*:
                It can also be a completely pre-signed assertion that you've assembled yourself.
                Simply pass a container containing only the key "client_assertion", like this::

                    {
                        "client_assertion": "...a JWT with claims aud, exp, iss, jti, nbf, and sub..."
                    }

                .. note::

                    A pre-signed JWT string has a fixed expiration. Long-running
                    confidential client applications (for example, workloads using
                    AKS workload identity federation, or any other dynamic
                    credential source) should instead pass a **callable** which
                    MSAL will invoke on demand to obtain a fresh assertion::

                        def get_client_assertion():
                            # e.g. read the projected service-account token from disk
                            with open("/var/run/secrets/azure/tokens/azure-identity-token") as f:
                                return f.read()

                        app = ConfidentialClientApplication(
                            "client_id",
                            client_credential={"client_assertion": get_client_assertion},
                            ...,
                        )

                    The callable is only invoked when MSAL needs to send a token
                    request on the wire (the in-memory token cache transparently
                    avoids unnecessary calls).

                    If your callback is itself expensive (for example it calls
                    out to a key vault), wrap it in :class:`msal.AutoRefresher`
                    to memoize the assertion for its lifetime::

                        from msal import AutoRefresher
                        smart_callback = AutoRefresher(get_client_assertion, expires_in=3600)
                        app = ConfidentialClientApplication(
                            "client_id",
                            client_credential={"client_assertion": smart_callback},
                            ...,
                        )

                    Passing a plain ``str`` / ``bytes`` ``client_assertion`` is
                    still supported for backward compatibility but is discouraged
                    because the assertion will eventually expire.

            .. admonition:: Supporting reading client certificates from PFX files

                This usage will automatically use SHA-256 thumbprint of the certificate.

                *Added in version 1.29.0*:
                Feed in a dictionary containing the path to a PFX file::

                    {
                        "private_key_pfx_path": "/path/to/your.pfx",  # Added in version 1.29.0
                        "public_certificate": True,  # Only needed if you use Subject Name/Issuer auth. Added in version 1.30.0
                        "passphrase": "Passphrase if the private_key is encrypted (Optional)",
                    }

                The following command will generate a .pfx file from your .key and .pem file::

                    openssl pkcs12 -export -out certificate.pfx -inkey privateKey.key -in certificate.pem

                `Subject Name/Issuer Auth
                <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_
                is an approach to allow easier certificate rotation.
                If your .pfx file contains both the private key and public cert,
                you can opt in for Subject Name/Issuer Auth by setting "public_certificate" to ``True``.

        :type client_credential: Union[dict, str, None]

        :param dict client_claims:
            *Added in version 0.5.0*:
            It is a dictionary of extra claims that would be signed
            by this :class:`ConfidentialClientApplication` 's private key.
            For example, you can use {"client_ip": "x.x.x.x"}.
            You may also override any of the following default claims::

                {
                    "aud": the_token_endpoint,
                    "iss": self.client_id,
                    "sub": same_as_issuer,
                    "exp": now + 10_min,
                    "iat": now,
                    "jti": a_random_uuid
                }

        :param str authority:
            A URL that identifies a token authority. It should be of the format
            ``https://login.microsoftonline.com/your_tenant``
            By default, we will use ``https://login.microsoftonline.com/common``

            *Changed in version 1.17*: you can also use predefined constant
            and a builder like this::

                from msal.authority import (
                    AuthorityBuilder,
                    AZURE_US_GOVERNMENT, AZURE_CHINA, AZURE_PUBLIC)
                my_authority = AuthorityBuilder(AZURE_PUBLIC, "contoso.onmicrosoft.com")
                # Now you get an equivalent of
                # "https://login.microsoftonline.com/contoso.onmicrosoft.com"

                # You can feed such an authority to msal's ClientApplication
                from msal import PublicClientApplication
                app = PublicClientApplication("my_client_id", authority=my_authority, ...)

        :param bool validate_authority: (optional) Turns authority validation
            on or off. This parameter defaults to True.
        :param TokenCache token_cache:
            Sets the token cache used by this ClientApplication instance.
            By default, an in-memory cache will be created and used.
        :param http_client: (optional)
            Your implementation of abstract class HttpClient <msal.oauth2cli.http.http_client>
            Defaults to a requests session instance.
            Since MSAL 1.11.0, the default session would be configured
            to attempt one retry on connection error.
            If you are providing your own http_client,
            it will be your http_client's duty to decide whether to perform retry.

        :param verify: (optional)
            It will be passed to the
            `verify parameter in the underlying requests library
            <http://docs.python-requests.org/en/v2.9.1/user/advanced/#ssl-cert-verification>`_
            This does not apply if you have chosen to pass your own Http client
        :param proxies: (optional)
            It will be passed to the
            `proxies parameter in the underlying requests library
            <http://docs.python-requests.org/en/v2.9.1/user/advanced/#proxies>`_
            This does not apply if you have chosen to pass your own Http client
        :param timeout: (optional)
            It will be passed to the
            `timeout parameter in the underlying requests library
            <http://docs.python-requests.org/en/v2.9.1/user/advanced/#timeouts>`_
            This does not apply if you have chosen to pass your own Http client
        :param app_name: (optional)
            You can provide your application name for Microsoft telemetry purposes.
            Default value is None, means it will not be passed to Microsoft.
        :param app_version: (optional)
            You can provide your application version for Microsoft telemetry purposes.
            Default value is None, means it will not be passed to Microsoft.
        :param list[str] client_capabilities: (optional)
            Allows configuration of one or more client capabilities, e.g. ["CP1"].

            Client capability is meant to inform the Microsoft identity platform
            (STS) what this client is capable of,
            so STS can decide to turn on certain features.
            For example, if client is capable to handle *claims challenge*,
            STS may issue
            `Continuous Access Evaluation (CAE) <https://learn.microsoft.com/entra/identity/conditional-access/concept-continuous-access-evaluation>`_
            access tokens to resources,
            knowing that when the resource emits a *claims challenge*
            the client will be able to handle those challenges.

            Implementation details:
            Client capability is implemented using "claims" parameter on the wire,
            for now.
            MSAL will combine them into
            `claims parameter <https://openid.net/specs/openid-connect-core-1_0-final.html#ClaimsParameter>`_
            which you will later provide via one of the acquire-token request.

        :param str azure_region: (optional)
            Instructs MSAL to use the Entra regional token service. This legacy feature is only available to
            first-party applications. Only ``acquire_token_for_client()`` is supported.

            Supports 4 values:

            1. ``azure_region=None`` - This default value means no region is configured.
               MSAL will use the region defined in env var ``MSAL_FORCE_REGION``.
            2. ``azure_region="some_region"`` - meaning the specified region is used.
            3. ``azure_region=True`` - meaning
               MSAL will try to auto-detect the region. This is not recommended.
            4. ``azure_region=False`` - meaning MSAL will use no region.

            .. note::
                Region auto-discovery has been tested on VMs and on Azure Functions. It is unreliable.
                Applications using this option should configure a short timeout.

            For more details and for the values of the region string
            see https://learn.microsoft.com/entra/msal/dotnet/resources/region-discovery-troubleshooting

            New in version 1.12.0.

        :param list[str] exclude_scopes: (optional)
            Historically MSAL hardcodes `offline_access` scope,
            which would allow your app to have prolonged access to user's data.
            If that is unnecessary or undesirable for your app,
            now you can use this parameter to supply an exclusion list of scopes,
            such as ``exclude_scopes = ["offline_access"]``.

        :param dict http_cache:
            MSAL has long been caching tokens in the ``token_cache``.
            Recently, MSAL also introduced a concept of ``http_cache``,
            by automatically caching some finite amount of non-token http responses,
            so that *long-lived*
            ``PublicClientApplication`` and ``ConfidentialClientApplication``
            would be more performant and responsive in some situations.

            This ``http_cache`` parameter accepts any dict-like object.
            If not provided, MSAL will use an in-memory dict.

            If your app is a command-line app (CLI),
            you would want to persist your http_cache across different CLI runs.
            The persisted file's format may change due to, but not limited to,
            `unstable protocol <https://docs.python.org/3/library/pickle.html#data-stream-format>`_,
            so your implementation shall tolerate unexpected loading errors.
            The following recipe shows a way to do so::

                # Just add the following lines at the beginning of your CLI script
                import sys, atexit, pickle, logging
                http_cache_filename = sys.argv[0] + ".http_cache"
                try:
                    with open(http_cache_filename, "rb") as f:
                        persisted_http_cache = pickle.load(f)  # Take a snapshot
                except (
                        FileNotFoundError,  # Or IOError in Python 2
                        pickle.UnpicklingError,  # A corrupted http cache file
                        AttributeError,  # Cache created by a different version of MSAL
                        ):
                    persisted_http_cache = {}  # Recover by starting afresh
                except:  # Unexpected exceptions
                    logging.exception("You may want to debug this")
                    persisted_http_cache = {}  # Recover by starting afresh
                atexit.register(lambda: pickle.dump(
                    # When exit, flush it back to the file.
                    # It may occasionally overwrite another process's concurrent write,
                    # but that is fine. Subsequent runs will reach eventual consistency.
                    persisted_http_cache, open(http_cache_filename, "wb")))

                # And then you can implement your app as you normally would
                app = msal.PublicClientApplication(
                    "your_client_id",
                    ...,
                    http_cache=persisted_http_cache,  # Utilize persisted_http_cache
                    ...,
                    #token_cache=...,  # You may combine the old token_cache trick
                        # Please refer to token_cache recipe at
                        # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
                    )
                app.acquire_token_interactive(["your", "scope"], ...)

            Content inside ``http_cache`` are cheap to obtain.
            There is no need to share them among different apps.

            Content inside ``http_cache`` will contain no tokens nor
            Personally Identifiable Information (PII). Encryption is unnecessary.

            New in version 1.16.0.

        :param boolean instance_discovery:
            Historically, MSAL would connect to a central endpoint located at
            ``https://login.microsoftonline.com`` to acquire some metadata,
            especially when using an unfamiliar authority.
            This behavior is known as Instance Discovery.

            This parameter defaults to None, which enables the Instance Discovery.

            If you know some authorities which you allow MSAL to operate with as-is,
            without involving any Instance Discovery, the recommended pattern is::

                known_authorities = frozenset([  # Treat your known authorities as const
                    "https://contoso.com/adfs", "https://login.azs/foo"])
                ...
                authority = "https://contoso.com/adfs"  # Assuming your app will use this
                app1 = PublicClientApplication(
                    "client_id",
                    authority=authority,
                    # Conditionally disable Instance Discovery for known authorities
                    instance_discovery=authority not in known_authorities,
                    )

            If you do not know some authorities beforehand,
            yet still want MSAL to accept any authority that you will provide,
            you can use a ``False`` to unconditionally disable Instance Discovery.

            New in version 1.19.0.

        :param boolean allow_broker:
            Deprecated. Please use ``enable_broker_on_windows`` instead.

        :param boolean enable_pii_log:
            When enabled, logs may include PII (Personal Identifiable Information).
            This can be useful in troubleshooting broker behaviors.
            The default behavior is False.

            New in version 1.24.0.

        :param str oidc_authority:
            *Added in version 1.28.0*:
            It is a URL that identifies an OpenID Connect (OIDC) authority of
            the format ``https://contoso.com/tenant``.
            MSAL will append ".well-known/openid-configuration" to the authority
            and retrieve the OIDC metadata from there, to figure out the endpoints.

            Note: Broker will NOT be used for OIDC authority.

        :raises ValueError:
            If ``exclude_scopes`` is provided but is not a list,
            if ``exclude_scopes`` contains ``"openid"``,
            or if both ``authority`` and ``oidc_authority`` are provided.
            A ``ValueError`` raised while creating the authority is also propagated.
        """
        self.client_id = client_id
        self.client_credential = client_credential
        self.client_claims = client_claims
        self._client_capabilities = client_capabilities
        self._instance_discovery = instance_discovery

        # Validate exclude_scopes before any http client or authority is created
        if exclude_scopes and not isinstance(exclude_scopes, list):
            raise ValueError(
                "Invalid exclude_scopes={}. It need to be a list of strings.".format(
                repr(exclude_scopes)))
        self._exclude_scopes = frozenset(exclude_scopes or [])
        if "openid" in self._exclude_scopes:
            raise ValueError(
                'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format(
                repr(exclude_scopes)))

        if http_client:
            self.http_client = http_client
        else:
            import requests  # Lazy load

            # verify, proxies and timeout only take effect on this default session
            self.http_client = requests.Session()
            self.http_client.verify = verify
            self.http_client.proxies = proxies
            # Requests does not support a session-wide timeout,
            # but you can patch that (https://github.com/psf/requests/issues/3341):
            self.http_client.request = functools.partial(
                self.http_client.request, timeout=timeout)

            # Enable a minimal retry. Better than nothing.
            # https://github.com/psf/requests/blob/v2.25.1/requests/adapters.py#L94-L108
            a = requests.adapters.HTTPAdapter(max_retries=1)
            self.http_client.mount("http://", a)
            self.http_client.mount("https://", a)
        # Whichever http client was chosen above gets wrapped by ThrottledHttpClient
        self.http_client = ThrottledHttpClient(
            self.http_client,
            http_cache=http_cache,
            default_throttle_time=60
                # The default value 60 was recommended mainly for PCA at the end of
                # https://identitydivision.visualstudio.com/devex/_git/AuthLibrariesApiReview?version=GBdev&path=%2FService%20protection%2FIntial%20set%20of%20protection%20measures.md&_a=preview
                if isinstance(self, PublicClientApplication) else 5,
            )

        self.app_name = app_name
        self.app_version = app_version

        # Here the self.authority will not be the same type as authority in input
        if oidc_authority and authority:
            raise ValueError("You can not provide both authority and oidc_authority")
        if isinstance(authority, str) and urlparse(authority).path.startswith(
            "/dstsv2"):  # dSTS authority's path always starts with "/dstsv2"
            oidc_authority = authority  # So we treat it as if an oidc_authority
        try:
            authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE)
            self.authority = Authority(
                authority_to_use,
                self.http_client,
                validate_authority=validate_authority,
                instance_discovery=self._instance_discovery,
                oidc_authority_url=oidc_authority,
                )
        except ValueError:  # Those are explicit authority validation errors
            raise
        except Exception:  # The rest are typically connection errors
            if validate_authority and not oidc_authority and (
                azure_region  # Opted in to use region
                or (azure_region is None and os.getenv("MSAL_FORCE_REGION"))  # Will use region
            ):
                # Since caller opts in to use region, here we tolerate connection
                # errors happened during authority validation at non-region endpoint
                self.authority = Authority(
                    authority_to_use,
                    self.http_client,
                    instance_discovery=False,
                    )
            else:
                raise

        # _decide_broker() reads self.authority, so it runs after the authority is set
        self._decide_broker(allow_broker, enable_pii_log)
        self.token_cache = token_cache or TokenCache()
        self._region_configured = azure_region
        self._region_detected = None
        self.client, self._regional_client = self._build_client(
            client_credential, self.authority)
        # Warn if using a static string/bytes client_assertion (discouraged for long-running apps)
        if isinstance(client_credential, dict) and isinstance(
                client_credential.get("client_assertion"), (str, bytes)):
            warnings.warn(
                "Passing a static string/bytes 'client_assertion' is "
                "discouraged because the JWT will eventually expire. "
                "Pass a no-arg callable instead (optionally wrapped in "
                "msal.AutoRefresher) so MSAL can obtain a fresh "
                "assertion on demand. "
                "See https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/746",
                DeprecationWarning, stacklevel=2)

        self.authority_groups = {}
        self._telemetry_buffer = {}
        self._telemetry_lock = Lock()
        _msal_extension_check()
737 def _decide_broker(self, allow_broker, enable_pii_log):
738 is_confidential_app = self.client_credential or isinstance(
739 self, ConfidentialClientApplication)
740 if is_confidential_app and allow_broker:
741 raise ValueError("allow_broker=True is only supported in PublicClientApplication")
742 # Historically, we chose to support ClientApplication("client_id", allow_broker=True)
743 if allow_broker:
744 warnings.warn(
745 "allow_broker is deprecated. "
746 "Please use PublicClientApplication(..., "
747 "enable_broker_on_windows=True, "
748 # No need to mention non-Windows platforms, because allow_broker is only for Windows
749 "...)",
750 DeprecationWarning)
751 opted_in_for_broker = (
752 self._enable_broker # True means Opted-in from PCA
753 or (
754 # When we started the broker project on Windows platform,
755 # the allow_broker was meant to be cross-platform. Now we realize
756 # that other platforms have different redirect_uri requirements,
757 # so the old allow_broker is deprecated and will only for Windows.
758 allow_broker and sys.platform == "win32")
759 )
760 self._enable_broker = ( # This same variable will also store the state
761 opted_in_for_broker
762 and not is_confidential_app
763 and not self.authority.is_adfs
764 and not self.authority._is_b2c
765 )
766 if self._enable_broker:
767 try:
768 _init_broker(enable_pii_log)
769 except RuntimeError:
770 self._enable_broker = False
771 logger.warning( # It is common on Mac and Linux where broker is not built-in
772 "Broker is unavailable on this platform. "
773 "We will fallback to non-broker.")
774 logger.debug("Broker enabled? %s", self._enable_broker)
def is_pop_supported(self):
    """Returns True if this client supports Proof-of-Possession Access Token.

    That requires the broker to be enabled, on Windows or macOS.
    When the broker is not enabled, the falsy broker flag itself is returned.
    """
    broker_state = self._enable_broker
    if not broker_state:
        return broker_state
    return sys.platform in ("win32", "darwin")
780 def _decorate_scope(
781 self, scopes,
782 reserved_scope=frozenset(['openid', 'profile', 'offline_access'])):
783 if not isinstance(scopes, (list, set, tuple)):
784 raise ValueError("The input scopes should be a list, tuple, or set")
785 scope_set = set(scopes) # Input scopes is typically a list. Copy it to a set.
786 if scope_set & reserved_scope:
787 # These scopes are reserved for the API to provide good experience.
788 # We could make the developer pass these and then if they do they will
789 # come back asking why they don't see refresh token or user information.
790 raise ValueError(
791 """You cannot use any scope value that is reserved.
792Your input: {}
793The reserved list: {}""".format(list(scope_set), list(reserved_scope)))
794 raise ValueError(
795 "You cannot use any scope value that is in this reserved list: {}".format(
796 list(reserved_scope)))
798 # client_id can also be used as a scope in B2C
799 decorated = scope_set | reserved_scope
800 decorated -= self._exclude_scopes
801 return list(decorated)
def _build_telemetry_context(
        self, api_id, correlation_id=None, refresh_reason=None):
    """Create a telemetry context for one API call.

    The context shares this application's telemetry buffer and lock.

    :param api_id: Identifies the API being called.
    :param correlation_id: Optional. Passed through to the context.
    :param refresh_reason: Optional. Passed through to the context.
    :return: A ``msal.telemetry._TelemetryContext`` instance.
    """
    optional_settings = {
        "correlation_id": correlation_id,
        "refresh_reason": refresh_reason,
    }
    return msal.telemetry._TelemetryContext(
        self._telemetry_buffer,
        self._telemetry_lock,
        api_id,
        **optional_settings)
def _get_regional_authority(self, central_authority) -> Optional[Authority]:
    """Derive the regional counterpart of a central authority, if any.

    As a side effect, this may populate ``self._region_configured``
    (from the MSAL_FORCE_REGION environment variable)
    and ``self._region_detected``.

    :param central_authority:
        The authority whose ``instance`` and ``tenant`` are used to
        compose the regional endpoint. It has already been validated.
    :return:
        An Authority at the regional host, or None when no region is in use.
    """
    configured = self._region_configured
    if configured is False:  # User opts out of ESTS-R
        return None  # Short circuit to completely bypass region detection
    if configured is None:  # User did not make an ESTS-R choice
        configured = os.getenv("MSAL_FORCE_REGION") or None
        self._region_configured = configured

    if not self._region_detected:
        # No http client is handed over when there is still no region choice
        self._region_detected = _detect_region(
            None if configured is None else self.http_client)
    detected = self._region_detected

    use_discovery = configured == self.ATTEMPT_REGION_DISCOVERY
    if not use_discovery and configured != detected:
        logger.warning('Region configured ({}) != region detected ({})'.format(
            repr(configured), repr(detected)))

    if use_discovery:
        region_to_use = detected
    else:
        region_to_use = configured  # It will retain the None i.e. opted out
    logger.debug('Region to be used: {}'.format(repr(region_to_use)))
    if not region_to_use:
        return None

    # The list came from point 3 of the algorithm section in this internal doc
    # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PinAuthToRegion/AAD%20SDK%20Proposal%20to%20Pin%20Auth%20to%20region.md&anchor=algorithm&_a=preview
    hosts_sharing_one_regional_domain = (
        "login.microsoftonline.com",
        "login.microsoft.com",
        "login.windows.net",
        "sts.windows.net",
    )
    if central_authority.instance in hosts_sharing_one_regional_domain:
        regional_host = "{}.login.microsoft.com".format(region_to_use)
    else:
        regional_host = "{}.{}".format(region_to_use, central_authority.instance)
    return Authority(  # The central_authority has already been validated
        "https://{}/{}".format(regional_host, central_authority.tenant),
        self.http_client,
        instance_discovery=False,
    )
def _build_client(self, client_credential, authority, skip_regional_client=False):
    """Build the OAuth2 client(s) used to talk to the given authority.

    :param client_credential:
        When it is a dict, it is treated as an assertion-based credential:
        either a ready-made ``client_assertion``, or key material
        (``private_key_pfx_path``, or a PEM ``private_key`` accompanied by
        ``thumbprint`` or ``public_certificate``) from which a JWT assertion
        is created. Any non-dict value is sent as ``client_secret``.
    :param authority:
        Provides the endpoints, the ``instance`` used as the token cache
        environment, and the ``is_adfs`` flag which affects the algorithm.
    :param skip_regional_client:
        When True, no regional client will be built.
    :return:
        A tuple of ``(central_client, regional_client)``.
        The ``regional_client`` is None unless a credential is present,
        ``skip_regional_client`` is false, and a regional authority is found.
    :raises ValueError:
        If a dict credential contains neither an assertion nor usable
        key material, or no suitable thumbprint can be determined.
    """
    client_assertion = None
    client_assertion_type = None
    # These headers accompany every request made by the clients built here
    default_headers = {
        "x-client-sku": SKU, "x-client-ver": __version__,
        "x-client-os": sys.platform,
        "x-ms-lib-capability": "retry-after, h429",
    }
    if self.app_name:
        default_headers['x-app-name'] = self.app_name
    if self.app_version:
        default_headers['x-app-ver'] = self.app_version
    default_body = {"client_info": 1}
    if isinstance(client_credential, dict):
        client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT
        # Use client_credential.get("...") rather than "..." in client_credential
        # so that we can ignore an empty string came from an empty ENV VAR.
        if client_credential.get("client_assertion"):
            # A ready-made assertion takes precedence over any key material
            client_assertion = client_credential['client_assertion']
        else:
            headers = {}
            sha1_thumbprint = sha256_thumbprint = None
            passphrase_bytes = _str2bytes(
                client_credential["passphrase"]
            ) if client_credential.get("passphrase") else None
            if client_credential.get("private_key_pfx_path"):
                private_key, sha256_thumbprint, sha1_thumbprint, x5c = _parse_pfx(
                    client_credential["private_key_pfx_path"],
                    passphrase_bytes)
                # Only the literal True opts in to sending the PFX's own certificate
                if client_credential.get("public_certificate") is True and x5c:
                    headers["x5c"] = x5c
            elif client_credential.get("private_key"):  # PEM blob
                private_key = (  # handles both encrypted and unencrypted
                    _load_private_key_from_pem_str(
                        client_credential['private_key'], passphrase_bytes)
                    if passphrase_bytes
                    else client_credential['private_key']
                )

                # Determine thumbprints based on what's provided
                if client_credential.get("thumbprint"):
                    # User provided a thumbprint - use it as SHA-1 (legacy/manual approach)
                    sha1_thumbprint = client_credential["thumbprint"]
                    sha256_thumbprint = None
                elif isinstance(client_credential.get('public_certificate'), str):
                    # No thumbprint provided, but we have a certificate to calculate thumbprints
                    from cryptography import x509
                    cert = x509.load_pem_x509_certificate(
                        _str2bytes(client_credential['public_certificate']))
                    sha256_thumbprint, sha1_thumbprint, headers["x5c"] = (
                        _extract_cert_and_thumbprints(cert))
                else:
                    raise ValueError(
                        "You must provide either 'thumbprint' or 'public_certificate' "
                        "from which the thumbprint can be calculated.")
            else:
                raise ValueError(
                    "client_credential needs to follow this format "
                    "https://msal-python.readthedocs.io/en/latest/#msal.ClientApplication.params.client_credential")
            if ("x5c" not in headers  # So the .pfx file contains no certificate
                    and isinstance(client_credential.get('public_certificate'), str)
                    ):  # Then we treat the public_certificate value as PEM content
                headers["x5c"] = extract_certs(client_credential['public_certificate'])
            # PS256 with a SHA-256 thumbprint is chosen whenever one is
            # available, except for ADFS, which uses the RS256/SHA-1 fallback
            if sha256_thumbprint and not authority.is_adfs:
                assertion_params = {
                    "algorithm": "PS256", "sha256_thumbprint": sha256_thumbprint,
                }
            else:  # Fall back
                if not sha1_thumbprint:
                    raise ValueError("You shall provide a thumbprint in SHA1.")
                assertion_params = {
                    "algorithm": "RS256", "sha1_thumbprint": sha1_thumbprint,
                }
            assertion = JwtAssertionCreator(
                private_key, headers=headers, **assertion_params)
            client_assertion = assertion.create_regenerative_assertion(
                audience=authority.token_endpoint, issuer=self.client_id,
                additional_claims=self.client_claims or {})
    else:
        # Any non-dict credential is sent as the client secret
        default_body['client_secret'] = client_credential
    central_configuration = {
        "authorization_endpoint": authority.authorization_endpoint,
        "token_endpoint": authority.token_endpoint,
        "device_authorization_endpoint": authority.device_authorization_endpoint,
    }
    central_client = _ClientWithCcsRoutingInfo(
        central_configuration,
        self.client_id,
        http_client=self.http_client,
        default_headers=default_headers,
        default_body=default_body,
        client_assertion=client_assertion,
        client_assertion_type=client_assertion_type,
        on_obtaining_tokens=lambda event: self.token_cache.add(dict(
            event, environment=authority.instance)),
        on_removing_rt=self.token_cache.remove_rt,
        on_updating_rt=self.token_cache.update_rt)

    regional_client = None
    if (client_credential  # Currently regional endpoint only serves some CCA flows
            and not skip_regional_client):
        regional_authority = self._get_regional_authority(authority)
        if regional_authority:
            regional_configuration = {
                "authorization_endpoint": regional_authority.authorization_endpoint,
                "token_endpoint": regional_authority.token_endpoint,
                "device_authorization_endpoint":
                    regional_authority.device_authorization_endpoint,
            }
            regional_client = _ClientWithCcsRoutingInfo(
                regional_configuration,
                self.client_id,
                http_client=self.http_client,
                default_headers=default_headers,
                default_body=default_body,
                client_assertion=client_assertion,
                client_assertion_type=client_assertion_type,
                # Tokens from the regional endpoint are still cached under
                # the central authority's instance as their environment
                on_obtaining_tokens=lambda event: self.token_cache.add(dict(
                    event, environment=authority.instance)),
                on_removing_rt=self.token_cache.remove_rt,
                on_updating_rt=self.token_cache.update_rt)
    return central_client, regional_client
def initiate_auth_code_flow(
        self,
        scopes,  # type: list[str]
        redirect_uri=None,
        state=None,  # Recommended by OAuth2 for CSRF protection
        prompt=None,
        login_hint=None,  # type: Optional[str]
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        max_age=None,
        response_mode=None,  # type: Optional[str]
        ):
    """Initiate an auth code flow.

    Once the response arrives at your redirect_uri,
    hand it over to :func:`~acquire_token_by_auth_code_flow()`
    to finish the authentication/authorization.

    :param list scopes:
        A list of case-sensitive strings.
    :param str redirect_uri:
        Optional. When omitted, the server falls back to the pre-registered one.
    :param str state:
        An opaque value which the client uses to keep state between
        the request and the callback.
        When absent, this library generates one internally.
    :param str prompt:
        No prompt value is sent by default, not even the string ``"none"``.
        A value has to be specified explicitly.
        The valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.

    :param str login_hint:
        Optional. Identifier of the user. Generally a User Principal Name (UPN).
    :param domain_hint:
        One of "consumers" or "organizations" or your tenant domain "contoso.com".
        When included, the email-based discovery step on the sign-in page
        is skipped, which gives a slightly more streamlined user experience.
        More information on possible values available in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.

    :param int max_age:
        OPTIONAL. Maximum Authentication Age.
        The allowable elapsed time, in seconds,
        since the End-User was last actively authenticated.
        When more time than that has elapsed,
        Microsoft identity platform will actively re-authenticate the End-User.

        MSAL Python will also automatically validate the auth_time in ID token.

        New in version 1.15.

    :param str response_mode:
        OPTIONAL. The method by which response parameters are returned.
        The default is equivalent to ``query``, which was still secure enough
        in MSAL Python
        (because MSAL Python does not transfer tokens via query parameter in the first place).
        For even better security, the value ``form_post`` is recommended.
        In "form_post" mode, response parameters are encoded as HTML form
        values, transmitted via the HTTP POST method, and encoded in the body
        using the application/x-www-form-urlencoded format.
        Valid values are "form_post" for HTTP POST to callback URI, or
        "query" (the default) for HTTP GET with parameters encoded in query string.
        More information on possible values
        `here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>`
        and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>`

        .. note::
            You should configure your web framework to accept form_post responses instead of query responses.
            While this parameter still works, it will be removed in a future version.
            Using query-based response modes is less secure and should be avoided.

    :return:
        The auth code flow. It is a dict in this form::

            {
                "auth_uri": "https://...",  // Guide user to visit this
                "state": "...",  // You may choose to verify it by yourself,
                                 // or just let acquire_token_by_auth_code_flow()
                                 // do that for you.
                "...": "...",  // Everything else are reserved and internal
            }

        The caller is expected to:

        1. store this content somewhere, typically inside the current session,
        2. guide the end user (i.e. resource owner) to visit that auth_uri,
        3. and then relay this dict and the subsequent auth response to
           :func:`~acquire_token_by_auth_code_flow()`.
    """
    # Note to maintainers: Do not emit warning for the use of response_mode here,
    # because response_mode=form_post is still the recommended usage for MSAL Python 1.x.
    # App developers making the right call shall not be disturbed by unactionable warnings.
    endpoints = {"authorization_endpoint": self.authority.authorization_endpoint}
    authorization_client = _ClientWithCcsRoutingInfo(
        endpoints, self.client_id, http_client=self.http_client)
    decorated_scopes = self._decorate_scope(scopes)
    merged_claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)
    flow = authorization_client.initiate_auth_code_flow(
        redirect_uri=redirect_uri,
        state=state,
        login_hint=login_hint,
        prompt=prompt,
        scope=decorated_scopes,
        domain_hint=domain_hint,
        claims=merged_claims,
        max_age=max_age,
        response_mode=response_mode,
    )
    # Kept inside the flow so that the second leg can pick it up later
    flow["claims_challenge"] = claims_challenge
    return flow
def get_authorization_request_url(
        self,
        scopes,  # type: list[str]
        login_hint=None,  # type: Optional[str]
        state=None,  # Recommended by OAuth2 for CSRF protection
        redirect_uri=None,
        response_type="code",  # Could be "token" if you use Implicit Grant
        prompt=None,
        nonce=None,
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        **kwargs):
    """Constructs a URL for you to start a Authorization Code Grant.

    This method emits a DeprecationWarning which points to
    :func:`~initiate_auth_code_flow()` as its replacement.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param str state: Recommended by OAuth2 for CSRF protection.
    :param str login_hint:
        Identifier of the user. Generally a User Principal Name (UPN).
    :param str redirect_uri:
        Address to return to upon receiving a response from the authority.
    :param str response_type:
        Defaults to "code", for an OAuth2 Authorization Code grant.

        Other content such as "id_token" or "token" would trigger
        an Implicit Grant, but that is
        `not recommended <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-implicit-grant-flow#is-the-implicit-grant-suitable-for-my-app>`_.

    :param str prompt:
        No prompt value is sent by default, not even the string ``"none"``.
        A value has to be specified explicitly.
        The valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.
    :param nonce:
        A cryptographically random value used to mitigate replay attacks. See also
        `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
    :param domain_hint:
        One of "consumers" or "organizations" or your tenant domain "contoso.com".
        When included, the email-based discovery step on the sign-in page
        is skipped, which gives a slightly more streamlined user experience.
        More information on possible values available in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: The authorization url as a string.
    """
    authority = kwargs.pop("authority", None)  # Historically we support this
    if authority:
        warnings.warn(
            "We haven't decided if this method will accept authority parameter")
        # A multi-tenant app can use a new authority on demand
        the_authority = Authority(
            authority,
            self.http_client,
            instance_discovery=self._instance_discovery,
        )
    else:
        # Same as the previous implementation: default to self.authority
        the_authority = self.authority

    endpoints = {"authorization_endpoint": the_authority.authorization_endpoint}
    url_builder = _ClientWithCcsRoutingInfo(
        endpoints, self.client_id, http_client=self.http_client)
    warnings.warn(
        "Change your get_authorization_request_url() "
        "to initiate_auth_code_flow()", DeprecationWarning)
    with warnings.catch_warnings(record=True):
        request_parameters = dict(
            response_type=response_type,
            redirect_uri=redirect_uri,
            state=state,
            login_hint=login_hint,
            prompt=prompt,
            scope=self._decorate_scope(scopes),
            nonce=nonce,
            domain_hint=domain_hint,
            claims=_merge_claims_challenge_and_capabilities(
                self._client_capabilities, claims_challenge),
        )
        return url_builder.build_auth_request_uri(**request_parameters)
def acquire_token_by_auth_code_flow(
        self, auth_code_flow, auth_response, scopes=None, **kwargs):
    """Validate the auth response being redirected back, and obtain tokens.

    Nonce protection is provided automatically.

    :param dict auth_code_flow:
        The same dict returned by :func:`~initiate_auth_code_flow()`.
        Its "claims_challenge" entry, if any, is consumed (removed) here.
    :param dict auth_response:
        A dict of the query string received from auth server.
    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).

        Most of the time, you can leave it empty.

        If you requested user consent for multiple resources, here you will
        need to provide a subset of what you required in
        :func:`~initiate_auth_code_flow()`.

        OAuth2 was designed mostly for singleton services,
        where tokens are always meant for the same resource and the only
        changes are in the scopes.
        In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
        You can ask authorization code for multiple resources,
        but when you redeem it, the token is for only one intended
        recipient, called audience.
        So the developer need to specify a scope so that we can restrict the
        token to be issued for the corresponding audience.

    :return:
        * A dict containing "access_token" and/or "id_token", among others,
          depends on what scope was used.
          (See https://tools.ietf.org/html/rfc6749#section-5.1)
        * A dict containing "error", optionally "error_description", "error_uri".
          (It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_
          or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_)
        * Most client-side data error would result in ValueError exception.
          So the usage pattern could be without any protocol details::

            def authorize():  # A controller in a web app
                try:
                    result = msal_app.acquire_token_by_auth_code_flow(
                        session.get("flow", {}), request.args)
                    if "error" in result:
                        return render_template("error.html", result)
                    use(result)  # Token(s) are available in result and cache
                except ValueError:  # Usually caused by CSRF
                    pass  # Simply ignore them
                return redirect(url_for("index"))
    """
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)

    requested_scope = self._decorate_scope(scopes) if scopes else None
    request_headers = telemetry_context.generate_headers()
    # "data" is popped so that it will not be sent twice via **kwargs
    request_data = dict(
        kwargs.pop("data", {}),
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities,
            auth_code_flow.pop("claims_challenge", None)))
    raw_response = self.client.obtain_token_by_auth_code_flow(
        auth_code_flow,
        auth_response,
        scope=requested_scope,
        headers=request_headers,
        data=request_data,
        **kwargs)
    response = _clean_up(raw_response)

    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response
def acquire_token_by_authorization_code(
        self,
        code,
        scopes,  # Syntactically required. STS accepts empty value though.
        redirect_uri=None,
            # REQUIRED, if the "redirect_uri" parameter was included in the
            # authorization request as described in Section 4.1.1, and their
            # values MUST be identical.
        nonce=None,
        claims_challenge=None,
        **kwargs):
    """The second half of the Authorization Code Grant.

    This method emits a DeprecationWarning which points to
    :func:`~acquire_token_by_auth_code_flow()` as its replacement.

    :param code: The authorization code returned from Authorization Server.
    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).

        If you requested user consent for multiple resources, here you will
        typically want to provide a subset of what you required in AuthCode.

        OAuth2 was designed mostly for singleton services,
        where tokens are always meant for the same resource and the only
        changes are in the scopes.
        In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
        You can ask authorization code for multiple resources,
        but when you redeem it, the token is for only one intended
        recipient, called audience.
        So the developer need to specify a scope so that we can restrict the
        token to be issued for the corresponding audience.

    :param nonce:
        If a nonce was provided when calling :func:`get_authorization_request_url`,
        the same nonce should be provided here too, so that it will be validated.
        An exception will be raised if the nonce in id token mismatches.

    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    # Should the scope be absent on the wire, STS hands out a token tied
    # to the FIRST scope sent during the authorization request.
    # So, in theory, scope could be omitted here when only one scope is
    # in play. However MSAL decorates the scope anyway,
    # so it is never really empty.
    assert isinstance(scopes, list), "Invalid parameter type"
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    warnings.warn(
        "Change your acquire_token_by_authorization_code() "
        "to acquire_token_by_auth_code_flow()", DeprecationWarning)
    with warnings.catch_warnings(record=True):
        telemetry_context = self._build_telemetry_context(
            self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
        requested_scope = self._decorate_scope(scopes)
        request_headers = telemetry_context.generate_headers()
        # "data" is popped so that it will not be sent twice via **kwargs
        request_data = dict(
            kwargs.pop("data", {}),
            claims=_merge_claims_challenge_and_capabilities(
                self._client_capabilities, claims_challenge))
        raw_response = self.client.obtain_token_by_authorization_code(
            code,
            redirect_uri=redirect_uri,
            scope=requested_scope,
            headers=request_headers,
            data=request_data,
            nonce=nonce,
            **kwargs)
        response = _clean_up(raw_response)
        if "access_token" in response:
            response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
        telemetry_context.update_telemetry(response)
        return response
def get_accounts(self, username=None):
    """Get a list of accounts which previously signed in, i.e. exists in cache.

    An account can later be used in :func:`~acquire_token_silent`
    to find its tokens.

    The lookup first uses this app's authority instance,
    and falls back to the aliases of that instance when nothing is found.

    :param username:
        Filter accounts with this username only. Case insensitive.
        Accounts without a username never match.
    :return: A list of account objects.
        Each account is a dict. For now, we only document its "username" field.
        Your app can choose to display those information to end user,
        and allow user to choose one of his/her accounts to proceed.
    """
    accounts = self._find_msal_accounts(environment=self.authority.instance)
    if not accounts:  # Now try other aliases of this authority instance
        for alias in self._get_authority_aliases(self.authority.instance):
            accounts = self._find_msal_accounts(environment=alias)
            if accounts:
                break
    if username:
        # Federated account["username"] from AAD could contain mixed case
        lowercase_username = username.lower()
        accounts = [
            a for a in accounts
            # A cached account may have no username (None or absent),
            # e.g. when tokens were acquired without the 'profile' scope.
            # Such an account is treated as a non-match rather than
            # letting None.lower() raise.
            if (a.get("username") or "").lower() == lowercase_username]
        if not accounts:
            logger.debug((  # This would also happen when the cache is empty
                "get_accounts(username='{}') finds no account. "
                "If tokens were acquired without 'profile' scope, "
                "they would contain no username for filtering. "
                "Consider calling get_accounts(username=None) instead."
                ).format(username))
    # Does not further filter by existing RTs here. It probably won't matter.
    # Because in most cases Accounts and RTs co-exist.
    # Even in the rare case when an RT is revoked and then removed,
    # acquire_token_silent() would then yield no result,
    # apps would fall back to other acquire methods. This is the standard pattern.
    return accounts
def _find_msal_accounts(self, environment):
    """Find the cached accounts of one environment, one per home account.

    Only ADFS and MSSTS accounts are considered,
    plus Cloud Shell accounts when running inside Cloud Shell.

    :param environment: The environment (authority host) to search by.
    :return: A list of account dicts, deduplicated by "home_account_id".
    """
    wanted_authority_types = [
        TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS]
    if _is_running_in_cloud_shell():
        wanted_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL)

    accounts_by_home_id = {}  # Grouped by home tenant's id
    for entry in self.token_cache.search(
            TokenCache.CredentialType.ACCOUNT,
            query={"environment": environment}):
        if entry["authority_type"] not in wanted_authority_types:
            continue
        home_account_id = entry.get("home_account_id")
        # A later entry of the same home account replaces an earlier one
        accounts_by_home_id[home_account_id] = {
            # These are minimal amount of non-tenant-specific account info
            "home_account_id": entry.get("home_account_id"),
            "environment": entry.get("environment"),
            "username": entry.get("username"),
            "account_source": entry.get("account_source"),

            # The following fields for backward compatibility, for now
            "authority_type": entry.get("authority_type"),
            "local_account_id": entry.get("local_account_id"),  # Tenant-specific
            "realm": entry.get("realm"),  # Tenant-specific
        }
    return list(accounts_by_home_id.values())
def _get_instance_metadata(self, instance):  # This exists so it can be mocked in unit test
    """Fetch the instance discovery metadata for the given instance.

    :param instance: The authority host to be looked up.
    :return: The "metadata" member of the JSON discovery response.
    :raises: Whatever ``raise_for_status()`` of the http response raises.
    """
    discovery_host = _get_instance_discovery_host(instance)
    query = {
        'api-version': '1.1',
        'authorization_endpoint': (
            "https://{}/common/oauth2/authorize".format(discovery_host)
        ),
    }
    resp = self.http_client.get(
        _get_instance_discovery_endpoint(instance),
        params=query,
        headers={'Accept': 'application/json'})
    resp.raise_for_status()
    payload = json.loads(resp.text)
    return payload['metadata']
1378 def _get_authority_aliases(self, instance):
1379 if self._instance_discovery is False:
1380 return []
1381 if self.authority._is_known_to_developer:
1382 # Then it is an ADFS/B2C/known_authority_hosts situation
1383 # which may not reach the central endpoint, so we skip it.
1384 return []
1385 if instance not in self.authority_groups:
1386 self.authority_groups[instance] = [
1387 set(group['aliases']) for group in self._get_instance_metadata(instance)]
1388 for group in self.authority_groups[instance]:
1389 if instance in group:
1390 return [alias for alias in group if alias != instance]
1391 return []
def remove_account(self, account):
    """Sign me out and forget me from token cache

    When the broker is enabled, a silent broker sign-out is attempted first.
    Its error, if any, is only logged.

    :param account:
        An account dict. Its "local_account_id" is used for the broker sign-out.
    """
    if self._enable_broker:
        from .broker import _signout_silently
        signout_error = _signout_silently(
            self.client_id, account["local_account_id"])
        if signout_error:
            logger.debug("_signout_silently() returns error: %s", signout_error)
    # Broker sign-out has been attempted, even if the _forget_me() below throws.
    self._forget_me(account)
def _sign_out(self, home_account):
    """Remove all relevant RTs and ATs of the given account from token cache.

    :param home_account:
        An account dict containing "environment" and "home_account_id".
    """
    environment = home_account["environment"]
    account_filter = {  # realm-independent
        "environment": environment,
        "home_account_id": home_account["home_account_id"],
    }
    family_id = self._get_app_metadata(environment).get("family_id")

    # RTs/FRTs are realm-independent. Collect them into a static list first,
    # to avoid changing self.token_cache while it is being iterated.
    refresh_tokens_to_remove = []
    for rt in self.token_cache.search(
            TokenCache.CredentialType.REFRESH_TOKEN, query=account_filter):
        # Do RT's app ownership check as a precaution, in case family apps
        # and 3rd-party apps share same token cache, although they should not.
        if rt["client_id"] == self.client_id or (
                family_id  # Now let's settle family business
                and rt.get("family_id") == family_id):
            refresh_tokens_to_remove.append(rt)
    for rt in refresh_tokens_to_remove:
        self.token_cache.remove_rt(rt)

    # ATs are likewise taken from a static list. They are searched
    # regardless of realm, b/c we've removed realm-independent RTs anyway.
    access_tokens_to_remove = list(self.token_cache.search(
        TokenCache.CredentialType.ACCESS_TOKEN, query=account_filter))
    for at in access_tokens_to_remove:
        # To avoid the complexity of locating sibling family app's AT,
        # we skip AT's app ownership check.
        # It means ATs for other apps will also be removed, it is OK because:
        # * non-family apps are not supposed to share token cache to begin with;
        # * Even if it happens, we keep other app's RT already, so SSO still works
        self.token_cache.remove_at(at)
def _forget_me(self, home_account):
    """Sign the account out, then also drop its ID tokens and account records.

    :param home_account: A mapping providing ``environment`` and ``home_account_id``.
    """
    self._sign_out(home_account)
    account_filter = dict(  # Realm-independent, so every realm is covered
        environment=home_account["environment"],
        home_account_id=home_account["home_account_id"],
    )

    # Each search result is materialized into a list before any removal,
    # so that the cache is not modified while it is still being iterated.
    stale_id_tokens = list(self.token_cache.search(
        TokenCache.CredentialType.ID_TOKEN, query=account_filter))
    for id_token in stale_id_tokens:
        self.token_cache.remove_idt(id_token)

    stale_accounts = list(self.token_cache.search(
        TokenCache.CredentialType.ACCOUNT, query=account_filter))
    for account_record in stale_accounts:
        self.token_cache.remove_account(account_record)
def _acquire_token_by_cloud_shell(self, scopes, data=None):
    """Obtain a token via Cloud Shell and cache a successful outcome.

    :param scopes: The scopes being requested; also used as the cached scope
        when the response carries no "scope" of its own.
    :param data: Optional extra request data, forwarded to ``_obtain_token()``.
    :return: The response from ``_obtain_token()``. When it contains an
        access token, it is also tagged with a token source.
    """
    from .cloudshell import _obtain_token  # Lazy load
    response = _obtain_token(
        self.http_client, scopes, client_id=self.client_id, data=data)
    succeeded = "error" not in response
    if succeeded:
        cache_event = {
            "client_id": self.client_id,
            "scope": response["scope"].split() if "scope" in response else scopes,
            "token_endpoint": self.authority.token_endpoint,
            "response": response,
            "data": data or {},
            "authority_type": _AUTHORITY_TYPE_CLOUDSHELL,
        }
        self.token_cache.add(cache_event)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
    return response
def acquire_token_silent(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Acquire an access token for given account, without user interaction.

    It accepts the same parameters as :func:`~acquire_token_silent_with_error`.
    The two methods differ only in their return value:
    this one folds "cache is empty" and "token refresh failed"
    into one single outcome, ``None``.
    If your app does not need to know the exact token refresh error,
    this method is the easier and recommended one.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key,
          if cache lookup succeeded.
        - None when cache lookup does not yield a token.
    """
    if not account:
        return None  # A backward-compatible NO-OP to drop the account=None usage
    outcome = _clean_up(self._acquire_token_silent_with_error(
        scopes, account, authority=authority, force_refresh=force_refresh,
        claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs))
    if not outcome or "error" in outcome:
        return None  # Either nothing was found, or the refresh failed
    return outcome
def acquire_token_silent_with_error(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Acquire an access token for given account, without user interaction.

    The token comes either from a valid access token found in the cache,
    or from a valid refresh token found in the cache,
    which is then automatically redeemed for a new access token.

    Unlike :func:`~acquire_token_silent`, this method tells an empty cache
    apart from a token refresh error. Use it when your app cares about
    the exact token refresh error; otherwise
    :func:`~acquire_token_silent` is recommended.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param account: (Required)
        One of the account object returned by :func:`~get_accounts`.
        Starting from MSAL Python 1.23,
        a ``None`` input will become a NO-OP and always return ``None``.
    :param force_refresh:
        If True, it will skip Access Token look-up,
        and try to find a Refresh Token to obtain a new Access Token.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.
    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key,
          if cache lookup succeeded.
        - None when there is simply no token in the cache.
        - A dict containing an "error" key, when token refresh failed.
    """
    if account:
        return _clean_up(self._acquire_token_silent_with_error(
            scopes, account, authority=authority, force_refresh=force_refresh,
            claims_challenge=claims_challenge, auth_scheme=auth_scheme,
            **kwargs))
    return None  # A backward-compatible NO-OP to drop the account=None usage
def _acquire_token_silent_with_error(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Try this app's authority first, then each of its aliases.

    The first response containing no "error" wins. Otherwise the latest
    non-empty error response (or the initial outcome, which may be None)
    is returned, after its "suberror" - if any - has been translated into
    a "classification" entry.

    The ``authority`` argument is currently not honored:
    a warning is emitted when it is given, and ``self.authority`` is used.
    """
    assert isinstance(scopes, list), "Invalid parameter type"
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    correlation_id = msal.telemetry._get_new_correlation_id()
    if authority:
        warnings.warn("We haven't decided how/if this method will accept authority parameter")

    def attempt(candidate_authority):
        # All attempts share the same arguments and the same correlation id
        return self._acquire_token_silent_from_cache_and_possibly_refresh_it(
            scopes, account, candidate_authority, force_refresh=force_refresh,
            claims_challenge=claims_challenge,
            correlation_id=correlation_id,
            auth_scheme=auth_scheme,
            **kwargs)

    outcome = attempt(self.authority)
    if outcome and "error" not in outcome:
        return outcome
    last_outcome = outcome

    for alias in self._get_authority_aliases(self.authority.instance):
        # The RT search MUST NOT be filtered by scopes (i.e. target), because
        # AAD RTs are scope-independent, so target is optional per schema.
        rts_for_alias = list(self.token_cache.search(  # A list, to test emptiness
            self.token_cache.CredentialType.REFRESH_TOKEN,
            query={"environment": alias}))
        if not rts_for_alias:
            continue  # Skip heavy weight logic when RT for this alias doesn't exist
        alias_authority = Authority(
            "https://" + alias + "/" + self.authority.tenant,
            self.http_client,
            instance_discovery=False,
        )
        outcome = attempt(alias_authority)
        if not outcome:
            continue
        if "error" not in outcome:
            return outcome
        last_outcome = outcome

    if last_outcome and last_outcome.get("suberror"):
        suppressed_suberrors = {  # These suberrors are suppressed, per #57
            "bad_token",
            "token_expired",
            "protection_policy_required",
            "client_mismatch",
            "device_authentication_failed",
        }
        suberror = last_outcome["suberror"]
        last_outcome["classification"] = (
            "" if suberror in suppressed_suberrors else suberror)
    return last_outcome
def _acquire_token_silent_from_cache_and_possibly_refresh_it(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority,  # This can be different than self.authority
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        correlation_id=None,
        http_exceptions=None,
        auth_scheme=None,
        **kwargs):
    """Return an access token from cache, or go obtain a new one.

    Unless ``force_refresh``, ``claims_challenge`` or ``auth_scheme`` is given,
    the access token cache is searched first. An entry expiring in less than
    5 minutes is treated as expired. An entry whose "refresh_on" has passed
    is kept as a fallback while a new token is being requested.

    A new token is then requested, in this order: via Cloud Shell
    (when the account's authority_type says so), via the broker
    (when enabled and applicable), and otherwise via a cached refresh token
    (when ``account`` is given) or ``_acquire_token_for_client()``.

    :param http_exceptions:
        Optional exception type(s) to be treated as a network error.
        Defaults to ``requests.exceptions.RequestException``.
    :return:
        The newly obtained response when it contains no "error",
        or when there is no cached access token to fall back on
        (it can then be an error response or a falsy value);
        otherwise the fallback access token from cache.
    :raises ValueError: When ``auth_scheme`` is given but cannot be honored.
    """
    # This internal method has two calling patterns:
    # it accepts a non-empty account to find token for a user,
    # and accepts account=None to find a token for the current app.
    access_token_from_cache = None
    if not (force_refresh or claims_challenge or auth_scheme):  # Then attempt AT cache
        query={
            "client_id": self.client_id,
            "environment": authority.instance,
            "realm": authority.tenant,
            "home_account_id": (account or {}).get("home_account_id"),
        }
        key_id = kwargs.get("data", {}).get("key_id")
        if key_id:  # Some token types (SSH-certs, POP) are bound to a key
            query["key_id"] = key_id
        ext_cache_key = _compute_ext_cache_key(kwargs.get("data", {}))
        if ext_cache_key:  # FMI tokens need cache isolation by path
            query["ext_cache_key"] = ext_cache_key
        now = time.time()
        refresh_reason = msal.telemetry.AT_ABSENT
        for entry in self.token_cache.search(  # A generator allows us to
                # break early in cache-hit without finding a full list
                self.token_cache.CredentialType.ACCESS_TOKEN,
                target=scopes,
                query=query,
                ):  # This loop is about token search, not about token deletion.
            # Note that search() holds a lock during this loop;
            # that is fine because this loop is fast
            expires_in = int(entry["expires_on"]) - now
            if expires_in < 5*60:  # Then consider it expired
                refresh_reason = msal.telemetry.AT_EXPIRED
                continue  # Removal is not necessary, it will be overwritten
            logger.debug("Cache hit an AT")
            access_token_from_cache = {  # Mimic a real response
                "access_token": entry["secret"],
                "token_type": entry.get("token_type", "Bearer"),
                "expires_in": int(expires_in),  # OAuth2 specs defines it as int
                self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
            }
            if "refresh_on" in entry:
                access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
                if int(entry["refresh_on"]) < now:  # aging
                    refresh_reason = msal.telemetry.AT_AGING
                    break  # With a fallback in hand, we break here to go refresh
            self._build_telemetry_context(-1).hit_an_access_token()
            return access_token_from_cache  # It is still good as new
    else:
        refresh_reason = msal.telemetry.FORCE_REFRESH  # TODO: It could also mean claims_challenge
    assert refresh_reason, "It should have been established at this point"
    if not http_exceptions:  # It can be a tuple of exceptions
        # The exact HTTP exceptions are transportation-layer dependent
        from requests.exceptions import RequestException  # Lazy load
        http_exceptions = (RequestException,)
    try:
        data = kwargs.get("data", {})
        if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL:
            if auth_scheme:
                raise ValueError("auth_scheme is not supported in Cloud Shell")
            return self._acquire_token_by_cloud_shell(scopes, data=data)

        is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(data.get("token_type"), auth_scheme)

        # SSH-cert/POP requests go through the broker on win32 and darwin only
        if self._enable_broker and account and account.get("account_source") in (
            _GRANT_TYPE_BROKER,  # Broker successfully established this account previously.
            None,  # Unknown data from older MSAL. Broker might still work.
        ) and (sys.platform in ("win32", "darwin") or not is_ssh_cert_or_pop_request):
            from .broker import _acquire_token_silently
            # NOTE(review): the broker request is built from self.authority,
            # not from the `authority` argument - confirm this is intended.
            response = _acquire_token_silently(
                "https://{}/{}".format(self.authority.instance, self.authority.tenant),
                self.client_id,
                account["local_account_id"],
                scopes,
                claims=_merge_claims_challenge_and_capabilities(
                    self._client_capabilities, claims_challenge),
                correlation_id=correlation_id,
                auth_scheme=auth_scheme,
                **data)
            if response:  # Broker provides a decisive outcome
                account_was_established_by_broker = account.get(
                    "account_source") == _GRANT_TYPE_BROKER
                broker_attempt_succeeded_just_now = "error" not in response
                # A broker error for an account of unknown source is not
                # returned; the flow continues to the non-broker path below.
                if account_was_established_by_broker or broker_attempt_succeeded_just_now:
                    return self._process_broker_response(response, scopes, data)

        if auth_scheme:
            raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
        if account:
            result = self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
                authority, self._decorate_scope(scopes), account,
                refresh_reason=refresh_reason, claims_challenge=claims_challenge,
                correlation_id=correlation_id,
                **kwargs)
        else:  # The caller is acquire_token_for_client()
            result = self._acquire_token_for_client(
                scopes, refresh_reason, claims_challenge=claims_challenge,
                **kwargs)
        if result and "access_token" in result:
            result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
        # Return the fresh outcome when it succeeded, or when there is
        # no cached access token to fall back on.
        if (result and "error" not in result) or (not access_token_from_cache):
            return result
    except http_exceptions:
        # Typically network error. Potential AAD outage?
        if not access_token_from_cache:  # It means there is no fall back option
            raise  # We choose to bubble up the exception
    # Reaching here means the refresh failed, yet a cached AT is still usable
    return access_token_from_cache
def _process_broker_response(self, response, scopes, data):
    """Cache a successful broker response and return the cleaned-up response.

    :param response: The dict returned by the broker.
    :param scopes: Used as the cached scope when the response has no "scope".
    :param data: The request data to be recorded along with the tokens.
    :return: The outcome of ``_clean_up(response)``.
    """
    succeeded = "error" not in response
    if succeeded:
        cache_event = {
            "client_id": self.client_id,
            "scope": response["scope"].split() if "scope" in response else scopes,
            "token_endpoint": self.authority.token_endpoint,
            "response": response,
            "data": data,
            "_account_id": response["_account_id"],
            # Be consistent with non-broker flows
            "environment": self.authority.instance,
            # A pseudo grant type for TokenCache to mark account_source as broker
            "grant_type": _GRANT_TYPE_BROKER,
        }
        self.token_cache.add(cache_event)
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
    return _clean_up(response)
1744 def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
1745 self, authority, scopes, account, **kwargs):
1746 query = {
1747 "environment": authority.instance,
1748 "home_account_id": (account or {}).get("home_account_id"),
1749 # "realm": authority.tenant, # AAD RTs are tenant-independent
1750 }
1751 app_metadata = self._get_app_metadata(authority.instance)
1752 if not app_metadata: # Meaning this app is now used for the first time.
1753 # When/if we have a way to directly detect current app's family,
1754 # we'll rewrite this block, to support multiple families.
1755 # For now, we try existing RTs (*). If it works, we are in that family.
1756 # (*) RTs of a different app/family are not supposed to be
1757 # shared with or accessible by us in the first place.
1758 at = self._acquire_token_silent_by_finding_specific_refresh_token(
1759 authority, scopes,
1760 dict(query, family_id="1"), # A hack, we have only 1 family for now
1761 rt_remover=lambda rt_item: None, # NO-OP b/c RTs are likely not mine
1762 break_condition=lambda response: # Break loop when app not in family
1763 # Based on an AAD-only behavior mentioned in internal doc here
1764 # https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595
1765 "client_mismatch" in response.get("error_additional_info", []),
1766 **kwargs)
1767 if at and "error" not in at:
1768 return at
1769 last_resp = None
1770 if app_metadata.get("family_id"): # Meaning this app belongs to this family
1771 last_resp = at = self._acquire_token_silent_by_finding_specific_refresh_token(
1772 authority, scopes, dict(query, family_id=app_metadata["family_id"]),
1773 **kwargs)
1774 if at and "error" not in at:
1775 return at
1776 # Either this app is an orphan, so we will naturally use its own RT;
1777 # or all attempts above have failed, so we fall back to non-foci behavior.
1778 return self._acquire_token_silent_by_finding_specific_refresh_token(
1779 authority, scopes, dict(query, client_id=self.client_id),
1780 **kwargs) or last_resp
1782 def _get_app_metadata(self, environment):
1783 return self.token_cache._get_app_metadata(
1784 environment=environment, client_id=self.client_id, default={})
def _acquire_token_silent_by_finding_specific_refresh_token(
        self, authority, scopes, query,
        rt_remover=None, break_condition=lambda response: False,
        refresh_reason=None, correlation_id=None, claims_challenge=None,
        **kwargs):
    """Try the cached refresh tokens matching ``query``, newest first.

    :param authority: The authority to build the client for;
        its ``instance`` becomes the environment of newly cached tokens.
    :param scopes: The scopes to request with each refresh token.
    :param dict query: The token cache query used to find refresh tokens.
        Its "home_account_id", if any, is also sent as routing info.
    :param rt_remover: Accepted for backward compatibility; it is not used,
        because RT removal is disabled in this method.
    :param break_condition: A callable receiving each error response;
        a truthy return value stops the attempts on the remaining RTs.
    :param refresh_reason: Forwarded to the telemetry context.
    :param correlation_id: Forwarded to the telemetry context.
    :param claims_challenge: Merged with the client capabilities
        into the "claims" of the request payload.
    :param kwargs: Forwarded to ``obtain_token_by_refresh_token()``.
        Its optional "data" dict is merged into the request payload.
    :return:
        - None when no refresh token matches the query.
        - The first response containing no "error" key.
        - Otherwise the latest error response.
    """
    matches = list(self.token_cache.search(  # We want a list to test emptiness
        self.token_cache.CredentialType.REFRESH_TOKEN,
        # target=scopes,  # AAD RTs are scope-independent
        query=query))
    logger.debug("Found %d RTs matching %s", len(matches), {
        k: _pii_less_home_account_id(v) if k == "home_account_id" and v else v
        for k, v in query.items()
    })

    response = None  # A distinguishable value to mean cache is empty
    if not matches:  # Then exit early to avoid expensive operations
        return response
    client, _ = self._build_client(
        # Potentially expensive if building regional client
        self.client_credential, authority, skip_regional_client=True)
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_SILENT_ID,
        correlation_id=correlation_id, refresh_reason=refresh_reason)
    # Build the payload exactly once, BEFORE the loop. Popping "data" inside
    # the loop would hand the caller's data (e.g. key_id, req_cnf) to the
    # first RT attempt only, and every subsequent attempt would lose it.
    payload = dict(
        kwargs.pop("data", {}),
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge))
    for entry in sorted(  # Since unfit RTs would not be aggressively removed,
            # we start from newer RTs which are more likely fit.
            matches,
            key=lambda e: int(e.get("last_modification_time", "0")),
            reverse=True):
        logger.debug("Cache attempts an RT")
        headers = telemetry_context.generate_headers()
        if query.get("home_account_id"):  # Then use it as CCS Routing info
            headers["X-AnchorMailbox"] = "Oid:{}".format(  # case-insensitive value
                query["home_account_id"].replace(".", "@"))
        response = client.obtain_token_by_refresh_token(
            entry, rt_getter=lambda token_item: token_item["secret"],
            on_removing_rt=lambda rt_item: None,  # Disable RT removal,
                # because an invalid_grant could be caused by new MFA policy,
                # the RT could still be useful for other MFA-less scope or tenant
            on_obtaining_tokens=lambda event: self.token_cache.add(dict(
                event,
                environment=authority.instance,
                skip_account_creation=True,  # To honor a concurrent remove_account()
                )),
            scope=scopes,
            headers=headers,
            data=dict(payload),  # A fresh copy for each attempt
            **kwargs)
        telemetry_context.update_telemetry(response)
        if "error" not in response:
            return response
        logger.debug(
            "Refresh failed. %s: %s",
            response.get("error"), response.get("error_description"))
        if break_condition(response):
            break
    return response  # Returns the latest error (if any), or just None
1847 def _validate_ssh_cert_input_data(self, data):
1848 if data.get("token_type") == "ssh-cert":
1849 if not data.get("req_cnf"):
1850 raise ValueError(
1851 "When requesting an SSH certificate, "
1852 "you must include a string parameter named 'req_cnf' "
1853 "containing the public key in JWK format "
1854 "(https://tools.ietf.org/html/rfc7517).")
1855 if not data.get("key_id"):
1856 raise ValueError(
1857 "When requesting an SSH certificate, "
1858 "you must include a string parameter named 'key_id' "
1859 "which identifies the key in the 'req_cnf' argument.")
def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
    """Acquire token(s) based on a refresh token (RT) obtained from elsewhere.

    This method is only meant for migration: you have old RTs from elsewhere
    and now want to bring them into MSAL.
    Calling this method results in new tokens automatically storing into MSAL.

    You do NOT need this method if you are already using MSAL.
    MSAL maintains RT automatically inside its token cache,
    and an access token can be retrieved
    when you call :func:`~acquire_token_silent`.

    :param str refresh_token: The old refresh token, as a string.

    :param list scopes:
        The scopes associate with this old RT.
        Each scope needs to be in the Microsoft identity platform (v2) format.
        See `Scopes not resources <https://docs.microsoft.com/en-us/azure/active-directory/develop/migrate-python-adal-msal#scopes-not-resources>`_.

    :return:
        * A dict contains "error" and some other keys, when error happened.
        * A dict contains no "error" key means migration was successful.
    """
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN,
        refresh_reason=msal.telemetry.FORCE_REFRESH)
    redeem = self.client.obtain_token_by_refresh_token
    raw_response = redeem(
        refresh_token,
        scope=self._decorate_scope(scopes),
        headers=telemetry.generate_headers(),
        rt_getter=lambda rt: rt,  # The incoming RT is already a plain string
        on_updating_rt=False,
        on_removing_rt=lambda rt_item: None,  # No OP
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry.update_telemetry(response)
    return response
def acquire_token_by_username_password(
        self, username, password, scopes, claims_challenge=None,
        # Note: We shouldn't need to surface enable_msa_passthrough,
        # because this ROPC won't work with MSA account anyway.
        auth_scheme=None,
        **kwargs):
    """Gets a token for a given resource via user credentials.

    See this page for constraints of Username Password Flow.
    https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication

    :param str username: Typically a UPN in the form of an email address.
    :param str password: The password.
    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".

    :raises ValueError: When ``auth_scheme`` is given but the request
        is not served by a broker.

    [Deprecated] This API is deprecated for public client flows and will be
    removed in a future release. Use a more secure flow instead.
    Migration guide: https://aka.ms/msal-ropc-migration

    """
    is_confidential_app = self.client_credential or isinstance(
        self, ConfidentialClientApplication)
    if not is_confidential_app:
        warnings.warn(
            "This API has been deprecated for public client flows, "
            "please use a more secure flow. "
            "See https://aka.ms/msal-ropc-migration for migration guidance",
            DeprecationWarning,
            # Attribute the warning to the caller of this method; otherwise
            # it is attributed to MSAL itself, and the default warning filter
            # would hide it from the app developer.
            stacklevel=2)
    claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)
    if self._enable_broker and sys.platform in ("win32", "darwin"):
        from .broker import _signin_silently
        response = _signin_silently(
            "https://{}/{}".format(self.authority.instance, self.authority.tenant),
            self.client_id,
            scopes,  # Decorated scopes won't work due to offline_access
            MSALRuntime_Username=username,
            MSALRuntime_Password=password,
            validateAuthority="no" if (
                self.authority._is_known_to_developer
                or self._instance_discovery is False) else None,
            claims=claims,
            auth_scheme=auth_scheme,
            )
        return self._process_broker_response(response, scopes, kwargs.get("data", {}))

    if auth_scheme:
        raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
    scopes = self._decorate_scope(scopes)
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
    headers = telemetry_context.generate_headers()
    # "data" is popped here, so that it is not passed twice via **kwargs below
    data = dict(kwargs.pop("data", {}), claims=claims)
    response = None
    if not self.authority.is_adfs:
        user_realm_result = self.authority.user_realm_discovery(
            username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
        if user_realm_result.get("account_type") == "Federated":
            response = _clean_up(self._acquire_token_by_username_password_federated(
                user_realm_result, username, password, scopes=scopes,
                data=data,
                headers=headers, **kwargs))
    if response is None:  # Either ADFS or not federated
        response = _clean_up(self.client.obtain_token_by_username_password(
            username, password, scope=scopes,
            headers=headers,
            data=data,
            **kwargs))
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response
def _acquire_token_by_username_password_federated(
        self, user_realm_result, username, password, scopes=None, **kwargs):
    """Sign in a federated user: get a SAML token via WS-Trust, then redeem it.

    :param dict user_realm_result: The user realm discovery result.
        Its "federation_metadata_url", "cloud_audience_urn" and
        "federation_active_auth_url" entries are consulted.
    :param str username: Sent to the WS-Trust endpoint, and also recorded
        in the token cache along with the obtained tokens.
    :param str password: Sent to the WS-Trust endpoint.
    :param scopes: The scopes to request when redeeming the SAML assertion.
    :param kwargs: Forwarded to ``obtain_token_by_assertion()``.
    :return: The response of ``obtain_token_by_assertion()``.
    :raises ValueError: When MEX yields no WS-Trust endpoint.
    :raises RuntimeError: When the WS-Trust response lacks a token or a type,
        or when its token type is unknown.
    """
    wstrust_endpoint = {}
    if user_realm_result.get("federation_metadata_url"):
        wstrust_endpoint = mex_send_request(
            user_realm_result["federation_metadata_url"],
            self.http_client)
        if wstrust_endpoint is None:
            raise ValueError("Unable to find wstrust endpoint from MEX. "
                "This typically happens when attempting MSA accounts. "
                "More details available here. "
                "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication")
    logger.debug("wstrust_endpoint = %s", wstrust_endpoint)
    wstrust_result = wst_send_request(
        username, password,
        user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"),
        wstrust_endpoint.get("address",
            # Fallback to an AAD supplied endpoint
            user_realm_result.get("federation_active_auth_url")),
        wstrust_endpoint.get("action"), self.http_client)
    if not ("token" in wstrust_result and "type" in wstrust_result):
        raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result)
    GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
    grant_type = {
        SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1,
        SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2,
        WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1,
        WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2
        }.get(wstrust_result.get("type"))
    if not grant_type:
        # Exceptions do not interpolate %-style arguments the way logging
        # does, so the message has to be formatted explicitly.
        raise RuntimeError(
            "RSTR returned unknown token type: {}".format(
                wstrust_result.get("type")))
    self.client.grant_assertion_encoders.setdefault(  # Register a non-standard type
        grant_type, self.client.encode_saml_assertion)
    return self.client.obtain_token_by_assertion(
        wstrust_result["token"], grant_type, scope=scopes,
        on_obtaining_tokens=lambda event: self.token_cache.add(dict(
            event,
            environment=self.authority.instance,
            username=username,  # Useful in case IDT contains no such info
            )),
        **kwargs)
2032class PublicClientApplication(ClientApplication): # browser app or mobile app
2034 DEVICE_FLOW_CORRELATION_ID = "_correlation_id"
2035 CONSOLE_WINDOW_HANDLE = object()
def __init__(
        self, client_id, client_credential=None,
        *,
        enable_broker_on_windows=None,
        enable_broker_on_mac=None,
        enable_broker_on_linux=None,
        enable_broker_on_wsl=None,
        **kwargs):
    """Same as :func:`ClientApplication.__init__`,
    except that ``client_credential`` parameter shall remain ``None``.

    .. note::

        **What is a broker, and why use it?**

        A broker is a component installed on your device.
        Broker implicitly gives your device an identity. By using a broker,
        your device becomes a factor that can satisfy MFA (Multi-factor authentication).
        This factor would become mandatory
        if a tenant's admin enables a corresponding Conditional Access (CA) policy.
        The broker's presence allows Microsoft identity platform
        to have higher confidence that the tokens are being issued to your device,
        and that is more secure.

        An additional benefit of broker is,
        it runs as a long-lived process with your device's OS,
        and maintains its own cache,
        so that your broker-enabled apps (even a CLI)
        could automatically SSO from a previously established signed-in session.

        **How to opt in to use broker?**

        1. You can set any combination of the following opt-in parameters to true:

           +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
           | Opt-in flag              | If app will run on                | App has registered this as a Desktop platform redirect URI in Azure Portal         |
           +==========================+===================================+====================================================================================+
           | enable_broker_on_windows | Windows 10+                       | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id                            |
           +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
           | enable_broker_on_wsl     | WSL                               | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id                            |
           +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
           | enable_broker_on_mac     | Mac with Company Portal installed | msauth.com.msauth.unsignedapp://auth                                               |
           +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
           | enable_broker_on_linux   | Linux with Intune installed       | ``https://login.microsoftonline.com/common/oauth2/nativeclient`` (MUST be enabled) |
           +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+

        2. Install broker dependency,
           e.g. ``pip install msal[broker]>=1.33,<2``.

        3. Test with ``acquire_token_interactive()`` and ``acquire_token_silent()``.

        **The fallback behaviors of MSAL Python's broker support**

        MSAL will either error out, or silently fallback to non-broker flows.

        1. MSAL will ignore the `enable_broker_...` and bypass broker
           on those auth flows that are known to be NOT supported by broker.
           This includes ADFS, B2C, etc..
           For other "could-use-broker" scenarios, please see below.
        2. MSAL errors out when app developer opted-in to use broker
           but a direct dependency "mid-tier" package is not installed.
           Error message guides app developer to declare the correct dependency
           ``msal[broker]``.
           We error out here because the error is actionable to app developers.
        3. MSAL silently "deactivates" the broker and fallback to non-broker,
           when opted-in, dependency installed yet failed to initialize.
           We anticipate this would happen on a device whose OS is too old
           or the underlying broker component is somehow unavailable.
           There is not much an app developer or the end user can do here.
           Eventually, the conditional access policy shall
           force the user to switch to a different device.
        4. MSAL errors out when broker is opted in, installed, initialized,
           but subsequent token request(s) failed.

    :param boolean enable_broker_on_windows:
        This setting is only effective if your app is running on Windows 10+.
        This parameter defaults to None, which means MSAL will not utilize a broker.

        New in MSAL Python 1.25.0.

    :param boolean enable_broker_on_mac:
        This setting is only effective if your app is running on Mac.
        This parameter defaults to None, which means MSAL will not utilize a broker.

        New in MSAL Python 1.31.0.

    :param boolean enable_broker_on_linux:
        This setting is only effective if your app is running on Linux, including WSL.
        This parameter defaults to None, which means MSAL will not utilize a broker.

        New in MSAL Python 1.33.0.

    :param boolean enable_broker_on_wsl:
        This setting is only effective if your app is running on WSL.
        This parameter defaults to None, which means MSAL will not utilize a broker.

        New in MSAL Python 1.33.0.
    """
    if client_credential is not None:
        raise ValueError("Public Client should not possess credentials")

    # An opt-in flag only counts when the app runs on the matching platform.
    # The WSL detection is consulted last, and only if nothing else matched.
    if enable_broker_on_windows and sys.platform == "win32":
        broker_opted_in = True
    elif enable_broker_on_mac and sys.platform == "darwin":
        broker_opted_in = True
    elif enable_broker_on_linux and sys.platform == "linux":
        broker_opted_in = True
    else:
        broker_opted_in = bool(enable_broker_on_wsl and is_wsl())
    # This attribute is assigned before the base class initializer runs
    self._enable_broker = broker_opted_in

    super(PublicClientApplication, self).__init__(
        client_id, client_credential=None, **kwargs)
def acquire_token_interactive(
        self,
        scopes,  # type: list[str]
        prompt=None,
        login_hint=None,  # type: Optional[str]
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        timeout=None,
        port=None,
        extra_scopes_to_consent=None,
        max_age=None,
        parent_window_handle=None,
        on_before_launching_ui=None,
        auth_scheme=None,
        **kwargs):
    """Sign the user in interactively (normally via a local browser) and get a token.

    Prerequisite: In Azure Portal, the Redirect URI of your
    "Mobile and Desktop application" must be configured as ``http://localhost``.
    If you opt in to use broker when creating ``PublicClientApplication``,
    your app also needs this Redirect URI:
    ``ms-appx-web://Microsoft.AAD.BrokerPlugin/YOUR_CLIENT_ID``

    :param list scopes:
        A list of case-sensitive strings.
    :param str prompt:
        No prompt value is sent by default, not even the string ``"none"``;
        you have to specify a value explicitly.
        Valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.
    :param str login_hint:
        Optional. Identifier of the user. Generally a User Principal Name (UPN).
    :param domain_hint:
        Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
        If included, it will skip the email-based discovery process that user goes
        through on the sign-in page, leading to a slightly more streamlined user experience.
        More information on possible values available in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.

    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :param int timeout:
        This method blocks the current thread;
        ``timeout`` is how long to wait, in seconds.
        The default ``None`` means wait indefinitely.

    :param int port:
        The port on which to listen for the incoming auth response.
        A system-allocated port is used by default.
        (The rest of the redirect_uri is hard coded as ``http://localhost``.)

    :param list extra_scopes_to_consent:
        "Extra scopes to consent" is a concept only available in Microsoft Entra.
        It refers to other resources you might want to prompt to consent for,
        in the same interaction, but for which you won't get back a
        token for in this particular operation.
        It is ignored (with a warning) when the request goes through broker.

    :param int max_age:
        OPTIONAL. Maximum Authentication Age.
        Specifies the allowable elapsed time in seconds
        since the last time the End-User was actively authenticated.
        If the elapsed time is greater than this value,
        Microsoft identity platform will actively re-authenticate the End-User.

        MSAL Python will also automatically validate the auth_time in ID token.

        New in version 1.15.

    :param int parent_window_handle:
        OPTIONAL.

        * If your app does not opt in to use broker,
          you do not need to provide a ``parent_window_handle`` here.

        * If your app opts in to use broker,
          ``parent_window_handle`` is required.

          - If your app is a GUI app running on Windows or Mac system,
            you are required to also provide its window handle,
            so that the sign-in window will pop up on top of your window.
          - If your app is a console app running on Windows or Mac system,
            you can use a placeholder
            ``PublicClientApplication.CONSOLE_WINDOW_HANDLE``.

        Most Python scripts are console apps.

        New in version 1.20.0.

    :param function on_before_launching_ui:
        A callback with the form of
        ``lambda ui="xyz", **kwargs: print("A {} will be launched".format(ui))``,
        where ``ui`` will be either "browser" or "broker".
        You can use it to inform your end user to expect a pop-up window.

        New in version 1.20.0.

    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key.
        - A dict containing an "error" key, when token refresh failed.
    """
    data = kwargs.pop("data", {})
    # "enable_msa_passthrough" stays a hidden parameter for now,
    # and it MUST be removed from kwargs.
    # OPTIONAL. MSA-Passthrough is a legacy configuration,
    # needed by a small amount of Microsoft first-party apps,
    # which would login MSA accounts via ".../organizations" authority.
    # If your app belongs to this category, AND you are enabling broker,
    # you would want to enable this flag. Default value is False.
    # More background of MSA-PT is available from this internal docs:
    # https://microsoft.sharepoint.com/:w:/t/Identity-DevEx/EatIUauX3c9Ctw1l7AQ6iM8B5CeBZxc58eoQCE0IuZ0VFw?e=tgc3jP&CID=39c853be-76ea-79d7-ee73-f1b2706ede05
    msa_passthrough_requested = kwargs.pop("enable_msa_passthrough", False)
    # Turned off for ssh-cert requests,
    # to work around a known issue as of PyMsalRuntime 0.8
    enable_msa_passthrough = (
        msa_passthrough_requested
        and data.get("token_type") != "ssh-cert")
    self._validate_ssh_cert_input_data(data)
    is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(
        data.get("token_type"), auth_scheme)

    def _do_nothing(**_ignored):
        return None

    # Callers who supplied no callback get a silent placeholder
    notify_before_ui = on_before_launching_ui or _do_nothing

    if _is_running_in_cloud_shell() and prompt == "none":
        # Note: _acquire_token_by_cloud_shell() is always silent,
        # so the UI callback is deliberately not fired here
        return self._acquire_token_by_cloud_shell(scopes, data=data)

    claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)

    goes_through_broker = self._enable_broker and (
        sys.platform in ("win32", "darwin")
        or not is_ssh_cert_or_pop_request)
    if goes_through_broker:
        if parent_window_handle is None:
            raise ValueError(
                "parent_window_handle is required when you opted into using broker. "
                "You need to provide the window handle of your GUI application, "
                "or use msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE "
                "when and only when your application is a console app.")
        if extra_scopes_to_consent:
            logger.warning(
                "Ignoring parameter extra_scopes_to_consent, "
                "which is not supported by broker")
        broker_response = self._acquire_token_interactive_via_broker(
            scopes,
            parent_window_handle,
            enable_msa_passthrough,
            claims,
            data,
            notify_before_ui,
            auth_scheme,
            prompt=prompt,
            login_hint=login_hint,
            max_age=max_age,
        )
        return self._process_broker_response(broker_response, scopes, data)

    # From here on it is the browser-based flow, which accepts no auth_scheme
    if (isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme)
            and sys.platform == "linux"):
        raise ValueError("POP is not supported on Linux")
    if auth_scheme:
        raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)

    notify_before_ui(ui="browser")
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_INTERACTIVE)
    # Hardcode the host, for now. AAD portal rejects 127.0.0.1 anyway
    redirect_uri = "http://localhost:{port}".format(port=port or 0)
    auth_params = {
        "claims": claims,
        "domain_hint": domain_hint,
    }
    response = _clean_up(self.client.obtain_token_by_browser(
        scope=self._decorate_scope(scopes) if scopes else None,
        extra_scope_to_consent=extra_scopes_to_consent,
        redirect_uri=redirect_uri,
        prompt=prompt,
        login_hint=login_hint,
        max_age=max_age,
        timeout=timeout,
        auth_params=auth_params,
        data=dict(data, claims=claims),
        headers=telemetry_context.generate_headers(),
        browser_name=_preferred_browser(),
        **kwargs))
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response
def _acquire_token_interactive_via_broker(
        self,
        scopes,
        parent_window_handle,
        enable_msa_passthrough,
        claims,
        data,
        on_before_launching_ui,
        auth_scheme,
        prompt=None,
        login_hint=None,
        max_age=None,
        **kwargs):
    """Drive the broker through silent attempts first, then an interactive sign-in.

    Different broker methods are called in turn to mimic the OIDC behaviors:

    1. When ``login_hint`` is given and ``prompt`` is not "select_account",
       and exactly one cached account matches the hint,
       ``_acquire_token_silently()`` is tried for that account.
    2. When ``prompt`` is "none" or not set, ``_signin_silently()`` is tried.
       With ``prompt="none"`` its outcome is final; without a prompt,
       a wrong account or a recoverable broker status falls through to step 3.
    3. ``on_before_launching_ui(ui="broker")`` is called and
       ``_signin_interactively()`` is used.

    :param list[str] scopes: Scopes to request.
    :param int parent_window_handle:
        ``self.CONSOLE_WINDOW_HANDLE`` is translated to ``None`` for the broker.
    :param enable_msa_passthrough: Forwarded to the broker as ``enable_msa_pt``.
    :param str claims: Forwarded to the broker as ``claims``.
    :param dict data: Expanded into keyword arguments of the broker calls.
    :param callable on_before_launching_ui:
        Invoked with ``ui="broker"`` right before the interactive sign-in.
    :param object auth_scheme: Forwarded to the broker as ``auth_scheme``.
    :return: The dict produced by whichever broker call ends the sequence,
        or a ``broker_error`` dict when ``prompt="none"`` yields a token
        for an account other than ``login_hint``.
    """
    from .broker import _signin_interactively, _signin_silently, _acquire_token_silently
    if "welcome_template" in kwargs:
        logger.debug(kwargs["welcome_template"])  # Experimental
    authority = "https://{}/{}".format(
        self.authority.instance, self.authority.tenant)
    if (self.authority._is_known_to_developer
            or self._instance_discovery is False):
        validate_authority = "no"
    else:
        validate_authority = None

    # OIDC would prompt when the user did not sign in; here a cached account
    # that unambiguously matches the hint is tried silently first
    if login_hint and prompt != "select_account":
        matching_accounts = self.get_accounts(username=login_hint)
        if len(matching_accounts) == 1:
            logger.debug("Calling broker._acquire_token_silently()")
            # When it works, it bypasses prompt
            silent_result = _acquire_token_silently(
                authority,
                self.client_id,
                matching_accounts[0]["local_account_id"],
                scopes,
                claims=claims,
                auth_scheme=auth_scheme,
                **data)
            if silent_result and "error" not in silent_result:
                return silent_result

    # Reaching here means login_hint was indecisive or absent.
    # prompt="none" must, and an unset prompt can, attempt _signin_silently()
    if prompt == "none" or not prompt:
        logger.debug("Calling broker._signin_silently()")
        # Unlike OIDC, this call does not honor login_hint
        signin_result = _signin_silently(
            authority, self.client_id, scopes,
            validateAuthority=validate_authority,
            claims=claims,
            max_age=max_age,
            enable_msa_pt=enable_msa_passthrough,
            auth_scheme=auth_scheme,
            **data)
        # _signin_silently() only gets tokens for default account,
        # but this seems to have been fixed in PyMsalRuntime 0.11.2
        got_wrong_account = bool(
            "access_token" in signin_result and login_hint
            and login_hint != signin_result.get(
                "id_token_claims", {}).get("preferred_username"))
        wrong_account_message = (
            'prompt="none" will not work for login_hint="non-default-user"')
        if got_wrong_account:
            logger.debug(wrong_account_message)

        if prompt == "none":
            if got_wrong_account:
                return {
                    "error": "broker_error",
                    "error_description": wrong_account_message,
                }
            return signin_result

        assert bool(prompt) is False
        from pymsalruntime import Response_Status
        recoverable_statuses = frozenset([
            Response_Status.Status_AccountUnusable,
            Response_Status.Status_InteractionRequired,
        ])
        should_fall_back = got_wrong_account or (
            "error" in signin_result
            and signin_result.get("_broker_status") in recoverable_statuses)
        if not should_fall_back:
            return signin_result
        # Otherwise continue to _signin_interactively() below

    logger.debug("Falls back to broker._signin_interactively()")
    on_before_launching_ui(ui="broker")
    if parent_window_handle is self.CONSOLE_WINDOW_HANDLE:
        broker_window_handle = None
    else:
        broker_window_handle = parent_window_handle
    return _signin_interactively(
        authority, self.client_id, scopes,
        broker_window_handle,
        validateAuthority=validate_authority,
        login_hint=login_hint,
        prompt=prompt,
        claims=claims,
        max_age=max_age,
        enable_msa_pt=enable_msa_passthrough,
        auth_scheme=auth_scheme,
        **data)
def initiate_device_flow(self, scopes=None, *, claims_challenge=None, **kwargs):
    """Start a Device Flow,
    whose result is later fed into :func:`~acquire_token_by_device_flow`.

    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        Optional. It is merged with this app's client capabilities
        and sent as the ``claims`` value of the request.
    :return: A dict representing a newly created Device Flow object.

        - A successful response would contain "user_code" key, among others
        - an error response would contain some other readable key/value pairs.
    """
    correlation_id = msal.telemetry._get_new_correlation_id()
    decorated_scopes = self._decorate_scope(scopes or [])
    request_headers = {msal.telemetry.CLIENT_REQUEST_ID: correlation_id}
    merged_claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)
    flow = self.client.initiate_device_flow(
        scope=decorated_scopes,
        headers=request_headers,
        data={"claims": merged_claims},
        **kwargs)
    # Remember the correlation id, so the later token request can reuse it
    flow[self.DEVICE_FLOW_CORRELATION_ID] = correlation_id
    return flow
def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
    """Exchange a device flow object for a token, with customizable polling effect.

    :param dict flow:
        A dict previously generated by :func:`~initiate_device_flow`.
        By default, this method's polling effect will block current thread.
        You can abort the polling loop at any time,
        by changing the value of the flow's "expires_at" key to 0.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID,
        correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID))
    request_data = dict(
        kwargs.pop("data", {}),
        # 2018-10-4 Hack: during transition period,
        # service seemingly need both device_code and code parameter.
        code=flow["device_code"],
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge),
    )
    request_headers = telemetry_context.generate_headers()
    response = _clean_up(self.client.obtain_token_by_device_flow(
        flow,
        data=request_data,
        headers=request_headers,
        **kwargs))
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response
2489class ConfidentialClientApplication(ClientApplication): # server-side web app
2490 """Same as :func:`ClientApplication.__init__`,
2491 except that ``allow_broker`` parameter shall remain ``None``.
2492 """
def acquire_token_for_client(self, scopes, claims_challenge=None, fmi_path=None, **kwargs):
    """Acquires token for the current confidential client, not for an end user.

    Since MSAL Python 1.23, it will automatically look for token from cache,
    and only send request to Identity Provider when cache misses.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.
    :param str fmi_path:
        Optional. The Federated Managed Identity (FMI) credential path.
        When provided, it is sent as the ``fmi_path`` parameter in the
        token request body, and the resulting token is cached separately
        so that different FMI paths do not share cached tokens.
        A ``data`` dict passed by the caller is copied, never modified.
        Example usage::

            result = cca.acquire_token_for_client(
                scopes=["api://resource/.default"],
                fmi_path="SomeFmiPath/FmiCredentialPath",
            )
    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    :raises ValueError:
        When a truthy ``force_refresh`` is passed,
        or when ``fmi_path`` is neither ``None`` nor a string.
    """
    if kwargs.get("force_refresh"):
        raise ValueError(  # We choose to disallow force_refresh
            "Historically, this method does not support force_refresh behavior. "
        )
    if fmi_path is not None:
        if not isinstance(fmi_path, str):
            raise ValueError(
                "fmi_path must be a string, got {}".format(type(fmi_path).__name__))
        # Build a fresh dict instead of writing into the caller's ``data``,
        # so the caller's object is left untouched and can be safely reused.
        # ``or {}`` also tolerates an explicit ``data=None``.
        kwargs["data"] = dict(kwargs.get("data") or {}, fmi_path=fmi_path)
    return _clean_up(self._acquire_token_silent_with_error(
        scopes, None, claims_challenge=claims_challenge, **kwargs))
def _acquire_token_for_client(
        self, scopes, refresh_reason, claims_challenge=None, **kwargs):
    """Send a client credential token request to the identity provider.

    The regional client is used when there is one, otherwise the normal client.
    A ``DeprecationWarning`` is issued when the authority's tenant is
    "common" or "organizations".

    :param scopes: Passed to the token request as is, without decoration.
    :param refresh_reason: Recorded into the telemetry context.
    :param claims_challenge:
        Merged with the client capabilities into the ``claims`` request value.
    :return: The response of the token request.
    """
    tenant = self.authority.tenant.lower()
    if tenant in ("common", "organizations"):
        warnings.warn(
            "Using /common or /organizations authority "
            "in acquire_token_for_client() is unreliable. "
            "Please use a specific tenant instead.", DeprecationWarning)
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason)
    client = self._regional_client or self.client
    request_headers = telemetry_context.generate_headers()
    # "data" has to leave kwargs before kwargs gets expanded into the call
    request_data = dict(
        kwargs.pop("data", {}),
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge))
    response = client.obtain_token_for_client(
        scope=scopes,  # This grant flow requires no scope decoration
        headers=request_headers,
        data=request_data,
        **kwargs)
    telemetry_context.update_telemetry(response)
    return response
def remove_tokens_for_client(self):
    """Remove all tokens that were previously acquired via
    :func:`~acquire_token_for_client()` for the current client.

    Access tokens are looked up under the authority's instance
    and under each of its aliases.
    """
    instance = self.authority.instance
    environments = [instance] + self._get_authority_aliases(instance)
    for environment in environments:
        query = {
            "client_id": self.client_id,
            "environment": environment,
            "home_account_id": None,  # These are mostly app-only tokens
        }
        # Take a snapshot first, then remove ATs from that snapshot
        access_tokens = list(self.token_cache.search(
            TokenCache.CredentialType.ACCESS_TOKEN, query=query))
        for access_token in access_tokens:
            self.token_cache.remove_at(access_token)
    # acquire_token_for_client() obtains no RTs, so we have no RT to remove
def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs):
    """Acquires token using on-behalf-of (OBO) flow.

    The current app is a middle-tier service which was called with a token
    representing an end user.
    Such a token (a.k.a. a user assertion) can be used by the current app
    to request another token to access downstream web API, on behalf of that user.
    See `detail docs here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow>`_ .

    The current middle-tier app has no user interaction to obtain consent.
    See how to gain consent upfront for your middle-tier app from this article.
    https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow#gaining-consent-for-the-middle-tier-application

    :param str user_assertion: The incoming token already received by this app
    :param list[str] scopes: Scopes required by downstream API (a resource).
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID)
    # Scope decoration is used for:
    # 1. Explicitly requesting an RT, without relying on AAD default
    #    behavior, even though it currently still issues an RT.
    # 2. Requesting an IDT (which would otherwise be unavailable)
    #    so that the calling app could use id_token_claims to implement
    #    their own cache mapping, which is likely needed in web apps.
    decorated_scopes = self._decorate_scope(scopes)
    request_data = dict(
        kwargs.pop("data", {}),
        requested_token_use="on_behalf_of",
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge))
    request_headers = telemetry_context.generate_headers()
    # The implementation is NOT based on Token Exchange (RFC 8693);
    # it bases on assertion RFC 7521
    response = _clean_up(self.client.obtain_token_by_assertion(
        user_assertion,
        self.client.GRANT_TYPE_JWT,  # IDTs and AAD ATs are all JWTs
        scope=decorated_scopes,
        data=request_data,
        headers=request_headers,
        # TBD: Expose a login_hint (or ccs_routing_hint) param for web app
        **kwargs))
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response
def acquire_token_by_user_federated_identity_credential(
        self, scopes, assertion, username=None, user_object_id=None,
        claims_challenge=None, **kwargs):
    """Acquires a user-scoped token using the ``user_fic`` grant type.

    A federated identity credential (typically an agent instance token
    from Leg 2 of the agent identity protocol) is exchanged for
    a user-scoped access token, which enables an agent to act on behalf of
    a specific user.

    :param list[str] scopes: Scopes required by downstream API (a resource).
    :param str assertion:
        The federated identity credential token (e.g. the instance token
        obtained from Leg 2 of the agent identity flow).
    :param str username:
        The target user's UPN (User Principal Name).
        Mutually exclusive with ``user_object_id``.
    :param str user_object_id:
        The target user's Object ID.
        Mutually exclusive with ``username``.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    :raises ValueError:
        When ``assertion`` is empty, or when not exactly one of
        ``username`` and ``user_object_id`` is provided.
    """
    # Reject unusable input before anything else happens
    if not assertion:
        raise ValueError("assertion is required and must be non-empty")
    if not (username or user_object_id):
        raise ValueError(
            "Either username or user_object_id must be provided")
    if username and user_object_id:
        raise ValueError(
            "username and user_object_id are mutually exclusive")

    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_USER_FIC_ID)
    headers = telemetry_context.generate_headers()
    # The X-AnchorMailbox header identifies the target user
    if username:
        headers["X-AnchorMailbox"] = "upn:{}".format(username)
    elif user_object_id:
        headers["X-AnchorMailbox"] = "Oid:{}@{}".format(
            user_object_id, self.authority.tenant)

    decorated_scopes = self._decorate_scope(scopes)
    request_data = dict(
        kwargs.pop("data", {}),
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge))
    response = _clean_up(self.client.obtain_token_by_user_fic(
        scope=decorated_scopes,
        assertion=assertion,
        username=username,
        user_object_id=user_object_id,
        headers=headers,
        data=request_data,
        **kwargs))
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response