Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/application.py: 16%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import functools
2import json
3import time
4import logging
5import sys
6import warnings
7from threading import Lock
8from typing import Optional # Needed in Python 3.7 & 3.8
9from urllib.parse import urlparse
10import os
12from .oauth2cli import Client, JwtAssertionCreator
13from .oauth2cli.oidc import decode_part
14from .authority import (
15 Authority,
16 WORLD_WIDE,
17 _get_instance_discovery_endpoint,
18 _get_instance_discovery_host,
19)
20from .mex import send_request as mex_send_request
21from .wstrust_request import send_request as wst_send_request
22from .wstrust_response import *
23from .token_cache import TokenCache, _get_username, _GRANT_TYPE_BROKER
24import msal.telemetry
25from .region import _detect_region
26from .throttled_http_client import ThrottledHttpClient
27from .cloudshell import _is_running_in_cloud_shell
28from .sku import SKU, __version__
29from .oauth2cli.authcode import is_wsl
32logger = logging.getLogger(__name__)
33_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL"
35def _init_broker(enable_pii_log): # Make it a function to allow mocking
36 from . import broker # Trigger Broker's initialization, lazily
37 if enable_pii_log:
38 broker._enable_pii_log()
40def extract_certs(public_cert_content):
41 # Parses raw public certificate file contents and returns a list of strings
42 # Usage: headers = {"x5c": extract_certs(open("my_cert.pem").read())}
43 public_certificates = re.findall(
44 r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----',
45 public_cert_content, re.I)
46 if public_certificates:
47 return [cert.strip() for cert in public_certificates]
48 # The public cert tags are not found in the input,
49 # let's make best effort to exclude a private key pem file.
50 if "PRIVATE KEY" in public_cert_content:
51 raise ValueError(
52 "We expect your public key but detect a private key instead")
53 return [public_cert_content.strip()]
56def _merge_claims_challenge_and_capabilities(capabilities, claims_challenge):
57 # Represent capabilities as {"access_token": {"xms_cc": {"values": capabilities}}}
58 # and then merge/add it into incoming claims
59 if not capabilities:
60 return claims_challenge
61 claims_dict = json.loads(claims_challenge) if claims_challenge else {}
62 for key in ["access_token"]: # We could add "id_token" if we'd decide to
63 claims_dict.setdefault(key, {}).update(xms_cc={"values": capabilities})
64 return json.dumps(claims_dict)
67def _str2bytes(raw):
68 # A conversion based on duck-typing rather than six.text_type
69 try:
70 return raw.encode(encoding="utf-8")
71 except:
72 return raw
74def _extract_cert_and_thumbprints(cert):
75 # Cert concepts https://security.stackexchange.com/a/226758/125264
76 from cryptography.hazmat.primitives import hashes, serialization
77 cert_pem = cert.public_bytes( # Requires cryptography 1.0+
78 encoding=serialization.Encoding.PEM).decode()
79 x5c = [
80 '\n'.join(
81 cert_pem.splitlines()
82 [1:-1] # Strip the "--- header ---" and "--- footer ---"
83 )
84 ]
85 # https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object - Requires cryptography 0.7+
86 sha256_thumbprint = cert.fingerprint(hashes.SHA256()).hex()
87 sha1_thumbprint = cert.fingerprint(hashes.SHA1()).hex() # CodeQL [SM02167] for legacy support such as ADFS
88 return sha256_thumbprint, sha1_thumbprint, x5c
90def _parse_pfx(pfx_path, passphrase_bytes):
91 # Cert concepts https://security.stackexchange.com/a/226758/125264
92 from cryptography.hazmat.primitives.serialization import pkcs12
93 with open(pfx_path, 'rb') as f:
94 private_key, cert, _ = pkcs12.load_key_and_certificates( # cryptography 2.5+
95 # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates
96 f.read(), passphrase_bytes)
97 if not (private_key and cert):
98 raise ValueError("Your PFX file shall contain both private key and cert")
99 sha256_thumbprint, sha1_thumbprint, x5c = _extract_cert_and_thumbprints(cert)
100 return private_key, sha256_thumbprint, sha1_thumbprint, x5c
103def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes):
104 from cryptography.hazmat.primitives import serialization
105 from cryptography.hazmat.backends import default_backend
106 return serialization.load_pem_private_key( # cryptography 0.6+
107 _str2bytes(private_key_pem_str),
108 passphrase_bytes,
109 backend=default_backend(), # It was a required param until 2020
110 )
113def _pii_less_home_account_id(home_account_id):
114 parts = home_account_id.split(".") # It could contain one or two parts
115 parts[0] = "********"
116 return ".".join(parts)
119def _clean_up(result):
120 if isinstance(result, dict):
121 if "_msalruntime_telemetry" in result or "_msal_python_telemetry" in result:
122 result["msal_telemetry"] = json.dumps({ # Telemetry as an opaque string
123 "msalruntime_telemetry": result.get("_msalruntime_telemetry"),
124 "msal_python_telemetry": result.get("_msal_python_telemetry"),
125 }, separators=(",", ":"))
126 return_value = {
127 k: result[k] for k in result
128 if k != "refresh_in" # MSAL handled refresh_in, customers need not
129 and not k.startswith('_') # Skim internal properties
130 }
131 if "refresh_in" in result: # To encourage proactive refresh
132 return_value["refresh_on"] = int(time.time() + result["refresh_in"])
133 return return_value
134 return result # It could be None
137def _preferred_browser():
138 """Register Edge and return a name suitable for subsequent webbrowser.get(...)
139 when appropriate. Otherwise return None.
140 """
141 # On Linux, only Edge will provide device-based Conditional Access support
142 if sys.platform != "linux": # On other platforms, we have no browser preference
143 return None
144 browser_path = "/usr/bin/microsoft-edge" # Use a full path owned by sys admin
145 # Note: /usr/bin/microsoft-edge, /usr/bin/microsoft-edge-stable, etc.
146 # are symlinks that point to the actual binaries which are found under
147 # /opt/microsoft/msedge/msedge or /opt/microsoft/msedge-beta/msedge.
148 # Either method can be used to detect an Edge installation.
149 user_has_no_preference = "BROWSER" not in os.environ
150 user_wont_mind_edge = "microsoft-edge" in os.environ.get("BROWSER", "") # Note:
151 # BROWSER could contain "microsoft-edge" or "/path/to/microsoft-edge".
152 # Python documentation (https://docs.python.org/3/library/webbrowser.html)
153 # does not document the name being implicitly register,
154 # so there is no public API to know whether the ENV VAR browser would work.
155 # Therefore, we would not bother examine the env var browser's type.
156 # We would just register our own Edge instance.
157 if (user_has_no_preference or user_wont_mind_edge) and os.path.exists(browser_path):
158 try:
159 import webbrowser # Lazy import. Some distro may not have this.
160 browser_name = "msal-edge" # Avoid popular name "microsoft-edge"
161 # otherwise `BROWSER="microsoft-edge"; webbrowser.get("microsoft-edge")`
162 # would return a GenericBrowser instance which won't work.
163 try:
164 registration_available = isinstance(
165 webbrowser.get(browser_name), webbrowser.BackgroundBrowser)
166 except webbrowser.Error:
167 registration_available = False
168 if not registration_available:
169 logger.debug("Register %s with %s", browser_name, browser_path)
170 # By registering our own browser instance with our own name,
171 # rather than populating a process-wide BROWSER enn var,
172 # this approach does not have side effect on non-MSAL code path.
173 webbrowser.register( # Even double-register happens to work fine
174 browser_name, None, webbrowser.BackgroundBrowser(browser_path))
175 return browser_name
176 except ImportError:
177 pass # We may still proceed
178 return None
180def _is_ssh_cert_or_pop_request(token_type, auth_scheme) -> bool:
181 return token_type == "ssh-cert" or token_type == "pop" or isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme)
183class _ClientWithCcsRoutingInfo(Client):
185 def initiate_auth_code_flow(self, **kwargs):
186 if kwargs.get("login_hint"): # eSTS could have utilized this as-is, but nope
187 kwargs["X-AnchorMailbox"] = "UPN:%s" % kwargs["login_hint"]
188 return super(_ClientWithCcsRoutingInfo, self).initiate_auth_code_flow(
189 client_info=1, # To be used as CSS Routing info
190 **kwargs)
192 def obtain_token_by_auth_code_flow(
193 self, auth_code_flow, auth_response, **kwargs):
194 # Note: the obtain_token_by_browser() is also covered by this
195 assert isinstance(auth_code_flow, dict) and isinstance(auth_response, dict)
196 headers = kwargs.pop("headers", {})
197 client_info = json.loads(
198 decode_part(auth_response["client_info"])
199 ) if auth_response.get("client_info") else {}
200 if "uid" in client_info and "utid" in client_info:
201 # Note: The value of X-AnchorMailbox is also case-insensitive
202 headers["X-AnchorMailbox"] = "Oid:{uid}@{utid}".format(**client_info)
203 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_auth_code_flow(
204 auth_code_flow, auth_response, headers=headers, **kwargs)
206 def obtain_token_by_username_password(self, username, password, **kwargs):
207 headers = kwargs.pop("headers", {})
208 headers["X-AnchorMailbox"] = "upn:{}".format(username)
209 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_username_password(
210 username, password, headers=headers, **kwargs)
213def _msal_extension_check():
214 # Can't run this in module or class level otherwise you'll get circular import error
215 try:
216 from msal_extensions import __version__ as v
217 major, minor, _ = v.split(".", maxsplit=3)
218 if not (int(major) >= 1 and int(minor) >= 2):
219 warnings.warn(
220 "Please upgrade msal-extensions. "
221 "Only msal-extensions 1.2+ can work with msal 1.30+")
222 except ImportError:
223 pass # The optional msal_extensions is not installed. Business as usual.
224 except ValueError:
225 logger.exception(f"msal_extensions version {v} not in major.minor.patch format")
226 except:
227 logger.exception(
228 "Unable to import msal_extensions during an optional check. "
229 "This exception can be safely ignored."
230 )
233class ClientApplication(object):
234 """You do not usually directly use this class. Use its subclasses instead:
235 :class:`PublicClientApplication` and :class:`ConfidentialClientApplication`.
236 """
237 ACQUIRE_TOKEN_SILENT_ID = "84"
238 ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85"
239 ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301"
240 ACQUIRE_TOKEN_ON_BEHALF_OF_ID = "523"
241 ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID = "622"
242 ACQUIRE_TOKEN_FOR_CLIENT_ID = "730"
243 ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832"
244 ACQUIRE_TOKEN_INTERACTIVE = "169"
245 GET_ACCOUNTS_ID = "902"
246 REMOVE_ACCOUNT_ID = "903"
248 ATTEMPT_REGION_DISCOVERY = True # "TryAutoDetect"
249 DISABLE_MSAL_FORCE_REGION = False # Used in azure_region to disable MSAL_FORCE_REGION behavior
250 _TOKEN_SOURCE = "token_source"
251 _TOKEN_SOURCE_IDP = "identity_provider"
252 _TOKEN_SOURCE_CACHE = "cache"
253 _TOKEN_SOURCE_BROKER = "broker"
255 _enable_broker = False
256 _AUTH_SCHEME_UNSUPPORTED = (
257 "auth_scheme is currently only available from broker. "
258 "You can enable broker by following these instructions. "
259 "https://msal-python.readthedocs.io/en/latest/#publicclientapplication")
261 def __init__(
262 self, client_id,
263 client_credential=None, authority=None, validate_authority=True,
264 token_cache=None,
265 http_client=None,
266 verify=True, proxies=None, timeout=None,
267 client_claims=None, app_name=None, app_version=None,
268 client_capabilities=None,
269 azure_region=None, # Note: We choose to add this param in this base class,
270 # despite it is currently only needed by ConfidentialClientApplication.
271 # This way, it holds the same positional param place for PCA,
272 # when we would eventually want to add this feature to PCA in future.
273 exclude_scopes=None,
274 http_cache=None,
275 instance_discovery=None,
276 allow_broker=None,
277 enable_pii_log=None,
278 oidc_authority=None,
279 ):
280 """Create an instance of application.
282 :param str client_id: Your app has a client_id after you register it on Microsoft Entra admin center.
284 :param client_credential:
285 For :class:`PublicClientApplication`, you use `None` here.
287 For :class:`ConfidentialClientApplication`,
288 it supports many different input formats for different scenarios.
290 .. admonition:: Support using a client secret.
292 Just feed in a string, such as ``"your client secret"``.
294 .. admonition:: Support using a certificate in X.509 (.pem) format
296 Deprecated because it uses SHA-1 thumbprint,
297 unless you are still using ADFS which supports SHA-1 thumbprint only.
298 Please use the .pfx option documented later in this page.
300 Feed in a dict in this form::
302 {
303 "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format",
304 "thumbprint": "An SHA-1 thumbprint such as A1B2C3D4E5F6..."
305 "Changed in version 1.35.0, if thumbprint is absent"
306 "and a public_certificate is present, MSAL will"
307 "automatically calculate an SHA-256 thumbprint instead.",
308 "passphrase": "Needed if the private_key is encrypted (Added in version 1.6.0)",
309 "public_certificate": "...-----BEGIN CERTIFICATE-----...", # Needed if you use Subject Name/Issuer auth. Added in version 0.5.0.
310 }
312 MSAL Python requires a "private_key" in PEM format.
313 If your cert is in PKCS12 (.pfx) format,
314 you can convert it to X.509 (.pem) format,
315 by ``openssl pkcs12 -in file.pfx -out file.pem -nodes``.
317 The thumbprint is available in your app's registration in Azure Portal.
318 Alternatively, you can `calculate the thumbprint <https://github.com/Azure/azure-sdk-for-python/blob/07d10639d7e47f4852eaeb74aef5d569db499d6e/sdk/identity/azure-identity/azure/identity/_credentials/certificate.py#L94-L97>`_.
320 ``public_certificate`` (optional) is public key certificate
321 which will be sent through 'x5c' JWT header.
322 This is useful when you use `Subject Name/Issuer Authentication
323 <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_
324 which is an approach to allow easier certificate rotation.
325 Per `specs <https://tools.ietf.org/html/rfc7515#section-4.1.6>`_,
326 "the certificate containing
327 the public key corresponding to the key used to digitally sign the
328 JWS MUST be the first certificate. This MAY be followed by
329 additional certificates, with each subsequent certificate being the
330 one used to certify the previous one."
331 However, your certificate's issuer may use a different order.
332 So, if your attempt ends up with an error AADSTS700027 -
333 "The provided signature value did not match the expected signature value",
334 you may try use only the leaf cert (in PEM/str format) instead.
336 .. admonition:: Supporting raw assertion obtained from elsewhere
338 *Added in version 1.13.0*:
339 It can also be a completely pre-signed assertion that you've assembled yourself.
340 Simply pass a container containing only the key "client_assertion", like this::
342 {
343 "client_assertion": "...a JWT with claims aud, exp, iss, jti, nbf, and sub..."
344 }
346 .. admonition:: Supporting reading client certificates from PFX files
348 This usage will automatically use SHA-256 thumbprint of the certificate.
350 *Added in version 1.29.0*:
351 Feed in a dictionary containing the path to a PFX file::
353 {
354 "private_key_pfx_path": "/path/to/your.pfx", # Added in version 1.29.0
355 "public_certificate": True, # Only needed if you use Subject Name/Issuer auth. Added in version 1.30.0
356 "passphrase": "Passphrase if the private_key is encrypted (Optional)",
357 }
359 The following command will generate a .pfx file from your .key and .pem file::
361 openssl pkcs12 -export -out certificate.pfx -inkey privateKey.key -in certificate.pem
363 `Subject Name/Issuer Auth
364 <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_
365 is an approach to allow easier certificate rotation.
366 If your .pfx file contains both the private key and public cert,
367 you can opt in for Subject Name/Issuer Auth by setting "public_certificate" to ``True``.
369 :type client_credential: Union[dict, str, None]
371 :param dict client_claims:
372 *Added in version 0.5.0*:
373 It is a dictionary of extra claims that would be signed by
374 by this :class:`ConfidentialClientApplication` 's private key.
375 For example, you can use {"client_ip": "x.x.x.x"}.
376 You may also override any of the following default claims::
378 {
379 "aud": the_token_endpoint,
380 "iss": self.client_id,
381 "sub": same_as_issuer,
382 "exp": now + 10_min,
383 "iat": now,
384 "jti": a_random_uuid
385 }
387 :param str authority:
388 A URL that identifies a token authority. It should be of the format
389 ``https://login.microsoftonline.com/your_tenant``
390 By default, we will use ``https://login.microsoftonline.com/common``
392 *Changed in version 1.17*: you can also use predefined constant
393 and a builder like this::
395 from msal.authority import (
396 AuthorityBuilder,
397 AZURE_US_GOVERNMENT, AZURE_CHINA, AZURE_PUBLIC)
398 my_authority = AuthorityBuilder(AZURE_PUBLIC, "contoso.onmicrosoft.com")
399 # Now you get an equivalent of
400 # "https://login.microsoftonline.com/contoso.onmicrosoft.com"
402 # You can feed such an authority to msal's ClientApplication
403 from msal import PublicClientApplication
404 app = PublicClientApplication("my_client_id", authority=my_authority, ...)
406 :param bool validate_authority: (optional) Turns authority validation
407 on or off. This parameter default to true.
408 :param TokenCache token_cache:
409 Sets the token cache used by this ClientApplication instance.
410 By default, an in-memory cache will be created and used.
411 :param http_client: (optional)
412 Your implementation of abstract class HttpClient <msal.oauth2cli.http.http_client>
413 Defaults to a requests session instance.
414 Since MSAL 1.11.0, the default session would be configured
415 to attempt one retry on connection error.
416 If you are providing your own http_client,
417 it will be your http_client's duty to decide whether to perform retry.
419 :param verify: (optional)
420 It will be passed to the
421 `verify parameter in the underlying requests library
422 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#ssl-cert-verification>`_
423 This does not apply if you have chosen to pass your own Http client
424 :param proxies: (optional)
425 It will be passed to the
426 `proxies parameter in the underlying requests library
427 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#proxies>`_
428 This does not apply if you have chosen to pass your own Http client
429 :param timeout: (optional)
430 It will be passed to the
431 `timeout parameter in the underlying requests library
432 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#timeouts>`_
433 This does not apply if you have chosen to pass your own Http client
434 :param app_name: (optional)
435 You can provide your application name for Microsoft telemetry purposes.
436 Default value is None, means it will not be passed to Microsoft.
437 :param app_version: (optional)
438 You can provide your application version for Microsoft telemetry purposes.
439 Default value is None, means it will not be passed to Microsoft.
440 :param list[str] client_capabilities: (optional)
441 Allows configuration of one or more client capabilities, e.g. ["CP1"].
443 Client capability is meant to inform the Microsoft identity platform
444 (STS) what this client is capable for,
445 so STS can decide to turn on certain features.
446 For example, if client is capable to handle *claims challenge*,
447 STS may issue
448 `Continuous Access Evaluation (CAE) <https://learn.microsoft.com/entra/identity/conditional-access/concept-continuous-access-evaluation>`_
449 access tokens to resources,
450 knowing that when the resource emits a *claims challenge*
451 the client will be able to handle those challenges.
453 Implementation details:
454 Client capability is implemented using "claims" parameter on the wire,
455 for now.
456 MSAL will combine them into
457 `claims parameter <https://openid.net/specs/openid-connect-core-1_0-final.html#ClaimsParameter>`_
458 which you will later provide via one of the acquire-token request.
460 :param str azure_region: (optional)
461 Instructs MSAL to use the Entra regional token service. This legacy feature is only available to
462 first-party applications. Only ``acquire_token_for_client()`` is supported.
464 Supports 4 values:
466 1. ``azure_region=None`` - This default value means no region is configured.
467 MSAL will use the region defined in env var ``MSAL_FORCE_REGION``.
468 2. ``azure_region="some_region"`` - meaning the specified region is used.
469 3. ``azure_region=True`` - meaning
470 MSAL will try to auto-detect the region. This is not recommended.
471 4. ``azure_region=False`` - meaning MSAL will use no region.
473 .. note::
474 Region auto-discovery has been tested on VMs and on Azure Functions. It is unreliable.
475 Applications using this option should configure a short timeout.
477 For more details and for the values of the region string
478 see https://learn.microsoft.com/entra/msal/dotnet/resources/region-discovery-troubleshooting
480 New in version 1.12.0.
482 :param list[str] exclude_scopes: (optional)
483 Historically MSAL hardcodes `offline_access` scope,
484 which would allow your app to have prolonged access to user's data.
485 If that is unnecessary or undesirable for your app,
486 now you can use this parameter to supply an exclusion list of scopes,
487 such as ``exclude_scopes = ["offline_access"]``.
489 :param dict http_cache:
490 MSAL has long been caching tokens in the ``token_cache``.
491 Recently, MSAL also introduced a concept of ``http_cache``,
492 by automatically caching some finite amount of non-token http responses,
493 so that *long-lived*
494 ``PublicClientApplication`` and ``ConfidentialClientApplication``
495 would be more performant and responsive in some situations.
497 This ``http_cache`` parameter accepts any dict-like object.
498 If not provided, MSAL will use an in-memory dict.
500 If your app is a command-line app (CLI),
501 you would want to persist your http_cache across different CLI runs.
502 The persisted file's format may change due to, but not limited to,
503 `unstable protocol <https://docs.python.org/3/library/pickle.html#data-stream-format>`_,
504 so your implementation shall tolerate unexpected loading errors.
505 The following recipe shows a way to do so::
507 # Just add the following lines at the beginning of your CLI script
508 import sys, atexit, pickle, logging
509 http_cache_filename = sys.argv[0] + ".http_cache"
510 try:
511 with open(http_cache_filename, "rb") as f:
512 persisted_http_cache = pickle.load(f) # Take a snapshot
513 except (
514 FileNotFoundError, # Or IOError in Python 2
515 pickle.UnpicklingError, # A corrupted http cache file
516 AttributeError, # Cache created by a different version of MSAL
517 ):
518 persisted_http_cache = {} # Recover by starting afresh
519 except: # Unexpected exceptions
520 logging.exception("You may want to debug this")
521 persisted_http_cache = {} # Recover by starting afresh
522 atexit.register(lambda: pickle.dump(
523 # When exit, flush it back to the file.
524 # It may occasionally overwrite another process's concurrent write,
525 # but that is fine. Subsequent runs will reach eventual consistency.
526 persisted_http_cache, open(http_cache_file, "wb")))
528 # And then you can implement your app as you normally would
529 app = msal.PublicClientApplication(
530 "your_client_id",
531 ...,
532 http_cache=persisted_http_cache, # Utilize persisted_http_cache
533 ...,
534 #token_cache=..., # You may combine the old token_cache trick
535 # Please refer to token_cache recipe at
536 # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
537 )
538 app.acquire_token_interactive(["your", "scope"], ...)
540 Content inside ``http_cache`` are cheap to obtain.
541 There is no need to share them among different apps.
543 Content inside ``http_cache`` will contain no tokens nor
544 Personally Identifiable Information (PII). Encryption is unnecessary.
546 New in version 1.16.0.
548 :param boolean instance_discovery:
549 Historically, MSAL would connect to a central endpoint located at
550 ``https://login.microsoftonline.com`` to acquire some metadata,
551 especially when using an unfamiliar authority.
552 This behavior is known as Instance Discovery.
554 This parameter defaults to None, which enables the Instance Discovery.
556 If you know some authorities which you allow MSAL to operate with as-is,
557 without involving any Instance Discovery, the recommended pattern is::
559 known_authorities = frozenset([ # Treat your known authorities as const
560 "https://contoso.com/adfs", "https://login.azs/foo"])
561 ...
562 authority = "https://contoso.com/adfs" # Assuming your app will use this
563 app1 = PublicClientApplication(
564 "client_id",
565 authority=authority,
566 # Conditionally disable Instance Discovery for known authorities
567 instance_discovery=authority not in known_authorities,
568 )
570 If you do not know some authorities beforehand,
571 yet still want MSAL to accept any authority that you will provide,
572 you can use a ``False`` to unconditionally disable Instance Discovery.
574 New in version 1.19.0.
576 :param boolean allow_broker:
577 Deprecated. Please use ``enable_broker_on_windows`` instead.
579 :param boolean enable_pii_log:
580 When enabled, logs may include PII (Personal Identifiable Information).
581 This can be useful in troubleshooting broker behaviors.
582 The default behavior is False.
584 New in version 1.24.0.
586 :param str oidc_authority:
587 *Added in version 1.28.0*:
588 It is a URL that identifies an OpenID Connect (OIDC) authority of
589 the format ``https://contoso.com/tenant``.
590 MSAL will append ".well-known/openid-configuration" to the authority
591 and retrieve the OIDC metadata from there, to figure out the endpoints.
593 Note: Broker will NOT be used for OIDC authority.
594 """
595 self.client_id = client_id
596 self.client_credential = client_credential
597 self.client_claims = client_claims
598 self._client_capabilities = client_capabilities
599 self._instance_discovery = instance_discovery
601 if exclude_scopes and not isinstance(exclude_scopes, list):
602 raise ValueError(
603 "Invalid exclude_scopes={}. It need to be a list of strings.".format(
604 repr(exclude_scopes)))
605 self._exclude_scopes = frozenset(exclude_scopes or [])
606 if "openid" in self._exclude_scopes:
607 raise ValueError(
608 'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format(
609 repr(exclude_scopes)))
611 if http_client:
612 self.http_client = http_client
613 else:
614 import requests # Lazy load
616 self.http_client = requests.Session()
617 self.http_client.verify = verify
618 self.http_client.proxies = proxies
619 # Requests, does not support session - wide timeout
620 # But you can patch that (https://github.com/psf/requests/issues/3341):
621 self.http_client.request = functools.partial(
622 self.http_client.request, timeout=timeout)
624 # Enable a minimal retry. Better than nothing.
625 # https://github.com/psf/requests/blob/v2.25.1/requests/adapters.py#L94-L108
626 a = requests.adapters.HTTPAdapter(max_retries=1)
627 self.http_client.mount("http://", a)
628 self.http_client.mount("https://", a)
629 self.http_client = ThrottledHttpClient(
630 self.http_client,
631 http_cache=http_cache,
632 default_throttle_time=60
633 # The default value 60 was recommended mainly for PCA at the end of
634 # https://identitydivision.visualstudio.com/devex/_git/AuthLibrariesApiReview?version=GBdev&path=%2FService%20protection%2FIntial%20set%20of%20protection%20measures.md&_a=preview
635 if isinstance(self, PublicClientApplication) else 5,
636 )
638 self.app_name = app_name
639 self.app_version = app_version
641 # Here the self.authority will not be the same type as authority in input
642 if oidc_authority and authority:
643 raise ValueError("You can not provide both authority and oidc_authority")
644 if isinstance(authority, str) and urlparse(authority).path.startswith(
645 "/dstsv2"): # dSTS authority's path always starts with "/dstsv2"
646 oidc_authority = authority # So we treat it as if an oidc_authority
647 try:
648 authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE)
649 self.authority = Authority(
650 authority_to_use,
651 self.http_client,
652 validate_authority=validate_authority,
653 instance_discovery=self._instance_discovery,
654 oidc_authority_url=oidc_authority,
655 )
656 except ValueError: # Those are explicit authority validation errors
657 raise
658 except Exception: # The rest are typically connection errors
659 if validate_authority and not oidc_authority and (
660 azure_region # Opted in to use region
661 or (azure_region is None and os.getenv("MSAL_FORCE_REGION")) # Will use region
662 ):
663 # Since caller opts in to use region, here we tolerate connection
664 # errors happened during authority validation at non-region endpoint
665 self.authority = Authority(
666 authority_to_use,
667 self.http_client,
668 instance_discovery=False,
669 )
670 else:
671 raise
673 self._decide_broker(allow_broker, enable_pii_log)
674 self.token_cache = token_cache or TokenCache()
675 self._region_configured = azure_region
676 self._region_detected = None
677 self.client, self._regional_client = self._build_client(
678 client_credential, self.authority)
679 self.authority_groups = {}
680 self._telemetry_buffer = {}
681 self._telemetry_lock = Lock()
682 _msal_extension_check()
685 def _decide_broker(self, allow_broker, enable_pii_log):
686 is_confidential_app = self.client_credential or isinstance(
687 self, ConfidentialClientApplication)
688 if is_confidential_app and allow_broker:
689 raise ValueError("allow_broker=True is only supported in PublicClientApplication")
690 # Historically, we chose to support ClientApplication("client_id", allow_broker=True)
691 if allow_broker:
692 warnings.warn(
693 "allow_broker is deprecated. "
694 "Please use PublicClientApplication(..., "
695 "enable_broker_on_windows=True, "
696 # No need to mention non-Windows platforms, because allow_broker is only for Windows
697 "...)",
698 DeprecationWarning)
699 opted_in_for_broker = (
700 self._enable_broker # True means Opted-in from PCA
701 or (
702 # When we started the broker project on Windows platform,
703 # the allow_broker was meant to be cross-platform. Now we realize
704 # that other platforms have different redirect_uri requirements,
705 # so the old allow_broker is deprecated and will only for Windows.
706 allow_broker and sys.platform == "win32")
707 )
708 self._enable_broker = ( # This same variable will also store the state
709 opted_in_for_broker
710 and not is_confidential_app
711 and not self.authority.is_adfs
712 and not self.authority._is_b2c
713 )
714 if self._enable_broker:
715 try:
716 _init_broker(enable_pii_log)
717 except RuntimeError:
718 self._enable_broker = False
719 logger.warning( # It is common on Mac and Linux where broker is not built-in
720 "Broker is unavailable on this platform. "
721 "We will fallback to non-broker.")
722 logger.debug("Broker enabled? %s", self._enable_broker)
724 def is_pop_supported(self):
725 """Returns True if this client supports Proof-of-Possession Access Token."""
726 return self._enable_broker and sys.platform in ("win32", "darwin")
728 def _decorate_scope(
729 self, scopes,
730 reserved_scope=frozenset(['openid', 'profile', 'offline_access'])):
731 if not isinstance(scopes, (list, set, tuple)):
732 raise ValueError("The input scopes should be a list, tuple, or set")
733 scope_set = set(scopes) # Input scopes is typically a list. Copy it to a set.
734 if scope_set & reserved_scope:
735 # These scopes are reserved for the API to provide good experience.
736 # We could make the developer pass these and then if they do they will
737 # come back asking why they don't see refresh token or user information.
738 raise ValueError(
739 """You cannot use any scope value that is reserved.
740Your input: {}
741The reserved list: {}""".format(list(scope_set), list(reserved_scope)))
742 raise ValueError(
743 "You cannot use any scope value that is in this reserved list: {}".format(
744 list(reserved_scope)))
746 # client_id can also be used as a scope in B2C
747 decorated = scope_set | reserved_scope
748 decorated -= self._exclude_scopes
749 return list(decorated)
751 def _build_telemetry_context(
752 self, api_id, correlation_id=None, refresh_reason=None):
753 return msal.telemetry._TelemetryContext(
754 self._telemetry_buffer, self._telemetry_lock, api_id,
755 correlation_id=correlation_id, refresh_reason=refresh_reason)
757 def _get_regional_authority(self, central_authority) -> Optional[Authority]:
758 if self._region_configured is False: # User opts out of ESTS-R
759 return None # Short circuit to completely bypass region detection
760 if self._region_configured is None: # User did not make an ESTS-R choice
761 self._region_configured = os.getenv("MSAL_FORCE_REGION") or None
762 self._region_detected = self._region_detected or _detect_region(
763 self.http_client if self._region_configured is not None else None)
764 if (self._region_configured != self.ATTEMPT_REGION_DISCOVERY
765 and self._region_configured != self._region_detected):
766 logger.warning('Region configured ({}) != region detected ({})'.format(
767 repr(self._region_configured), repr(self._region_detected)))
768 region_to_use = (
769 self._region_detected
770 if self._region_configured == self.ATTEMPT_REGION_DISCOVERY
771 else self._region_configured) # It will retain the None i.e. opted out
772 logger.debug('Region to be used: {}'.format(repr(region_to_use)))
773 if region_to_use:
774 regional_host = ("{}.login.microsoft.com".format(region_to_use)
775 if central_authority.instance in (
776 # The list came from point 3 of the algorithm section in this internal doc
777 # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PinAuthToRegion/AAD%20SDK%20Proposal%20to%20Pin%20Auth%20to%20region.md&anchor=algorithm&_a=preview
778 "login.microsoftonline.com",
779 "login.microsoft.com",
780 "login.windows.net",
781 "sts.windows.net",
782 )
783 else "{}.{}".format(region_to_use, central_authority.instance))
784 return Authority( # The central_authority has already been validated
785 "https://{}/{}".format(regional_host, central_authority.tenant),
786 self.http_client,
787 instance_discovery=False,
788 )
789 return None
791 def _build_client(self, client_credential, authority, skip_regional_client=False):
792 client_assertion = None
793 client_assertion_type = None
794 default_headers = {
795 "x-client-sku": SKU, "x-client-ver": __version__,
796 "x-client-os": sys.platform,
797 "x-ms-lib-capability": "retry-after, h429",
798 }
799 if self.app_name:
800 default_headers['x-app-name'] = self.app_name
801 if self.app_version:
802 default_headers['x-app-ver'] = self.app_version
803 default_body = {"client_info": 1}
804 if isinstance(client_credential, dict):
805 client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT
806 # Use client_credential.get("...") rather than "..." in client_credential
807 # so that we can ignore an empty string came from an empty ENV VAR.
808 if client_credential.get("client_assertion"):
809 client_assertion = client_credential['client_assertion']
810 else:
811 headers = {}
812 sha1_thumbprint = sha256_thumbprint = None
813 passphrase_bytes = _str2bytes(
814 client_credential["passphrase"]
815 ) if client_credential.get("passphrase") else None
816 if client_credential.get("private_key_pfx_path"):
817 private_key, sha256_thumbprint, sha1_thumbprint, x5c = _parse_pfx(
818 client_credential["private_key_pfx_path"],
819 passphrase_bytes)
820 if client_credential.get("public_certificate") is True and x5c:
821 headers["x5c"] = x5c
822 elif client_credential.get("private_key"): # PEM blob
823 private_key = ( # handles both encrypted and unencrypted
824 _load_private_key_from_pem_str(
825 client_credential['private_key'], passphrase_bytes)
826 if passphrase_bytes
827 else client_credential['private_key']
828 )
830 # Determine thumbprints based on what's provided
831 if client_credential.get("thumbprint"):
832 # User provided a thumbprint - use it as SHA-1 (legacy/manual approach)
833 sha1_thumbprint = client_credential["thumbprint"]
834 sha256_thumbprint = None
835 elif isinstance(client_credential.get('public_certificate'), str):
836 # No thumbprint provided, but we have a certificate to calculate thumbprints
837 from cryptography import x509
838 cert = x509.load_pem_x509_certificate(
839 _str2bytes(client_credential['public_certificate']))
840 sha256_thumbprint, sha1_thumbprint, headers["x5c"] = (
841 _extract_cert_and_thumbprints(cert))
842 else:
843 raise ValueError(
844 "You must provide either 'thumbprint' or 'public_certificate' "
845 "from which the thumbprint can be calculated.")
846 else:
847 raise ValueError(
848 "client_credential needs to follow this format "
849 "https://msal-python.readthedocs.io/en/latest/#msal.ClientApplication.params.client_credential")
850 if ("x5c" not in headers # So the .pfx file contains no certificate
851 and isinstance(client_credential.get('public_certificate'), str)
852 ): # Then we treat the public_certificate value as PEM content
853 headers["x5c"] = extract_certs(client_credential['public_certificate'])
854 if sha256_thumbprint and not authority.is_adfs:
855 assertion_params = {
856 "algorithm": "PS256", "sha256_thumbprint": sha256_thumbprint,
857 }
858 else: # Fall back
859 if not sha1_thumbprint:
860 raise ValueError("You shall provide a thumbprint in SHA1.")
861 assertion_params = {
862 "algorithm": "RS256", "sha1_thumbprint": sha1_thumbprint,
863 }
864 assertion = JwtAssertionCreator(
865 private_key, headers=headers, **assertion_params)
866 client_assertion = assertion.create_regenerative_assertion(
867 audience=authority.token_endpoint, issuer=self.client_id,
868 additional_claims=self.client_claims or {})
869 else:
870 default_body['client_secret'] = client_credential
871 central_configuration = {
872 "authorization_endpoint": authority.authorization_endpoint,
873 "token_endpoint": authority.token_endpoint,
874 "device_authorization_endpoint": authority.device_authorization_endpoint,
875 }
876 central_client = _ClientWithCcsRoutingInfo(
877 central_configuration,
878 self.client_id,
879 http_client=self.http_client,
880 default_headers=default_headers,
881 default_body=default_body,
882 client_assertion=client_assertion,
883 client_assertion_type=client_assertion_type,
884 on_obtaining_tokens=lambda event: self.token_cache.add(dict(
885 event, environment=authority.instance)),
886 on_removing_rt=self.token_cache.remove_rt,
887 on_updating_rt=self.token_cache.update_rt)
889 regional_client = None
890 if (client_credential # Currently regional endpoint only serves some CCA flows
891 and not skip_regional_client):
892 regional_authority = self._get_regional_authority(authority)
893 if regional_authority:
894 regional_configuration = {
895 "authorization_endpoint": regional_authority.authorization_endpoint,
896 "token_endpoint": regional_authority.token_endpoint,
897 "device_authorization_endpoint":
898 regional_authority.device_authorization_endpoint,
899 }
900 regional_client = _ClientWithCcsRoutingInfo(
901 regional_configuration,
902 self.client_id,
903 http_client=self.http_client,
904 default_headers=default_headers,
905 default_body=default_body,
906 client_assertion=client_assertion,
907 client_assertion_type=client_assertion_type,
908 on_obtaining_tokens=lambda event: self.token_cache.add(dict(
909 event, environment=authority.instance)),
910 on_removing_rt=self.token_cache.remove_rt,
911 on_updating_rt=self.token_cache.update_rt)
912 return central_client, regional_client
914 def initiate_auth_code_flow(
915 self,
916 scopes, # type: list[str]
917 redirect_uri=None,
918 state=None, # Recommended by OAuth2 for CSRF protection
919 prompt=None,
920 login_hint=None, # type: Optional[str]
921 domain_hint=None, # type: Optional[str]
922 claims_challenge=None,
923 max_age=None,
924 response_mode=None, # type: Optional[str]
925 ):
926 """Initiate an auth code flow.
928 Later when the response reaches your redirect_uri,
929 you can use :func:`~acquire_token_by_auth_code_flow()`
930 to complete the authentication/authorization.
932 :param list scopes:
933 It is a list of case-sensitive strings.
934 :param str redirect_uri:
935 Optional. If not specified, server will use the pre-registered one.
936 :param str state:
937 An opaque value used by the client to
938 maintain state between the request and callback.
939 If absent, this library will automatically generate one internally.
940 :param str prompt:
941 By default, no prompt value will be sent, not even string ``"none"``.
942 You will have to specify a value explicitly.
943 Its valid values are the constants defined in
944 :class:`Prompt <msal.Prompt>`.
946 :param str login_hint:
947 Optional. Identifier of the user. Generally a User Principal Name (UPN).
948 :param domain_hint:
949 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
950 If included, it will skip the email-based discovery process that user goes
951 through on the sign-in page, leading to a slightly more streamlined user experience.
952 More information on possible values available in
953 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
954 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
956 :param int max_age:
957 OPTIONAL. Maximum Authentication Age.
958 Specifies the allowable elapsed time in seconds
959 since the last time the End-User was actively authenticated.
960 If the elapsed time is greater than this value,
961 Microsoft identity platform will actively re-authenticate the End-User.
963 MSAL Python will also automatically validate the auth_time in ID token.
965 New in version 1.15.
967 :param str response_mode:
968 OPTIONAL. Specifies the method with which response parameters should be returned.
969 The default value is equivalent to ``query``, which was still secure enough in MSAL Python
970 (because MSAL Python does not transfer tokens via query parameter in the first place).
971 For even better security, we recommend using the value ``form_post``.
972 In "form_post" mode, response parameters
973 will be encoded as HTML form values that are transmitted via the HTTP POST method and
974 encoded in the body using the application/x-www-form-urlencoded format.
975 Valid values can be either "form_post" for HTTP POST to callback URI or
976 "query" (the default) for HTTP GET with parameters encoded in query string.
977 More information on possible values
978 `here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>`
979 and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>`
981 .. note::
982 You should configure your web framework to accept form_post responses instead of query responses.
983 While this parameter still works, it will be removed in a future version.
984 Using query-based response modes is less secure and should be avoided.
986 :return:
987 The auth code flow. It is a dict in this form::
989 {
990 "auth_uri": "https://...", // Guide user to visit this
991 "state": "...", // You may choose to verify it by yourself,
992 // or just let acquire_token_by_auth_code_flow()
993 // do that for you.
994 "...": "...", // Everything else are reserved and internal
995 }
997 The caller is expected to:
999 1. somehow store this content, typically inside the current session,
1000 2. guide the end user (i.e. resource owner) to visit that auth_uri,
1001 3. and then relay this dict and subsequent auth response to
1002 :func:`~acquire_token_by_auth_code_flow()`.
1003 """
1004 # Note to maintainers: Do not emit warning for the use of response_mode here,
1005 # because response_mode=form_post is still the recommended usage for MSAL Python 1.x.
1006 # App developers making the right call shall not be disturbed by unactionable warnings.
1007 client = _ClientWithCcsRoutingInfo(
1008 {"authorization_endpoint": self.authority.authorization_endpoint},
1009 self.client_id,
1010 http_client=self.http_client)
1011 flow = client.initiate_auth_code_flow(
1012 redirect_uri=redirect_uri, state=state, login_hint=login_hint,
1013 prompt=prompt,
1014 scope=self._decorate_scope(scopes),
1015 domain_hint=domain_hint,
1016 claims=_merge_claims_challenge_and_capabilities(
1017 self._client_capabilities, claims_challenge),
1018 max_age=max_age,
1019 response_mode=response_mode,
1020 )
1021 flow["claims_challenge"] = claims_challenge
1022 return flow
1024 def get_authorization_request_url(
1025 self,
1026 scopes, # type: list[str]
1027 login_hint=None, # type: Optional[str]
1028 state=None, # Recommended by OAuth2 for CSRF protection
1029 redirect_uri=None,
1030 response_type="code", # Could be "token" if you use Implicit Grant
1031 prompt=None,
1032 nonce=None,
1033 domain_hint=None, # type: Optional[str]
1034 claims_challenge=None,
1035 **kwargs):
1036 """Constructs a URL for you to start a Authorization Code Grant.
1038 :param list[str] scopes: (Required)
1039 Scopes requested to access a protected API (a resource).
1040 :param str state: Recommended by OAuth2 for CSRF protection.
1041 :param str login_hint:
1042 Identifier of the user. Generally a User Principal Name (UPN).
1043 :param str redirect_uri:
1044 Address to return to upon receiving a response from the authority.
1045 :param str response_type:
1046 Default value is "code" for an OAuth2 Authorization Code grant.
1048 You could use other content such as "id_token" or "token",
1049 which would trigger an Implicit Grant, but that is
1050 `not recommended <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-implicit-grant-flow#is-the-implicit-grant-suitable-for-my-app>`_.
1052 :param str prompt:
1053 By default, no prompt value will be sent, not even string ``"none"``.
1054 You will have to specify a value explicitly.
1055 Its valid values are the constants defined in
1056 :class:`Prompt <msal.Prompt>`.
1057 :param nonce:
1058 A cryptographically random value used to mitigate replay attacks. See also
1059 `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
1060 :param domain_hint:
1061 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
1062 If included, it will skip the email-based discovery process that user goes
1063 through on the sign-in page, leading to a slightly more streamlined user experience.
1064 More information on possible values available in
1065 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
1066 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
1067 :param claims_challenge:
1068 The claims_challenge parameter requests specific claims requested by the resource provider
1069 in the form of a claims_challenge directive in the www-authenticate header to be
1070 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1071 It is a string of a JSON object which contains lists of claims being requested from these locations.
1073 :return: The authorization url as a string.
1074 """
1075 authority = kwargs.pop("authority", None) # Historically we support this
1076 if authority:
1077 warnings.warn(
1078 "We haven't decided if this method will accept authority parameter")
1079 # The previous implementation is, it will use self.authority by default.
1080 # Multi-tenant app can use new authority on demand
1081 the_authority = Authority(
1082 authority,
1083 self.http_client,
1084 instance_discovery=self._instance_discovery,
1085 ) if authority else self.authority
1087 client = _ClientWithCcsRoutingInfo(
1088 {"authorization_endpoint": the_authority.authorization_endpoint},
1089 self.client_id,
1090 http_client=self.http_client)
1091 warnings.warn(
1092 "Change your get_authorization_request_url() "
1093 "to initiate_auth_code_flow()", DeprecationWarning)
1094 with warnings.catch_warnings(record=True):
1095 return client.build_auth_request_uri(
1096 response_type=response_type,
1097 redirect_uri=redirect_uri, state=state, login_hint=login_hint,
1098 prompt=prompt,
1099 scope=self._decorate_scope(scopes),
1100 nonce=nonce,
1101 domain_hint=domain_hint,
1102 claims=_merge_claims_challenge_and_capabilities(
1103 self._client_capabilities, claims_challenge),
1104 )
1106 def acquire_token_by_auth_code_flow(
1107 self, auth_code_flow, auth_response, scopes=None, **kwargs):
1108 """Validate the auth response being redirected back, and obtain tokens.
1110 It automatically provides nonce protection.
1112 :param dict auth_code_flow:
1113 The same dict returned by :func:`~initiate_auth_code_flow()`.
1114 :param dict auth_response:
1115 A dict of the query string received from auth server.
1116 :param list[str] scopes:
1117 Scopes requested to access a protected API (a resource).
1119 Most of the time, you can leave it empty.
1121 If you requested user consent for multiple resources, here you will
1122 need to provide a subset of what you required in
1123 :func:`~initiate_auth_code_flow()`.
1125 OAuth2 was designed mostly for singleton services,
1126 where tokens are always meant for the same resource and the only
1127 changes are in the scopes.
1128 In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
1129 You can ask authorization code for multiple resources,
1130 but when you redeem it, the token is for only one intended
1131 recipient, called audience.
1132 So the developer need to specify a scope so that we can restrict the
1133 token to be issued for the corresponding audience.
1135 :return:
1136 * A dict containing "access_token" and/or "id_token", among others,
1137 depends on what scope was used.
1138 (See https://tools.ietf.org/html/rfc6749#section-5.1)
1139 * A dict containing "error", optionally "error_description", "error_uri".
1140 (It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_
1141 or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_)
1142 * Most client-side data error would result in ValueError exception.
1143 So the usage pattern could be without any protocol details::
1145 def authorize(): # A controller in a web app
1146 try:
1147 result = msal_app.acquire_token_by_auth_code_flow(
1148 session.get("flow", {}), request.args)
1149 if "error" in result:
1150 return render_template("error.html", result)
1151 use(result) # Token(s) are available in result and cache
1152 except ValueError: # Usually caused by CSRF
1153 pass # Simply ignore them
1154 return redirect(url_for("index"))
1155 """
1156 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1157 telemetry_context = self._build_telemetry_context(
1158 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
1159 response = _clean_up(self.client.obtain_token_by_auth_code_flow(
1160 auth_code_flow,
1161 auth_response,
1162 scope=self._decorate_scope(scopes) if scopes else None,
1163 headers=telemetry_context.generate_headers(),
1164 data=dict(
1165 kwargs.pop("data", {}),
1166 claims=_merge_claims_challenge_and_capabilities(
1167 self._client_capabilities,
1168 auth_code_flow.pop("claims_challenge", None))),
1169 **kwargs))
1170 if "access_token" in response:
1171 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
1172 telemetry_context.update_telemetry(response)
1173 return response
1175 def acquire_token_by_authorization_code(
1176 self,
1177 code,
1178 scopes, # Syntactically required. STS accepts empty value though.
1179 redirect_uri=None,
1180 # REQUIRED, if the "redirect_uri" parameter was included in the
1181 # authorization request as described in Section 4.1.1, and their
1182 # values MUST be identical.
1183 nonce=None,
1184 claims_challenge=None,
1185 **kwargs):
1186 """The second half of the Authorization Code Grant.
1188 :param code: The authorization code returned from Authorization Server.
1189 :param list[str] scopes: (Required)
1190 Scopes requested to access a protected API (a resource).
1192 If you requested user consent for multiple resources, here you will
1193 typically want to provide a subset of what you required in AuthCode.
1195 OAuth2 was designed mostly for singleton services,
1196 where tokens are always meant for the same resource and the only
1197 changes are in the scopes.
1198 In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
1199 You can ask authorization code for multiple resources,
1200 but when you redeem it, the token is for only one intended
1201 recipient, called audience.
1202 So the developer need to specify a scope so that we can restrict the
1203 token to be issued for the corresponding audience.
1205 :param nonce:
1206 If you provided a nonce when calling :func:`get_authorization_request_url`,
1207 same nonce should also be provided here, so that we'll validate it.
1208 An exception will be raised if the nonce in id token mismatches.
1210 :param claims_challenge:
1211 The claims_challenge parameter requests specific claims requested by the resource provider
1212 in the form of a claims_challenge directive in the www-authenticate header to be
1213 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1214 It is a string of a JSON object which contains lists of claims being requested from these locations.
1216 :return: A dict representing the json response from Microsoft Entra:
1218 - A successful response would contain "access_token" key,
1219 - an error response would contain "error" and usually "error_description".
1220 """
1221 # If scope is absent on the wire, STS will give you a token associated
1222 # to the FIRST scope sent during the authorization request.
1223 # So in theory, you can omit scope here when you were working with only
1224 # one scope. But, MSAL decorates your scope anyway, so they are never
1225 # really empty.
1226 assert isinstance(scopes, list), "Invalid parameter type"
1227 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1228 warnings.warn(
1229 "Change your acquire_token_by_authorization_code() "
1230 "to acquire_token_by_auth_code_flow()", DeprecationWarning)
1231 with warnings.catch_warnings(record=True):
1232 telemetry_context = self._build_telemetry_context(
1233 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
1234 response = _clean_up(self.client.obtain_token_by_authorization_code(
1235 code, redirect_uri=redirect_uri,
1236 scope=self._decorate_scope(scopes),
1237 headers=telemetry_context.generate_headers(),
1238 data=dict(
1239 kwargs.pop("data", {}),
1240 claims=_merge_claims_challenge_and_capabilities(
1241 self._client_capabilities, claims_challenge)),
1242 nonce=nonce,
1243 **kwargs))
1244 if "access_token" in response:
1245 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
1246 telemetry_context.update_telemetry(response)
1247 return response
1249 def get_accounts(self, username=None):
1250 """Get a list of accounts which previously signed in, i.e. exists in cache.
1252 An account can later be used in :func:`~acquire_token_silent`
1253 to find its tokens.
1255 :param username:
1256 Filter accounts with this username only. Case insensitive.
1257 :return: A list of account objects.
1258 Each account is a dict. For now, we only document its "username" field.
1259 Your app can choose to display those information to end user,
1260 and allow user to choose one of his/her accounts to proceed.
1261 """
1262 accounts = self._find_msal_accounts(environment=self.authority.instance)
1263 if not accounts: # Now try other aliases of this authority instance
1264 for alias in self._get_authority_aliases(self.authority.instance):
1265 accounts = self._find_msal_accounts(environment=alias)
1266 if accounts:
1267 break
1268 if username:
1269 # Federated account["username"] from AAD could contain mixed case
1270 lowercase_username = username.lower()
1271 accounts = [a for a in accounts
1272 if a["username"].lower() == lowercase_username]
1273 if not accounts:
1274 logger.debug(( # This would also happen when the cache is empty
1275 "get_accounts(username='{}') finds no account. "
1276 "If tokens were acquired without 'profile' scope, "
1277 "they would contain no username for filtering. "
1278 "Consider calling get_accounts(username=None) instead."
1279 ).format(username))
1280 # Does not further filter by existing RTs here. It probably won't matter.
1281 # Because in most cases Accounts and RTs co-exist.
1282 # Even in the rare case when an RT is revoked and then removed,
1283 # acquire_token_silent() would then yield no result,
1284 # apps would fall back to other acquire methods. This is the standard pattern.
1285 return accounts
1287 def _find_msal_accounts(self, environment):
1288 interested_authority_types = [
1289 TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS]
1290 if _is_running_in_cloud_shell():
1291 interested_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL)
1292 grouped_accounts = {
1293 a.get("home_account_id"): # Grouped by home tenant's id
1294 { # These are minimal amount of non-tenant-specific account info
1295 "home_account_id": a.get("home_account_id"),
1296 "environment": a.get("environment"),
1297 "username": a.get("username"),
1298 "account_source": a.get("account_source"),
1300 # The following fields for backward compatibility, for now
1301 "authority_type": a.get("authority_type"),
1302 "local_account_id": a.get("local_account_id"), # Tenant-specific
1303 "realm": a.get("realm"), # Tenant-specific
1304 }
1305 for a in self.token_cache.search(
1306 TokenCache.CredentialType.ACCOUNT,
1307 query={"environment": environment})
1308 if a["authority_type"] in interested_authority_types
1309 }
1310 return list(grouped_accounts.values())
1312 def _get_instance_metadata(self, instance): # This exists so it can be mocked in unit test
1313 instance_discovery_host = _get_instance_discovery_host(instance)
1314 resp = self.http_client.get(
1315 _get_instance_discovery_endpoint(instance),
1316 params={
1317 'api-version': '1.1',
1318 'authorization_endpoint': (
1319 "https://{}/common/oauth2/authorize".format(instance_discovery_host)
1320 ),
1321 },
1322 headers={'Accept': 'application/json'})
1323 resp.raise_for_status()
1324 return json.loads(resp.text)['metadata']
1326 def _get_authority_aliases(self, instance):
1327 if self._instance_discovery is False:
1328 return []
1329 if self.authority._is_known_to_developer:
1330 # Then it is an ADFS/B2C/known_authority_hosts situation
1331 # which may not reach the central endpoint, so we skip it.
1332 return []
1333 if instance not in self.authority_groups:
1334 self.authority_groups[instance] = [
1335 set(group['aliases']) for group in self._get_instance_metadata(instance)]
1336 for group in self.authority_groups[instance]:
1337 if instance in group:
1338 return [alias for alias in group if alias != instance]
1339 return []
1341 def remove_account(self, account):
1342 """Sign me out and forget me from token cache"""
1343 if self._enable_broker:
1344 from .broker import _signout_silently
1345 error = _signout_silently(self.client_id, account["local_account_id"])
1346 if error:
1347 logger.debug("_signout_silently() returns error: %s", error)
1348 # Broker sign-out has been attempted, even if the _forget_me() below throws.
1349 self._forget_me(account)
1351 def _sign_out(self, home_account):
1352 # Remove all relevant RTs and ATs from token cache
1353 owned_by_home_account = {
1354 "environment": home_account["environment"],
1355 "home_account_id": home_account["home_account_id"],} # realm-independent
1356 app_metadata = self._get_app_metadata(home_account["environment"])
1357 # Remove RTs/FRTs, and they are realm-independent
1358 for rt in [ # Remove RTs from a static list (rather than from a dynamic generator),
1359 # to avoid changing self.token_cache while it is being iterated
1360 rt for rt in self.token_cache.search(
1361 TokenCache.CredentialType.REFRESH_TOKEN, query=owned_by_home_account)
1362 # Do RT's app ownership check as a precaution, in case family apps
1363 # and 3rd-party apps share same token cache, although they should not.
1364 if rt["client_id"] == self.client_id or (
1365 app_metadata.get("family_id") # Now let's settle family business
1366 and rt.get("family_id") == app_metadata["family_id"])
1367 ]:
1368 self.token_cache.remove_rt(rt)
1369 for at in list(self.token_cache.search( # Remove ATs from a static list,
1370 # to avoid changing self.token_cache while it is being iterated
1371 TokenCache.CredentialType.ACCESS_TOKEN, query=owned_by_home_account,
1372 # Regardless of realm, b/c we've removed realm-independent RTs anyway
1373 )):
1374 # To avoid the complexity of locating sibling family app's AT,
1375 # we skip AT's app ownership check.
1376 # It means ATs for other apps will also be removed, it is OK because:
1377 # * non-family apps are not supposed to share token cache to begin with;
1378 # * Even if it happens, we keep other app's RT already, so SSO still works
1379 self.token_cache.remove_at(at)
1381 def _forget_me(self, home_account):
1382 # It implies signout, and then also remove all relevant accounts and IDTs
1383 self._sign_out(home_account)
1384 owned_by_home_account = {
1385 "environment": home_account["environment"],
1386 "home_account_id": home_account["home_account_id"],} # realm-independent
1387 for idt in list(self.token_cache.search( # Remove IDTs from a static list,
1388 # to avoid changing self.token_cache while it is being iterated
1389 TokenCache.CredentialType.ID_TOKEN, query=owned_by_home_account, # regardless of realm
1390 )):
1391 self.token_cache.remove_idt(idt)
1392 for a in list(self.token_cache.search( # Remove Accounts from a static list,
1393 # to avoid changing self.token_cache while it is being iterated
1394 TokenCache.CredentialType.ACCOUNT, query=owned_by_home_account, # regardless of realm
1395 )):
1396 self.token_cache.remove_account(a)
1398 def _acquire_token_by_cloud_shell(self, scopes, data=None):
1399 from .cloudshell import _obtain_token
1400 response = _obtain_token(
1401 self.http_client, scopes, client_id=self.client_id, data=data)
1402 if "error" not in response:
1403 self.token_cache.add(dict(
1404 client_id=self.client_id,
1405 scope=response["scope"].split() if "scope" in response else scopes,
1406 token_endpoint=self.authority.token_endpoint,
1407 response=response,
1408 data=data or {},
1409 authority_type=_AUTHORITY_TYPE_CLOUDSHELL,
1410 ))
1411 if "access_token" in response:
1412 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
1413 return response
1415 def acquire_token_silent(
1416 self,
1417 scopes, # type: List[str]
1418 account, # type: Optional[Account]
1419 authority=None, # See get_authorization_request_url()
1420 force_refresh=False, # type: Optional[boolean]
1421 claims_challenge=None,
1422 auth_scheme=None,
1423 **kwargs):
1424 """Acquire an access token for given account, without user interaction.
1426 It has same parameters as the :func:`~acquire_token_silent_with_error`.
1427 The difference is the behavior of the return value.
1428 This method will combine the cache empty and refresh error
1429 into one return value, `None`.
1430 If your app does not care about the exact token refresh error during
1431 token cache look-up, then this method is easier and recommended.
1433 :return:
1434 - A dict containing no "error" key,
1435 and typically contains an "access_token" key,
1436 if cache lookup succeeded.
1437 - None when cache lookup does not yield a token.
1438 """
1439 if not account:
1440 return None # A backward-compatible NO-OP to drop the account=None usage
1441 result = _clean_up(self._acquire_token_silent_with_error(
1442 scopes, account, authority=authority, force_refresh=force_refresh,
1443 claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs))
1444 return result if result and "error" not in result else None
1446 def acquire_token_silent_with_error(
1447 self,
1448 scopes, # type: List[str]
1449 account, # type: Optional[Account]
1450 authority=None, # See get_authorization_request_url()
1451 force_refresh=False, # type: Optional[boolean]
1452 claims_challenge=None,
1453 auth_scheme=None,
1454 **kwargs):
1455 """Acquire an access token for given account, without user interaction.
1457 It is done either by finding a valid access token from cache,
1458 or by finding a valid refresh token from cache and then automatically
1459 use it to redeem a new access token.
1461 This method will differentiate cache empty from token refresh error.
1462 If your app cares the exact token refresh error during
1463 token cache look-up, then this method is suitable.
1464 Otherwise, the other method :func:`~acquire_token_silent` is recommended.
1466 :param list[str] scopes: (Required)
1467 Scopes requested to access a protected API (a resource).
1468 :param account: (Required)
1469 One of the account object returned by :func:`~get_accounts`.
1470 Starting from MSAL Python 1.23,
1471 a ``None`` input will become a NO-OP and always return ``None``.
1472 :param force_refresh:
1473 If True, it will skip Access Token look-up,
1474 and try to find a Refresh Token to obtain a new Access Token.
1475 :param claims_challenge:
1476 The claims_challenge parameter requests specific claims requested by the resource provider
1477 in the form of a claims_challenge directive in the www-authenticate header to be
1478 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1479 It is a string of a JSON object which contains lists of claims being requested from these locations.
1480 :param object auth_scheme:
1481 You can provide an ``msal.auth_scheme.PopAuthScheme`` object
1482 so that MSAL will get a Proof-of-Possession (POP) token for you.
1484 New in version 1.26.0.
1486 :return:
1487 - A dict containing no "error" key,
1488 and typically contains an "access_token" key,
1489 if cache lookup succeeded.
1490 - None when there is simply no token in the cache.
1491 - A dict containing an "error" key, when token refresh failed.
1492 """
1493 if not account:
1494 return None # A backward-compatible NO-OP to drop the account=None usage
1495 return _clean_up(self._acquire_token_silent_with_error(
1496 scopes, account, authority=authority, force_refresh=force_refresh,
1497 claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs))
1499 def _acquire_token_silent_with_error(
1500 self,
1501 scopes, # type: List[str]
1502 account, # type: Optional[Account]
1503 authority=None, # See get_authorization_request_url()
1504 force_refresh=False, # type: Optional[boolean]
1505 claims_challenge=None,
1506 auth_scheme=None,
1507 **kwargs):
1508 assert isinstance(scopes, list), "Invalid parameter type"
1509 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1510 correlation_id = msal.telemetry._get_new_correlation_id()
1511 if authority:
1512 warnings.warn("We haven't decided how/if this method will accept authority parameter")
1513 # the_authority = Authority(
1514 # authority,
1515 # self.http_client,
1516 # instance_discovery=self._instance_discovery,
1517 # ) if authority else self.authority
1518 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it(
1519 scopes, account, self.authority, force_refresh=force_refresh,
1520 claims_challenge=claims_challenge,
1521 correlation_id=correlation_id,
1522 auth_scheme=auth_scheme,
1523 **kwargs)
1524 if result and "error" not in result:
1525 return result
1526 final_result = result
1527 for alias in self._get_authority_aliases(self.authority.instance):
1528 if not list(self.token_cache.search( # Need a list to test emptiness
1529 self.token_cache.CredentialType.REFRESH_TOKEN,
1530 # target=scopes, # MUST NOT filter by scopes, because:
1531 # 1. AAD RTs are scope-independent;
1532 # 2. therefore target is optional per schema;
1533 query={"environment": alias})):
1534 # Skip heavy weight logic when RT for this alias doesn't exist
1535 continue
1536 the_authority = Authority(
1537 "https://" + alias + "/" + self.authority.tenant,
1538 self.http_client,
1539 instance_discovery=False,
1540 )
1541 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it(
1542 scopes, account, the_authority, force_refresh=force_refresh,
1543 claims_challenge=claims_challenge,
1544 correlation_id=correlation_id,
1545 auth_scheme=auth_scheme,
1546 **kwargs)
1547 if result:
1548 if "error" not in result:
1549 return result
1550 final_result = result
1551 if final_result and final_result.get("suberror"):
1552 final_result["classification"] = { # Suppress these suberrors, per #57
1553 "bad_token": "",
1554 "token_expired": "",
1555 "protection_policy_required": "",
1556 "client_mismatch": "",
1557 "device_authentication_failed": "",
1558 }.get(final_result["suberror"], final_result["suberror"])
1559 return final_result
1561 def _acquire_token_silent_from_cache_and_possibly_refresh_it(
1562 self,
1563 scopes, # type: List[str]
1564 account, # type: Optional[Account]
1565 authority, # This can be different than self.authority
1566 force_refresh=False, # type: Optional[boolean]
1567 claims_challenge=None,
1568 correlation_id=None,
1569 http_exceptions=None,
1570 auth_scheme=None,
1571 **kwargs):
1572 # This internal method has two calling patterns:
1573 # it accepts a non-empty account to find token for a user,
1574 # and accepts account=None to find a token for the current app.
1575 access_token_from_cache = None
1576 if not (force_refresh or claims_challenge or auth_scheme): # Then attempt AT cache
1577 query={
1578 "client_id": self.client_id,
1579 "environment": authority.instance,
1580 "realm": authority.tenant,
1581 "home_account_id": (account or {}).get("home_account_id"),
1582 }
1583 key_id = kwargs.get("data", {}).get("key_id")
1584 if key_id: # Some token types (SSH-certs, POP) are bound to a key
1585 query["key_id"] = key_id
1586 now = time.time()
1587 refresh_reason = msal.telemetry.AT_ABSENT
1588 for entry in self.token_cache.search( # A generator allows us to
1589 # break early in cache-hit without finding a full list
1590 self.token_cache.CredentialType.ACCESS_TOKEN,
1591 target=scopes,
1592 query=query,
1593 ): # This loop is about token search, not about token deletion.
1594 # Note that search() holds a lock during this loop;
1595 # that is fine because this loop is fast
1596 expires_in = int(entry["expires_on"]) - now
1597 if expires_in < 5*60: # Then consider it expired
1598 refresh_reason = msal.telemetry.AT_EXPIRED
1599 continue # Removal is not necessary, it will be overwritten
1600 logger.debug("Cache hit an AT")
1601 access_token_from_cache = { # Mimic a real response
1602 "access_token": entry["secret"],
1603 "token_type": entry.get("token_type", "Bearer"),
1604 "expires_in": int(expires_in), # OAuth2 specs defines it as int
1605 self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
1606 }
1607 if "refresh_on" in entry:
1608 access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
1609 if int(entry["refresh_on"]) < now: # aging
1610 refresh_reason = msal.telemetry.AT_AGING
1611 break # With a fallback in hand, we break here to go refresh
1612 self._build_telemetry_context(-1).hit_an_access_token()
1613 return access_token_from_cache # It is still good as new
1614 else:
1615 refresh_reason = msal.telemetry.FORCE_REFRESH # TODO: It could also mean claims_challenge
1616 assert refresh_reason, "It should have been established at this point"
1617 if not http_exceptions: # It can be a tuple of exceptions
1618 # The exact HTTP exceptions are transportation-layer dependent
1619 from requests.exceptions import RequestException # Lazy load
1620 http_exceptions = (RequestException,)
1621 try:
1622 data = kwargs.get("data", {})
1623 if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL:
1624 if auth_scheme:
1625 raise ValueError("auth_scheme is not supported in Cloud Shell")
1626 return self._acquire_token_by_cloud_shell(scopes, data=data)
1628 is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(data.get("token_type"), auth_scheme)
1630 if self._enable_broker and account and account.get("account_source") in (
1631 _GRANT_TYPE_BROKER, # Broker successfully established this account previously.
1632 None, # Unknown data from older MSAL. Broker might still work.
1633 ) and (sys.platform in ("win32", "darwin") or not is_ssh_cert_or_pop_request):
1634 from .broker import _acquire_token_silently
1635 response = _acquire_token_silently(
1636 "https://{}/{}".format(self.authority.instance, self.authority.tenant),
1637 self.client_id,
1638 account["local_account_id"],
1639 scopes,
1640 claims=_merge_claims_challenge_and_capabilities(
1641 self._client_capabilities, claims_challenge),
1642 correlation_id=correlation_id,
1643 auth_scheme=auth_scheme,
1644 **data)
1645 if response: # Broker provides a decisive outcome
1646 account_was_established_by_broker = account.get(
1647 "account_source") == _GRANT_TYPE_BROKER
1648 broker_attempt_succeeded_just_now = "error" not in response
1649 if account_was_established_by_broker or broker_attempt_succeeded_just_now:
1650 return self._process_broker_response(response, scopes, data)
1652 if auth_scheme:
1653 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
1654 if account:
1655 result = self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
1656 authority, self._decorate_scope(scopes), account,
1657 refresh_reason=refresh_reason, claims_challenge=claims_challenge,
1658 correlation_id=correlation_id,
1659 **kwargs)
1660 else: # The caller is acquire_token_for_client()
1661 result = self._acquire_token_for_client(
1662 scopes, refresh_reason, claims_challenge=claims_challenge,
1663 **kwargs)
1664 if result and "access_token" in result:
1665 result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
1666 if (result and "error" not in result) or (not access_token_from_cache):
1667 return result
1668 except http_exceptions:
1669 # Typically network error. Potential AAD outage?
1670 if not access_token_from_cache: # It means there is no fall back option
1671 raise # We choose to bubble up the exception
1672 return access_token_from_cache
1674 def _process_broker_response(self, response, scopes, data):
1675 if "error" not in response:
1676 self.token_cache.add(dict(
1677 client_id=self.client_id,
1678 scope=response["scope"].split() if "scope" in response else scopes,
1679 token_endpoint=self.authority.token_endpoint,
1680 response=response,
1681 data=data,
1682 _account_id=response["_account_id"],
1683 environment=self.authority.instance, # Be consistent with non-broker flows
1684 grant_type=_GRANT_TYPE_BROKER, # A pseudo grant type for TokenCache to mark account_source as broker
1685 ))
1686 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
1687 return _clean_up(response)
1689 def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
1690 self, authority, scopes, account, **kwargs):
1691 query = {
1692 "environment": authority.instance,
1693 "home_account_id": (account or {}).get("home_account_id"),
1694 # "realm": authority.tenant, # AAD RTs are tenant-independent
1695 }
1696 app_metadata = self._get_app_metadata(authority.instance)
1697 if not app_metadata: # Meaning this app is now used for the first time.
1698 # When/if we have a way to directly detect current app's family,
1699 # we'll rewrite this block, to support multiple families.
1700 # For now, we try existing RTs (*). If it works, we are in that family.
1701 # (*) RTs of a different app/family are not supposed to be
1702 # shared with or accessible by us in the first place.
1703 at = self._acquire_token_silent_by_finding_specific_refresh_token(
1704 authority, scopes,
1705 dict(query, family_id="1"), # A hack, we have only 1 family for now
1706 rt_remover=lambda rt_item: None, # NO-OP b/c RTs are likely not mine
1707 break_condition=lambda response: # Break loop when app not in family
1708 # Based on an AAD-only behavior mentioned in internal doc here
1709 # https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595
1710 "client_mismatch" in response.get("error_additional_info", []),
1711 **kwargs)
1712 if at and "error" not in at:
1713 return at
1714 last_resp = None
1715 if app_metadata.get("family_id"): # Meaning this app belongs to this family
1716 last_resp = at = self._acquire_token_silent_by_finding_specific_refresh_token(
1717 authority, scopes, dict(query, family_id=app_metadata["family_id"]),
1718 **kwargs)
1719 if at and "error" not in at:
1720 return at
1721 # Either this app is an orphan, so we will naturally use its own RT;
1722 # or all attempts above have failed, so we fall back to non-foci behavior.
1723 return self._acquire_token_silent_by_finding_specific_refresh_token(
1724 authority, scopes, dict(query, client_id=self.client_id),
1725 **kwargs) or last_resp
1727 def _get_app_metadata(self, environment):
1728 return self.token_cache._get_app_metadata(
1729 environment=environment, client_id=self.client_id, default={})
1731 def _acquire_token_silent_by_finding_specific_refresh_token(
1732 self, authority, scopes, query,
1733 rt_remover=None, break_condition=lambda response: False,
1734 refresh_reason=None, correlation_id=None, claims_challenge=None,
1735 **kwargs):
1736 matches = list(self.token_cache.search( # We want a list to test emptiness
1737 self.token_cache.CredentialType.REFRESH_TOKEN,
1738 # target=scopes, # AAD RTs are scope-independent
1739 query=query))
1740 logger.debug("Found %d RTs matching %s", len(matches), {
1741 k: _pii_less_home_account_id(v) if k == "home_account_id" and v else v
1742 for k, v in query.items()
1743 })
1745 response = None # A distinguishable value to mean cache is empty
1746 if not matches: # Then exit early to avoid expensive operations
1747 return response
1748 client, _ = self._build_client(
1749 # Potentially expensive if building regional client
1750 self.client_credential, authority, skip_regional_client=True)
1751 telemetry_context = self._build_telemetry_context(
1752 self.ACQUIRE_TOKEN_SILENT_ID,
1753 correlation_id=correlation_id, refresh_reason=refresh_reason)
1754 for entry in sorted( # Since unfit RTs would not be aggressively removed,
1755 # we start from newer RTs which are more likely fit.
1756 matches,
1757 key=lambda e: int(e.get("last_modification_time", "0")),
1758 reverse=True):
1759 logger.debug("Cache attempts an RT")
1760 headers = telemetry_context.generate_headers()
1761 if query.get("home_account_id"): # Then use it as CCS Routing info
1762 headers["X-AnchorMailbox"] = "Oid:{}".format( # case-insensitive value
1763 query["home_account_id"].replace(".", "@"))
1764 response = client.obtain_token_by_refresh_token(
1765 entry, rt_getter=lambda token_item: token_item["secret"],
1766 on_removing_rt=lambda rt_item: None, # Disable RT removal,
1767 # because an invalid_grant could be caused by new MFA policy,
1768 # the RT could still be useful for other MFA-less scope or tenant
1769 on_obtaining_tokens=lambda event: self.token_cache.add(dict(
1770 event,
1771 environment=authority.instance,
1772 skip_account_creation=True, # To honor a concurrent remove_account()
1773 )),
1774 scope=scopes,
1775 headers=headers,
1776 data=dict(
1777 kwargs.pop("data", {}),
1778 claims=_merge_claims_challenge_and_capabilities(
1779 self._client_capabilities, claims_challenge)),
1780 **kwargs)
1781 telemetry_context.update_telemetry(response)
1782 if "error" not in response:
1783 return response
1784 logger.debug("Refresh failed. {error}: {error_description}".format(
1785 error=response.get("error"),
1786 error_description=response.get("error_description"),
1787 ))
1788 if break_condition(response):
1789 break
1790 return response # Returns the latest error (if any), or just None
1792 def _validate_ssh_cert_input_data(self, data):
1793 if data.get("token_type") == "ssh-cert":
1794 if not data.get("req_cnf"):
1795 raise ValueError(
1796 "When requesting an SSH certificate, "
1797 "you must include a string parameter named 'req_cnf' "
1798 "containing the public key in JWK format "
1799 "(https://tools.ietf.org/html/rfc7517).")
1800 if not data.get("key_id"):
1801 raise ValueError(
1802 "When requesting an SSH certificate, "
1803 "you must include a string parameter named 'key_id' "
1804 "which identifies the key in the 'req_cnf' argument.")
1806 def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
1807 """Acquire token(s) based on a refresh token (RT) obtained from elsewhere.
1809 You use this method only when you have old RTs from elsewhere,
1810 and now you want to migrate them into MSAL.
1811 Calling this method results in new tokens automatically storing into MSAL.
1813 You do NOT need to use this method if you are already using MSAL.
1814 MSAL maintains RT automatically inside its token cache,
1815 and an access token can be retrieved
1816 when you call :func:`~acquire_token_silent`.
1818 :param str refresh_token: The old refresh token, as a string.
1820 :param list scopes:
1821 The scopes associate with this old RT.
1822 Each scope needs to be in the Microsoft identity platform (v2) format.
1823 See `Scopes not resources <https://docs.microsoft.com/en-us/azure/active-directory/develop/migrate-python-adal-msal#scopes-not-resources>`_.
1825 :return:
1826 * A dict contains "error" and some other keys, when error happened.
1827 * A dict contains no "error" key means migration was successful.
1828 """
1829 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1830 telemetry_context = self._build_telemetry_context(
1831 self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN,
1832 refresh_reason=msal.telemetry.FORCE_REFRESH)
1833 response = _clean_up(self.client.obtain_token_by_refresh_token(
1834 refresh_token,
1835 scope=self._decorate_scope(scopes),
1836 headers=telemetry_context.generate_headers(),
1837 rt_getter=lambda rt: rt,
1838 on_updating_rt=False,
1839 on_removing_rt=lambda rt_item: None, # No OP
1840 **kwargs))
1841 if "access_token" in response:
1842 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
1843 telemetry_context.update_telemetry(response)
1844 return response
1846 def acquire_token_by_username_password(
1847 self, username, password, scopes, claims_challenge=None,
1848 # Note: We shouldn't need to surface enable_msa_passthrough,
1849 # because this ROPC won't work with MSA account anyway.
1850 auth_scheme=None,
1851 **kwargs):
1852 """Gets a token for a given resource via user credentials.
1854 See this page for constraints of Username Password Flow.
1855 https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication
1857 :param str username: Typically a UPN in the form of an email address.
1858 :param str password: The password.
1859 :param list[str] scopes:
1860 Scopes requested to access a protected API (a resource).
1861 :param claims_challenge:
1862 The claims_challenge parameter requests specific claims requested by the resource provider
1863 in the form of a claims_challenge directive in the www-authenticate header to be
1864 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1865 It is a string of a JSON object which contains lists of claims being requested from these locations.
1867 :param object auth_scheme:
1868 You can provide an ``msal.auth_scheme.PopAuthScheme`` object
1869 so that MSAL will get a Proof-of-Possession (POP) token for you.
1871 New in version 1.26.0.
1873 :return: A dict representing the json response from Microsoft Entra:
1875 - A successful response would contain "access_token" key,
1876 - an error response would contain "error" and usually "error_description".
1878 [Deprecated] This API is deprecated for public client flows and will be
1879 removed in a future release. Use a more secure flow instead.
1880 Migration guide: https://aka.ms/msal-ropc-migration
1882 """
1883 is_confidential_app = self.client_credential or isinstance(
1884 self, ConfidentialClientApplication)
1885 if not is_confidential_app:
1886 warnings.warn("""This API has been deprecated for public client flows, please use a more secure flow.
1887 See https://aka.ms/msal-ropc-migration for migration guidance""", DeprecationWarning)
1888 claims = _merge_claims_challenge_and_capabilities(
1889 self._client_capabilities, claims_challenge)
1890 if self._enable_broker and sys.platform in ("win32", "darwin"):
1891 from .broker import _signin_silently
1892 response = _signin_silently(
1893 "https://{}/{}".format(self.authority.instance, self.authority.tenant),
1894 self.client_id,
1895 scopes, # Decorated scopes won't work due to offline_access
1896 MSALRuntime_Username=username,
1897 MSALRuntime_Password=password,
1898 validateAuthority="no" if (
1899 self.authority._is_known_to_developer
1900 or self._instance_discovery is False) else None,
1901 claims=claims,
1902 auth_scheme=auth_scheme,
1903 )
1904 return self._process_broker_response(response, scopes, kwargs.get("data", {}))
1906 if auth_scheme:
1907 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
1908 scopes = self._decorate_scope(scopes)
1909 telemetry_context = self._build_telemetry_context(
1910 self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
1911 headers = telemetry_context.generate_headers()
1912 data = dict(kwargs.pop("data", {}), claims=claims)
1913 response = None
1914 if not self.authority.is_adfs:
1915 user_realm_result = self.authority.user_realm_discovery(
1916 username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
1917 if user_realm_result.get("account_type") == "Federated":
1918 response = _clean_up(self._acquire_token_by_username_password_federated(
1919 user_realm_result, username, password, scopes=scopes,
1920 data=data,
1921 headers=headers, **kwargs))
1922 if response is None: # Either ADFS or not federated
1923 response = _clean_up(self.client.obtain_token_by_username_password(
1924 username, password, scope=scopes,
1925 headers=headers,
1926 data=data,
1927 **kwargs))
1928 if "access_token" in response:
1929 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
1930 telemetry_context.update_telemetry(response)
1931 return response
1933 def _acquire_token_by_username_password_federated(
1934 self, user_realm_result, username, password, scopes=None, **kwargs):
1935 wstrust_endpoint = {}
1936 if user_realm_result.get("federation_metadata_url"):
1937 wstrust_endpoint = mex_send_request(
1938 user_realm_result["federation_metadata_url"],
1939 self.http_client)
1940 if wstrust_endpoint is None:
1941 raise ValueError("Unable to find wstrust endpoint from MEX. "
1942 "This typically happens when attempting MSA accounts. "
1943 "More details available here. "
1944 "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication")
1945 logger.debug("wstrust_endpoint = %s", wstrust_endpoint)
1946 wstrust_result = wst_send_request(
1947 username, password,
1948 user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"),
1949 wstrust_endpoint.get("address",
1950 # Fallback to an AAD supplied endpoint
1951 user_realm_result.get("federation_active_auth_url")),
1952 wstrust_endpoint.get("action"), self.http_client)
1953 if not ("token" in wstrust_result and "type" in wstrust_result):
1954 raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result)
1955 GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
1956 grant_type = {
1957 SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1,
1958 SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2,
1959 WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1,
1960 WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2
1961 }.get(wstrust_result.get("type"))
1962 if not grant_type:
1963 raise RuntimeError(
1964 "RSTR returned unknown token type: %s", wstrust_result.get("type"))
1965 self.client.grant_assertion_encoders.setdefault( # Register a non-standard type
1966 grant_type, self.client.encode_saml_assertion)
1967 return self.client.obtain_token_by_assertion(
1968 wstrust_result["token"], grant_type, scope=scopes,
1969 on_obtaining_tokens=lambda event: self.token_cache.add(dict(
1970 event,
1971 environment=self.authority.instance,
1972 username=username, # Useful in case IDT contains no such info
1973 )),
1974 **kwargs)
1977class PublicClientApplication(ClientApplication): # browser app or mobile app
1979 DEVICE_FLOW_CORRELATION_ID = "_correlation_id"
1980 CONSOLE_WINDOW_HANDLE = object()
1982 def __init__(
1983 self, client_id, client_credential=None,
1984 *,
1985 enable_broker_on_windows=None,
1986 enable_broker_on_mac=None,
1987 enable_broker_on_linux=None,
1988 enable_broker_on_wsl=None,
1989 **kwargs):
1990 """Same as :func:`ClientApplication.__init__`,
1991 except that ``client_credential`` parameter shall remain ``None``.
1993 .. note::
1995 **What is a broker, and why use it?**
1997 A broker is a component installed on your device.
1998 Broker implicitly gives your device an identity. By using a broker,
1999 your device becomes a factor that can satisfy MFA (Multi-factor authentication).
2000 This factor would become mandatory
2001 if a tenant's admin enables a corresponding Conditional Access (CA) policy.
2002 The broker's presence allows Microsoft identity platform
2003 to have higher confidence that the tokens are being issued to your device,
2004 and that is more secure.
2006 An additional benefit of broker is,
2007 it runs as a long-lived process with your device's OS,
2008 and maintains its own cache,
2009 so that your broker-enabled apps (even a CLI)
2010 could automatically SSO from a previously established signed-in session.
2012 **How to opt in to use broker?**
2014 1. You can set any combination of the following opt-in parameters to true:
2016 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
2017 | Opt-in flag | If app will run on | App has registered this as a Desktop platform redirect URI in Azure Portal |
2018 +==========================+===================================+====================================================================================+
2019 | enable_broker_on_windows | Windows 10+ | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id |
2020 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
2021 | enable_broker_on_wsl | WSL | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id |
2022 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
2023 | enable_broker_on_mac | Mac with Company Portal installed | msauth.com.msauth.unsignedapp://auth |
2024 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
2025 | enable_broker_on_linux | Linux with Intune installed | ``https://login.microsoftonline.com/common/oauth2/nativeclient`` (MUST be enabled) |
2026 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
2028 2. Install broker dependency,
2029 e.g. ``pip install msal[broker]>=1.33,<2``.
2031 3. Test with ``acquire_token_interactive()`` and ``acquire_token_silent()``.
2033 **The fallback behaviors of MSAL Python's broker support**
2035 MSAL will either error out, or silently fallback to non-broker flows.
2037 1. MSAL will ignore the `enable_broker_...` and bypass broker
2038 on those auth flows that are known to be NOT supported by broker.
2039 This includes ADFS, B2C, etc..
2040 For other "could-use-broker" scenarios, please see below.
2041 2. MSAL errors out when app developer opted-in to use broker
2042 but a direct dependency "mid-tier" package is not installed.
2043 Error message guides app developer to declare the correct dependency
2044 ``msal[broker]``.
2045 We error out here because the error is actionable to app developers.
2046 3. MSAL silently "deactivates" the broker and fallback to non-broker,
2047 when opted-in, dependency installed yet failed to initialize.
2048 We anticipate this would happen on a device whose OS is too old
2049 or the underlying broker component is somehow unavailable.
2050 There is not much an app developer or the end user can do here.
2051 Eventually, the conditional access policy shall
2052 force the user to switch to a different device.
2053 4. MSAL errors out when broker is opted in, installed, initialized,
2054 but subsequent token request(s) failed.
2056 :param boolean enable_broker_on_windows:
2057 This setting is only effective if your app is running on Windows 10+.
2058 This parameter defaults to None, which means MSAL will not utilize a broker.
2060 New in MSAL Python 1.25.0.
2062 :param boolean enable_broker_on_mac:
2063 This setting is only effective if your app is running on Mac.
2064 This parameter defaults to None, which means MSAL will not utilize a broker.
2066 New in MSAL Python 1.31.0.
2068 :param boolean enable_broker_on_linux:
2069 This setting is only effective if your app is running on Linux, including WSL.
2070 This parameter defaults to None, which means MSAL will not utilize a broker.
2072 New in MSAL Python 1.33.0.
2074 :param boolean enable_broker_on_wsl:
2075 This setting is only effective if your app is running on WSL.
2076 This parameter defaults to None, which means MSAL will not utilize a broker.
2078 New in MSAL Python 1.33.0.
2079 """
2080 if client_credential is not None:
2081 raise ValueError("Public Client should not possess credentials")
2083 self._enable_broker = bool(
2084 enable_broker_on_windows and sys.platform == "win32"
2085 or enable_broker_on_mac and sys.platform == "darwin"
2086 or enable_broker_on_linux and sys.platform == "linux"
2087 or enable_broker_on_wsl and is_wsl()
2088 )
2090 super(PublicClientApplication, self).__init__(
2091 client_id, client_credential=None, **kwargs)
2093 def acquire_token_interactive(
2094 self,
2095 scopes, # type: list[str]
2096 prompt=None,
2097 login_hint=None, # type: Optional[str]
2098 domain_hint=None, # type: Optional[str]
2099 claims_challenge=None,
2100 timeout=None,
2101 port=None,
2102 extra_scopes_to_consent=None,
2103 max_age=None,
2104 parent_window_handle=None,
2105 on_before_launching_ui=None,
2106 auth_scheme=None,
2107 **kwargs):
2108 """Acquire token interactively i.e. via a local browser.
2110 Prerequisite: In Azure Portal, configure the Redirect URI of your
2111 "Mobile and Desktop application" as ``http://localhost``.
2112 If you opts in to use broker during ``PublicClientApplication`` creation,
2113 your app also need this Redirect URI:
2114 ``ms-appx-web://Microsoft.AAD.BrokerPlugin/YOUR_CLIENT_ID``
2116 :param list scopes:
2117 It is a list of case-sensitive strings.
2118 :param str prompt:
2119 By default, no prompt value will be sent, not even string ``"none"``.
2120 You will have to specify a value explicitly.
2121 Its valid values are the constants defined in
2122 :class:`Prompt <msal.Prompt>`.
2123 :param str login_hint:
2124 Optional. Identifier of the user. Generally a User Principal Name (UPN).
2125 :param domain_hint:
2126 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
2127 If included, it will skip the email-based discovery process that user goes
2128 through on the sign-in page, leading to a slightly more streamlined user experience.
2129 More information on possible values available in
2130 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
2131 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
2133 :param claims_challenge:
2134 The claims_challenge parameter requests specific claims requested by the resource provider
2135 in the form of a claims_challenge directive in the www-authenticate header to be
2136 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
2137 It is a string of a JSON object which contains lists of claims being requested from these locations.
2139 :param int timeout:
2140 This method will block the current thread.
2141 This parameter specifies the timeout value in seconds.
2142 Default value ``None`` means wait indefinitely.
2144 :param int port:
2145 The port to be used to listen to an incoming auth response.
2146 By default we will use a system-allocated port.
2147 (The rest of the redirect_uri is hard coded as ``http://localhost``.)
2149 :param list extra_scopes_to_consent:
2150 "Extra scopes to consent" is a concept only available in Microsoft Entra.
2151 It refers to other resources you might want to prompt to consent for,
2152 in the same interaction, but for which you won't get back a
2153 token for in this particular operation.
2155 :param int max_age:
2156 OPTIONAL. Maximum Authentication Age.
2157 Specifies the allowable elapsed time in seconds
2158 since the last time the End-User was actively authenticated.
2159 If the elapsed time is greater than this value,
2160 Microsoft identity platform will actively re-authenticate the End-User.
2162 MSAL Python will also automatically validate the auth_time in ID token.
2164 New in version 1.15.
2166 :param int parent_window_handle:
2167 OPTIONAL.
2169 * If your app does not opt in to use broker,
2170 you do not need to provide a ``parent_window_handle`` here.
2172 * If your app opts in to use broker,
2173 ``parent_window_handle`` is required.
2175 - If your app is a GUI app running on Windows or Mac system,
2176 you are required to also provide its window handle,
2177 so that the sign-in window will pop up on top of your window.
2178 - If your app is a console app running on Windows or Mac system,
2179 you can use a placeholder
2180 ``PublicClientApplication.CONSOLE_WINDOW_HANDLE``.
2182 Most Python scripts are console apps.
2184 New in version 1.20.0.
2186 :param function on_before_launching_ui:
2187 A callback with the form of
2188 ``lambda ui="xyz", **kwargs: print("A {} will be launched".format(ui))``,
2189 where ``ui`` will be either "browser" or "broker".
2190 You can use it to inform your end user to expect a pop-up window.
2192 New in version 1.20.0.
2194 :param object auth_scheme:
2195 You can provide an ``msal.auth_scheme.PopAuthScheme`` object
2196 so that MSAL will get a Proof-of-Possession (POP) token for you.
2198 New in version 1.26.0.
2200 :return:
2201 - A dict containing no "error" key,
2202 and typically contains an "access_token" key.
2203 - A dict containing an "error" key, when token refresh failed.
2204 """
2205 data = kwargs.pop("data", {})
2206 enable_msa_passthrough = kwargs.pop( # MUST remove it from kwargs
2207 "enable_msa_passthrough", # Keep it as a hidden param, for now.
2208 # OPTIONAL. MSA-Passthrough is a legacy configuration,
2209 # needed by a small amount of Microsoft first-party apps,
2210 # which would login MSA accounts via ".../organizations" authority.
2211 # If you app belongs to this category, AND you are enabling broker,
2212 # you would want to enable this flag. Default value is False.
2213 # More background of MSA-PT is available from this internal docs:
2214 # https://microsoft.sharepoint.com/:w:/t/Identity-DevEx/EatIUauX3c9Ctw1l7AQ6iM8B5CeBZxc58eoQCE0IuZ0VFw?e=tgc3jP&CID=39c853be-76ea-79d7-ee73-f1b2706ede05
2215 False
2216 ) and data.get("token_type") != "ssh-cert" # Work around a known issue as of PyMsalRuntime 0.8
2217 self._validate_ssh_cert_input_data(data)
2218 is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(data.get("token_type"), auth_scheme)
2220 if not on_before_launching_ui:
2221 on_before_launching_ui = lambda **kwargs: None
2222 if _is_running_in_cloud_shell() and prompt == "none":
2223 # Note: _acquire_token_by_cloud_shell() is always silent,
2224 # so we would not fire on_before_launching_ui()
2225 return self._acquire_token_by_cloud_shell(scopes, data=data)
2226 claims = _merge_claims_challenge_and_capabilities(
2227 self._client_capabilities, claims_challenge)
2228 if self._enable_broker and (sys.platform in ("win32", "darwin") or not is_ssh_cert_or_pop_request):
2229 if parent_window_handle is None:
2230 raise ValueError(
2231 "parent_window_handle is required when you opted into using broker. "
2232 "You need to provide the window handle of your GUI application, "
2233 "or use msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE "
2234 "when and only when your application is a console app.")
2235 if extra_scopes_to_consent:
2236 logger.warning(
2237 "Ignoring parameter extra_scopes_to_consent, "
2238 "which is not supported by broker")
2239 response = self._acquire_token_interactive_via_broker(
2240 scopes,
2241 parent_window_handle,
2242 enable_msa_passthrough,
2243 claims,
2244 data,
2245 on_before_launching_ui,
2246 auth_scheme,
2247 prompt=prompt,
2248 login_hint=login_hint,
2249 max_age=max_age,
2250 )
2251 return self._process_broker_response(response, scopes, data)
2253 if isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme) and sys.platform == "linux":
2254 raise ValueError("POP is not supported on Linux")
2255 elif auth_scheme:
2256 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
2257 on_before_launching_ui(ui="browser")
2258 telemetry_context = self._build_telemetry_context(
2259 self.ACQUIRE_TOKEN_INTERACTIVE)
2260 response = _clean_up(self.client.obtain_token_by_browser(
2261 scope=self._decorate_scope(scopes) if scopes else None,
2262 extra_scope_to_consent=extra_scopes_to_consent,
2263 redirect_uri="http://localhost:{port}".format(
2264 # Hardcode the host, for now. AAD portal rejects 127.0.0.1 anyway
2265 port=port or 0),
2266 prompt=prompt,
2267 login_hint=login_hint,
2268 max_age=max_age,
2269 timeout=timeout,
2270 auth_params={
2271 "claims": claims,
2272 "domain_hint": domain_hint,
2273 },
2274 data=dict(data, claims=claims),
2275 headers=telemetry_context.generate_headers(),
2276 browser_name=_preferred_browser(),
2277 **kwargs))
2278 if "access_token" in response:
2279 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
2280 telemetry_context.update_telemetry(response)
2281 return response
2283 def _acquire_token_interactive_via_broker(
2284 self,
2285 scopes, # type: list[str]
2286 parent_window_handle, # type: int
2287 enable_msa_passthrough, # type: boolean
2288 claims, # type: str
2289 data, # type: dict
2290 on_before_launching_ui, # type: callable
2291 auth_scheme, # type: object
2292 prompt=None,
2293 login_hint=None, # type: Optional[str]
2294 max_age=None,
2295 **kwargs):
2296 from .broker import _signin_interactively, _signin_silently, _acquire_token_silently
2297 if "welcome_template" in kwargs:
2298 logger.debug(kwargs["welcome_template"]) # Experimental
2299 authority = "https://{}/{}".format(
2300 self.authority.instance, self.authority.tenant)
2301 validate_authority = "no" if (
2302 self.authority._is_known_to_developer
2303 or self._instance_discovery is False) else None
2304 # Calls different broker methods to mimic the OIDC behaviors
2305 if login_hint and prompt != "select_account": # OIDC prompts when the user did not sign in
2306 accounts = self.get_accounts(username=login_hint)
2307 if len(accounts) == 1: # Unambiguously proceed with this account
2308 logger.debug("Calling broker._acquire_token_silently()")
2309 response = _acquire_token_silently( # When it works, it bypasses prompt
2310 authority,
2311 self.client_id,
2312 accounts[0]["local_account_id"],
2313 scopes,
2314 claims=claims,
2315 auth_scheme=auth_scheme,
2316 **data)
2317 if response and "error" not in response:
2318 return response
2319 # login_hint undecisive or not exists
2320 if prompt == "none" or not prompt: # Must/Can attempt _signin_silently()
2321 logger.debug("Calling broker._signin_silently()")
2322 response = _signin_silently( # Unlike OIDC, it doesn't honor login_hint
2323 authority, self.client_id, scopes,
2324 validateAuthority=validate_authority,
2325 claims=claims,
2326 max_age=max_age,
2327 enable_msa_pt=enable_msa_passthrough,
2328 auth_scheme=auth_scheme,
2329 **data)
2330 is_wrong_account = bool(
2331 # _signin_silently() only gets tokens for default account,
2332 # but this seems to have been fixed in PyMsalRuntime 0.11.2
2333 "access_token" in response and login_hint
2334 and login_hint != response.get(
2335 "id_token_claims", {}).get("preferred_username"))
2336 wrong_account_error_message = (
2337 'prompt="none" will not work for login_hint="non-default-user"')
2338 if is_wrong_account:
2339 logger.debug(wrong_account_error_message)
2340 if prompt == "none":
2341 return response if not is_wrong_account else {
2342 "error": "broker_error",
2343 "error_description": wrong_account_error_message,
2344 }
2345 else:
2346 assert bool(prompt) is False
2347 from pymsalruntime import Response_Status
2348 recoverable_errors = frozenset([
2349 Response_Status.Status_AccountUnusable,
2350 Response_Status.Status_InteractionRequired,
2351 ])
2352 if is_wrong_account or "error" in response and response.get(
2353 "_broker_status") in recoverable_errors:
2354 pass # It will fall back to the _signin_interactively()
2355 else:
2356 return response
2358 logger.debug("Falls back to broker._signin_interactively()")
2359 on_before_launching_ui(ui="broker")
2360 return _signin_interactively(
2361 authority, self.client_id, scopes,
2362 None if parent_window_handle is self.CONSOLE_WINDOW_HANDLE
2363 else parent_window_handle,
2364 validateAuthority=validate_authority,
2365 login_hint=login_hint,
2366 prompt=prompt,
2367 claims=claims,
2368 max_age=max_age,
2369 enable_msa_pt=enable_msa_passthrough,
2370 auth_scheme=auth_scheme,
2371 **data)
2373 def initiate_device_flow(self, scopes=None, *, claims_challenge=None, **kwargs):
2374 """Initiate a Device Flow instance,
2375 which will be used in :func:`~acquire_token_by_device_flow`.
2377 :param list[str] scopes:
2378 Scopes requested to access a protected API (a resource).
2379 :return: A dict representing a newly created Device Flow object.
2381 - A successful response would contain "user_code" key, among others
2382 - an error response would contain some other readable key/value pairs.
2383 """
2384 correlation_id = msal.telemetry._get_new_correlation_id()
2385 flow = self.client.initiate_device_flow(
2386 scope=self._decorate_scope(scopes or []),
2387 headers={msal.telemetry.CLIENT_REQUEST_ID: correlation_id},
2388 data={"claims": _merge_claims_challenge_and_capabilities(
2389 self._client_capabilities, claims_challenge)},
2390 **kwargs)
2391 flow[self.DEVICE_FLOW_CORRELATION_ID] = correlation_id
2392 return flow
2394 def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
2395 """Obtain token by a device flow object, with customizable polling effect.
2397 :param dict flow:
2398 A dict previously generated by :func:`~initiate_device_flow`.
2399 By default, this method's polling effect will block current thread.
2400 You can abort the polling loop at any time,
2401 by changing the value of the flow's "expires_at" key to 0.
2402 :param claims_challenge:
2403 The claims_challenge parameter requests specific claims requested by the resource provider
2404 in the form of a claims_challenge directive in the www-authenticate header to be
2405 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
2406 It is a string of a JSON object which contains lists of claims being requested from these locations.
2408 :return: A dict representing the json response from Microsoft Entra:
2410 - A successful response would contain "access_token" key,
2411 - an error response would contain "error" and usually "error_description".
2412 """
2413 telemetry_context = self._build_telemetry_context(
2414 self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID,
2415 correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID))
2416 response = _clean_up(self.client.obtain_token_by_device_flow(
2417 flow,
2418 data=dict(
2419 kwargs.pop("data", {}),
2420 code=flow["device_code"], # 2018-10-4 Hack:
2421 # during transition period,
2422 # service seemingly need both device_code and code parameter.
2423 claims=_merge_claims_challenge_and_capabilities(
2424 self._client_capabilities, claims_challenge),
2425 ),
2426 headers=telemetry_context.generate_headers(),
2427 **kwargs))
2428 if "access_token" in response:
2429 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
2430 telemetry_context.update_telemetry(response)
2431 return response
2434class ConfidentialClientApplication(ClientApplication): # server-side web app
2435 """Same as :func:`ClientApplication.__init__`,
2436 except that ``allow_broker`` parameter shall remain ``None``.
2437 """
2439 def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
2440 """Acquires token for the current confidential client, not for an end user.
2442 Since MSAL Python 1.23, it will automatically look for token from cache,
2443 and only send request to Identity Provider when cache misses.
2445 :param list[str] scopes: (Required)
2446 Scopes requested to access a protected API (a resource).
2447 :param claims_challenge:
2448 The claims_challenge parameter requests specific claims requested by the resource provider
2449 in the form of a claims_challenge directive in the www-authenticate header to be
2450 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
2451 It is a string of a JSON object which contains lists of claims being requested from these locations.
2453 :return: A dict representing the json response from Microsoft Entra:
2455 - A successful response would contain "access_token" key,
2456 - an error response would contain "error" and usually "error_description".
2457 """
2458 if kwargs.get("force_refresh"):
2459 raise ValueError( # We choose to disallow force_refresh
2460 "Historically, this method does not support force_refresh behavior. "
2461 )
2462 return _clean_up(self._acquire_token_silent_with_error(
2463 scopes, None, claims_challenge=claims_challenge, **kwargs))
2465 def _acquire_token_for_client(
2466 self,
2467 scopes,
2468 refresh_reason,
2469 claims_challenge=None,
2470 **kwargs
2471 ):
2472 if self.authority.tenant.lower() in ["common", "organizations"]:
2473 warnings.warn(
2474 "Using /common or /organizations authority "
2475 "in acquire_token_for_client() is unreliable. "
2476 "Please use a specific tenant instead.", DeprecationWarning)
2477 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
2478 telemetry_context = self._build_telemetry_context(
2479 self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason)
2480 client = self._regional_client or self.client
2481 response = client.obtain_token_for_client(
2482 scope=scopes, # This grant flow requires no scope decoration
2483 headers=telemetry_context.generate_headers(),
2484 data=dict(
2485 kwargs.pop("data", {}),
2486 claims=_merge_claims_challenge_and_capabilities(
2487 self._client_capabilities, claims_challenge)),
2488 **kwargs)
2489 telemetry_context.update_telemetry(response)
2490 return response
2492 def remove_tokens_for_client(self):
2493 """Remove all tokens that were previously acquired via
2494 :func:`~acquire_token_for_client()` for the current client."""
2495 for env in [self.authority.instance] + self._get_authority_aliases(
2496 self.authority.instance):
2497 for at in list(self.token_cache.search( # Remove ATs from a snapshot
2498 TokenCache.CredentialType.ACCESS_TOKEN, query={
2499 "client_id": self.client_id,
2500 "environment": env,
2501 "home_account_id": None, # These are mostly app-only tokens
2502 })):
2503 self.token_cache.remove_at(at)
2504 # acquire_token_for_client() obtains no RTs, so we have no RT to remove
2506 def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs):
2507 """Acquires token using on-behalf-of (OBO) flow.
2509 The current app is a middle-tier service which was called with a token
2510 representing an end user.
2511 The current app can use such token (a.k.a. a user assertion) to request
2512 another token to access downstream web API, on behalf of that user.
2513 See `detail docs here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow>`_ .
2515 The current middle-tier app has no user interaction to obtain consent.
2516 See how to gain consent upfront for your middle-tier app from this article.
2517 https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow#gaining-consent-for-the-middle-tier-application
2519 :param str user_assertion: The incoming token already received by this app
2520 :param list[str] scopes: Scopes required by downstream API (a resource).
2521 :param claims_challenge:
2522 The claims_challenge parameter requests specific claims requested by the resource provider
2523 in the form of a claims_challenge directive in the www-authenticate header to be
2524 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
2525 It is a string of a JSON object which contains lists of claims being requested from these locations.
2527 :return: A dict representing the json response from Microsoft Entra:
2529 - A successful response would contain "access_token" key,
2530 - an error response would contain "error" and usually "error_description".
2531 """
2532 telemetry_context = self._build_telemetry_context(
2533 self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID)
2534 # The implementation is NOT based on Token Exchange (RFC 8693)
2535 response = _clean_up(self.client.obtain_token_by_assertion( # bases on assertion RFC 7521
2536 user_assertion,
2537 self.client.GRANT_TYPE_JWT, # IDTs and AAD ATs are all JWTs
2538 scope=self._decorate_scope(scopes), # Decoration is used for:
2539 # 1. Explicitly requesting an RT, without relying on AAD default
2540 # behavior, even though it currently still issues an RT.
2541 # 2. Requesting an IDT (which would otherwise be unavailable)
2542 # so that the calling app could use id_token_claims to implement
2543 # their own cache mapping, which is likely needed in web apps.
2544 data=dict(
2545 kwargs.pop("data", {}),
2546 requested_token_use="on_behalf_of",
2547 claims=_merge_claims_challenge_and_capabilities(
2548 self._client_capabilities, claims_challenge)),
2549 headers=telemetry_context.generate_headers(),
2550 # TBD: Expose a login_hint (or ccs_routing_hint) param for web app
2551 **kwargs))
2552 if "access_token" in response:
2553 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
2554 telemetry_context.update_telemetry(response)
2555 return response