Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/application.py: 16%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import functools
2import json
3import time
4import logging
5import sys
6import warnings
7from threading import Lock
8from typing import Optional # Needed in Python 3.7 & 3.8
9from urllib.parse import urlparse
10import os
12from .oauth2cli import Client, JwtAssertionCreator
13from .oauth2cli.oidc import decode_part
14from .authority import Authority, WORLD_WIDE
15from .mex import send_request as mex_send_request
16from .wstrust_request import send_request as wst_send_request
17from .wstrust_response import *
18from .token_cache import TokenCache, _get_username, _GRANT_TYPE_BROKER
19import msal.telemetry
20from .region import _detect_region
21from .throttled_http_client import ThrottledHttpClient
22from .cloudshell import _is_running_in_cloud_shell
23from .sku import SKU, __version__
24from .oauth2cli.authcode import is_wsl
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# NOTE(review): presumably a marker for a Cloud Shell authority type; the code
# that reads it is outside this excerpt - confirm before relying on it.
_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL"
def _init_broker(enable_pii_log):  # Kept as a function so that it can be mocked
    """Import the broker module lazily and optionally switch on its PII logging.

    :param enable_pii_log:
        When truthy, ``broker._enable_pii_log()`` is called after the import.
    """
    from . import broker as broker_module  # The import itself initializes the Broker
    if not enable_pii_log:
        return
    broker_module._enable_pii_log()
def extract_certs(public_cert_content):
    """Parse raw public certificate file content and return a list of strings.

    Usage: ``headers = {"x5c": extract_certs(open("my_cert.pem").read())}``

    :param str public_cert_content:
        The text of a certificate file. It may contain one or more
        ``-----BEGIN CERTIFICATE-----`` ... ``-----END CERTIFICATE-----`` blocks,
        or just the bare certificate value without those tags.
    :return:
        A list with one whitespace-stripped string per certificate block found.
        When no block is found, a one-item list holding the stripped input.
    :raises ValueError:
        When no certificate block is found and the input contains the text
        ``PRIVATE KEY``.
    """
    # Import locally: this module's visible imports contain no explicit
    # ``import re``, so the function should not depend on a wildcard import
    # elsewhere happening to expose that name.
    import re
    public_certificates = re.findall(
        r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----',
        public_cert_content, re.I)
    if public_certificates:
        return [cert.strip() for cert in public_certificates]
    # The public cert tags are not found in the input,
    # let's make best effort to exclude a private key pem file.
    if "PRIVATE KEY" in public_cert_content:
        raise ValueError(
            "We expect your public key but detect a private key instead")
    return [public_cert_content.strip()]
def _merge_claims_challenge_and_capabilities(capabilities, claims_challenge):
    """Fold client capabilities into a claims challenge.

    Capabilities are represented as
    ``{"access_token": {"xms_cc": {"values": capabilities}}}``
    and merged into (or added to) the incoming claims.

    :param capabilities: The client capabilities; when falsy nothing is merged.
    :param claims_challenge: A JSON string, or a falsy value for "no claims".
    :return:
        ``claims_challenge`` untouched when there are no capabilities,
        otherwise the merged claims serialized as a JSON string.
    """
    if capabilities:
        merged_claims = {} if not claims_challenge else json.loads(claims_challenge)
        # Only "access_token" for now. "id_token" could be added if we'd decide to.
        for token_kind in ("access_token",):
            per_token_claims = merged_claims.setdefault(token_kind, {})
            per_token_claims.update({"xms_cc": {"values": capabilities}})
        return json.dumps(merged_claims)
    return claims_challenge
def _str2bytes(raw):
    """Return ``raw`` encoded as UTF-8 bytes when it can be encoded.

    The conversion is based on duck-typing rather than on a type check:
    whatever offers a working ``encode(encoding="utf-8")`` gets encoded,
    anything else (bytes, None, ...) is handed back unchanged.

    :param raw: Typically a str or bytes.
    :return: The encoded bytes, or ``raw`` itself when encoding is not possible.
    """
    try:
        return raw.encode(encoding="utf-8")
    except Exception:  # Not a bare except: let KeyboardInterrupt/SystemExit through
        return raw
def _extract_cert_and_thumbprints(cert):
    """Derive the x5c value and the SHA-256 / SHA-1 thumbprints of a certificate.

    Cert concepts https://security.stackexchange.com/a/226758/125264

    :param cert:
        An object offering ``public_bytes(encoding=...)`` and ``fingerprint(...)``,
        see https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object
    :return:
        A tuple ``(sha256_thumbprint, sha1_thumbprint, x5c)``. The thumbprints are
        hex strings; ``x5c`` is a one-item list holding the PEM body, i.e. the
        PEM text without its first and last line.
    """
    from cryptography.hazmat.primitives import hashes, serialization
    pem_text = cert.public_bytes(  # Requires cryptography 1.0+
        encoding=serialization.Encoding.PEM).decode()
    pem_lines = pem_text.splitlines()
    # Drop the "--- header ---" and the "--- footer ---" lines
    pem_body = '\n'.join(pem_lines[1:-1])
    sha256_hex = cert.fingerprint(hashes.SHA256()).hex()  # Requires cryptography 0.7+
    sha1_hex = cert.fingerprint(hashes.SHA1()).hex()  # Requires cryptography 0.7+
    return sha256_hex, sha1_hex, [pem_body]
def _parse_pfx(pfx_path, passphrase_bytes):
    """Load a private key and its certificate details from a PFX file.

    Cert concepts https://security.stackexchange.com/a/226758/125264

    :param pfx_path: Path of the PFX file; it is opened in binary mode.
    :param passphrase_bytes: Passed as-is to ``pkcs12.load_key_and_certificates()``.
    :return: A tuple ``(private_key, sha256_thumbprint, sha1_thumbprint, x5c)``.
    :raises ValueError: When the file lacks the private key or the cert.
    """
    from cryptography.hazmat.primitives.serialization import pkcs12
    with open(pfx_path, 'rb') as pfx_file:
        pfx_content = pfx_file.read()
    # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates
    private_key, cert, _ = pkcs12.load_key_and_certificates(  # cryptography 2.5+
        pfx_content, passphrase_bytes)
    if private_key and cert:
        sha256_hex, sha1_hex, x5c = _extract_cert_and_thumbprints(cert)
        return private_key, sha256_hex, sha1_hex, x5c
    raise ValueError("Your PFX file shall contain both private key and cert")
def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes):
    """Load a private key object out of PEM content.

    :param private_key_pem_str:
        The PEM content. It goes through ``_str2bytes()`` before being loaded.
    :param passphrase_bytes: Passed as-is to ``load_pem_private_key()``.
    :return: Whatever ``serialization.load_pem_private_key()`` returns.
    """
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.backends import default_backend
    pem_bytes = _str2bytes(private_key_pem_str)
    load_key = serialization.load_pem_private_key  # cryptography 0.6+
    # The backend was a required param until 2020
    return load_key(pem_bytes, passphrase_bytes, backend=default_backend())
def _pii_less_home_account_id(home_account_id):
    """Mask the first dot-separated part of a home_account_id with asterisks.

    :param str home_account_id: It could contain one or two parts.
    :return: The input with everything before the first "." replaced by "********".
    """
    _, dot, remainder = home_account_id.partition(".")
    return "********" + dot + remainder
def _clean_up(result):
    """Turn an internal result into the shape that is handed to the caller.

    For a dict input:

    * when it carries "_msalruntime_telemetry" or "_msal_python_telemetry",
      a "msal_telemetry" entry (compact JSON, an opaque string) is stored
      into the *input* dict first, and thus shows up in the output too;
    * keys starting with "_" and the "refresh_in" key are left out;
    * when "refresh_in" is present, a "refresh_on" timestamp is added.

    :param result: A dict, or anything else. It could be None.
    :return: A new dict for dict input; otherwise the input as-is.
    """
    if not isinstance(result, dict):
        return result  # It could be None
    internal_telemetry_keys = ("_msalruntime_telemetry", "_msal_python_telemetry")
    if any(name in result for name in internal_telemetry_keys):
        telemetry = {
            "msalruntime_telemetry": result.get("_msalruntime_telemetry"),
            "msal_python_telemetry": result.get("_msal_python_telemetry"),
        }
        # Telemetry as an opaque string
        result["msal_telemetry"] = json.dumps(telemetry, separators=(",", ":"))
    cleaned = {}
    for key in result:
        # MSAL handled refresh_in, customers need not. Internal properties are skimmed.
        if key == "refresh_in" or key.startswith('_'):
            continue
        cleaned[key] = result[key]
    if "refresh_in" in result:  # To encourage proactive refresh
        cleaned["refresh_on"] = int(time.time() + result["refresh_in"])
    return cleaned
def _preferred_browser():
    """Register Edge and return a name suitable for subsequent webbrowser.get(...)
    when appropriate. Otherwise return None.
    """
    # On Linux, only Edge will provide device-based Conditional Access support
    if sys.platform != "linux":  # On other platforms, we have no browser preference
        return None
    # Use a full path owned by sys admin.
    # Note: /usr/bin/microsoft-edge, /usr/bin/microsoft-edge-stable, etc.
    # are symlinks that point to the actual binaries which are found under
    # /opt/microsoft/msedge/msedge or /opt/microsoft/msedge-beta/msedge.
    # Either method can be used to detect an Edge installation.
    edge_path = "/usr/bin/microsoft-edge"
    # BROWSER could contain "microsoft-edge" or "/path/to/microsoft-edge".
    # Python documentation (https://docs.python.org/3/library/webbrowser.html)
    # does not document the name being implicitly registered,
    # so there is no public API to know whether the ENV VAR browser would work.
    # Therefore, we do not bother examining the env var browser's type.
    # We just register our own Edge instance.
    env_browser = os.environ.get("BROWSER")
    edge_is_acceptable = (
        env_browser is None  # The user has no preference
        or "microsoft-edge" in env_browser)  # The user won't mind Edge
    if not (edge_is_acceptable and os.path.exists(edge_path)):
        return None
    try:
        import webbrowser  # Lazy import. Some distro may not have this.
        # Avoid the popular name "microsoft-edge", otherwise
        # `BROWSER="microsoft-edge"; webbrowser.get("microsoft-edge")`
        # would return a GenericBrowser instance which won't work.
        edge_name = "msal-edge"
        try:
            already_registered = isinstance(
                webbrowser.get(edge_name), webbrowser.BackgroundBrowser)
        except webbrowser.Error:
            already_registered = False
        if not already_registered:
            logger.debug("Register %s with %s", edge_name, edge_path)
            # By registering our own browser instance with our own name,
            # rather than populating a process-wide BROWSER env var,
            # this approach does not have side effect on non-MSAL code path.
            edge = webbrowser.BackgroundBrowser(edge_path)
            webbrowser.register(edge_name, None, edge)  # Even double-register happens to work fine
        return edge_name
    except ImportError:
        return None  # We may still proceed, just without a preferred browser
def _is_ssh_cert_or_pop_request(token_type, auth_scheme) -> bool:
    """Tell whether a request asks for an ssh-cert or a pop token.

    :param token_type: Matched against "ssh-cert" and "pop".
    :param auth_scheme:
        Only inspected when ``token_type`` matches neither value;
        a ``msal.auth_scheme.PopAuthScheme`` instance counts as a pop request.
    """
    if token_type == "ssh-cert":
        return True
    if token_type == "pop":
        return True
    return isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme)
class _ClientWithCcsRoutingInfo(Client):
    """A ``Client`` that adds an ``X-AnchorMailbox`` routing hint to its requests."""

    def initiate_auth_code_flow(self, **kwargs):
        """Start an auth code flow, always asking for client_info.

        A truthy ``login_hint`` keyword is additionally mirrored into an
        ``X-AnchorMailbox`` keyword before everything is handed to the parent.
        """
        login_hint = kwargs.get("login_hint")
        if login_hint:  # eSTS could have utilized this as-is, but nope
            kwargs["X-AnchorMailbox"] = "UPN:%s" % login_hint
        return super().initiate_auth_code_flow(
            client_info=1,  # To be used as CCS Routing info
            **kwargs)

    def obtain_token_by_auth_code_flow(
            self, auth_code_flow, auth_response, **kwargs):
        """Redeem an auth code, routed by the uid/utid found in ``client_info``.

        Note: the obtain_token_by_browser() is also covered by this.

        :param dict auth_code_flow: Handed to the parent as-is.
        :param dict auth_response:
            Its optional "client_info" entry is decoded and parsed as JSON.
        """
        assert isinstance(auth_code_flow, dict) and isinstance(auth_response, dict)
        headers = kwargs.pop("headers", {})
        encoded_client_info = auth_response.get("client_info")
        if encoded_client_info:
            client_info = json.loads(decode_part(auth_response["client_info"]))
        else:
            client_info = {}
        if "uid" in client_info and "utid" in client_info:
            # Note: The value of X-AnchorMailbox is also case-insensitive
            headers["X-AnchorMailbox"] = "Oid:{uid}@{utid}".format(**client_info)
        return super().obtain_token_by_auth_code_flow(
            auth_code_flow, auth_response, headers=headers, **kwargs)

    def obtain_token_by_username_password(self, username, password, **kwargs):
        """Obtain a token by username and password, routed by the username."""
        headers = kwargs.pop("headers", {})
        headers["X-AnchorMailbox"] = "upn:{}".format(username)
        return super().obtain_token_by_username_password(
            username, password, headers=headers, **kwargs)
def _msal_extension_check():
    """Warn when the optional msal-extensions package is older than 1.2.

    This is a best-effort check which never raises an ``Exception``:

    * msal_extensions not being installed is silently accepted;
    * a version whose first two dot-separated parts are not integers
      is logged and otherwise ignored;
    * any other ``Exception`` is logged and otherwise ignored.

    The comparison is made on the ``(major, minor)`` pair, so that
    for example 2.0.x is correctly recognized as newer than 1.2.
    """
    # Can't run this in module or class level otherwise you'll get circular import error
    try:
        from msal_extensions import __version__ as v
        try:
            # Only major and minor matter. Tolerate "1.2" as well as "1.2.0.post1".
            major, minor = (int(part) for part in v.split(".")[:2])
        except ValueError:
            logger.exception(
                "msal_extensions version %s not in major.minor.patch format", v)
            return
        if (major, minor) < (1, 2):
            warnings.warn(
                "Please upgrade msal-extensions. "
                "Only msal-extensions 1.2+ can work with msal 1.30+")
    except ImportError:
        pass  # The optional msal_extensions is not installed. Business as usual.
    except Exception:  # Not a bare except: let KeyboardInterrupt/SystemExit through
        logger.exception(
            "Unable to import msal_extensions during an optional check. "
            "This exception can be safely ignored."
        )
class ClientApplication(object):
    """You do not usually directly use this class. Use its subclasses instead:
    :class:`PublicClientApplication` and :class:`ConfidentialClientApplication`.
    """
    # One string identifier per API.
    # NOTE(review): presumably these are the ``api_id`` values handed to
    # _build_telemetry_context(); their call sites are outside this excerpt.
    ACQUIRE_TOKEN_SILENT_ID = "84"
    ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85"
    ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301"
    ACQUIRE_TOKEN_ON_BEHALF_OF_ID = "523"
    ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID = "622"
    ACQUIRE_TOKEN_FOR_CLIENT_ID = "730"
    ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832"
    ACQUIRE_TOKEN_INTERACTIVE = "169"
    GET_ACCOUNTS_ID = "902"
    REMOVE_ACCOUNT_ID = "903"

    # _get_regional_authority() compares the configured region against this value
    ATTEMPT_REGION_DISCOVERY = True  # "TryAutoDetect"
    DISABLE_MSAL_FORCE_REGION = False  # Used in azure_region to disable MSAL_FORCE_REGION behavior
    # NOTE(review): looks like a result key and its three possible values,
    # telling where a token came from; the usage is outside this excerpt.
    _TOKEN_SOURCE = "token_source"
    _TOKEN_SOURCE_IDP = "identity_provider"
    _TOKEN_SOURCE_CACHE = "cache"
    _TOKEN_SOURCE_BROKER = "broker"

    # Class-level default. _decide_broker() stores the per-instance decision
    # in an instance attribute of the same name.
    _enable_broker = False
    # A ready-made message text about auth_scheme requiring the broker
    _AUTH_SCHEME_UNSUPPORTED = (
        "auth_scheme is currently only available from broker. "
        "You can enable broker by following these instructions. "
        "https://msal-python.readthedocs.io/en/latest/#publicclientapplication")
    def __init__(
            self, client_id,
            client_credential=None, authority=None, validate_authority=True,
            token_cache=None,
            http_client=None,
            verify=True, proxies=None, timeout=None,
            client_claims=None, app_name=None, app_version=None,
            client_capabilities=None,
            azure_region=None,  # Note: We choose to add this param in this base class,
                # despite it is currently only needed by ConfidentialClientApplication.
                # This way, it holds the same positional param place for PCA,
                # when we would eventually want to add this feature to PCA in future.
            exclude_scopes=None,
            http_cache=None,
            instance_discovery=None,
            allow_broker=None,
            enable_pii_log=None,
            oidc_authority=None,
            ):
        """Create an instance of application.

        :param str client_id: Your app has a client_id after you register it on Microsoft Entra admin center.

        :param client_credential:
            For :class:`PublicClientApplication`, you use `None` here.

            For :class:`ConfidentialClientApplication`,
            it supports many different input formats for different scenarios.

            .. admonition:: Support using a client secret.

                Just feed in a string, such as ``"your client secret"``.

            .. admonition:: Support using a certificate in X.509 (.pem) format

                Deprecated because it uses SHA-1 thumbprint,
                unless you are still using ADFS which supports SHA-1 thumbprint only.
                Please use the .pfx option documented later in this page.

                Feed in a dict in this form::

                    {
                        "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format",
                        "thumbprint": "An SHA-1 thumbprint such as A1B2C3D4E5F6... "
                            "Changed in version 1.35.0, if thumbprint is absent "
                            "and a public_certificate is present, MSAL will "
                            "automatically calculate an SHA-256 thumbprint instead.",
                        "passphrase": "Needed if the private_key is encrypted (Added in version 1.6.0)",
                        "public_certificate": "...-----BEGIN CERTIFICATE-----...",  # Needed if you use Subject Name/Issuer auth. Added in version 0.5.0.
                    }

                MSAL Python requires a "private_key" in PEM format.
                If your cert is in PKCS12 (.pfx) format,
                you can convert it to X.509 (.pem) format,
                by ``openssl pkcs12 -in file.pfx -out file.pem -nodes``.

                The thumbprint is available in your app's registration in Azure Portal.
                Alternatively, you can `calculate the thumbprint <https://github.com/Azure/azure-sdk-for-python/blob/07d10639d7e47f4852eaeb74aef5d569db499d6e/sdk/identity/azure-identity/azure/identity/_credentials/certificate.py#L94-L97>`_.

                ``public_certificate`` (optional) is public key certificate
                which will be sent through 'x5c' JWT header.
                This is useful when you use `Subject Name/Issuer Authentication
                <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_
                which is an approach to allow easier certificate rotation.
                Per `specs <https://tools.ietf.org/html/rfc7515#section-4.1.6>`_,
                "the certificate containing
                the public key corresponding to the key used to digitally sign the
                JWS MUST be the first certificate. This MAY be followed by
                additional certificates, with each subsequent certificate being the
                one used to certify the previous one."
                However, your certificate's issuer may use a different order.
                So, if your attempt ends up with an error AADSTS700027 -
                "The provided signature value did not match the expected signature value",
                you may try use only the leaf cert (in PEM/str format) instead.

            .. admonition:: Supporting raw assertion obtained from elsewhere

                *Added in version 1.13.0*:
                It can also be a completely pre-signed assertion that you've assembled yourself.
                Simply pass a container containing only the key "client_assertion", like this::

                    {
                        "client_assertion": "...a JWT with claims aud, exp, iss, jti, nbf, and sub..."
                    }

            .. admonition:: Supporting reading client certificates from PFX files

                This usage will automatically use SHA-256 thumbprint of the certificate.

                *Added in version 1.29.0*:
                Feed in a dictionary containing the path to a PFX file::

                    {
                        "private_key_pfx_path": "/path/to/your.pfx",  # Added in version 1.29.0
                        "public_certificate": True,  # Only needed if you use Subject Name/Issuer auth. Added in version 1.30.0
                        "passphrase": "Passphrase if the private_key is encrypted (Optional)",
                    }

                The following command will generate a .pfx file from your .key and .pem file::

                    openssl pkcs12 -export -out certificate.pfx -inkey privateKey.key -in certificate.pem

                `Subject Name/Issuer Auth
                <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_
                is an approach to allow easier certificate rotation.
                If your .pfx file contains both the private key and public cert,
                you can opt in for Subject Name/Issuer Auth by setting "public_certificate" to ``True``.

        :type client_credential: Union[dict, str, None]

        :param dict client_claims:
            *Added in version 0.5.0*:
            It is a dictionary of extra claims that would be signed
            by this :class:`ConfidentialClientApplication` 's private key.
            For example, you can use {"client_ip": "x.x.x.x"}.
            You may also override any of the following default claims::

                {
                    "aud": the_token_endpoint,
                    "iss": self.client_id,
                    "sub": same_as_issuer,
                    "exp": now + 10_min,
                    "iat": now,
                    "jti": a_random_uuid
                }

        :param str authority:
            A URL that identifies a token authority. It should be of the format
            ``https://login.microsoftonline.com/your_tenant``
            By default, we will use ``https://login.microsoftonline.com/common``

            *Changed in version 1.17*: you can also use predefined constant
            and a builder like this::

                from msal.authority import (
                    AuthorityBuilder,
                    AZURE_US_GOVERNMENT, AZURE_CHINA, AZURE_PUBLIC)
                my_authority = AuthorityBuilder(AZURE_PUBLIC, "contoso.onmicrosoft.com")
                # Now you get an equivalent of
                # "https://login.microsoftonline.com/contoso.onmicrosoft.com"

                # You can feed such an authority to msal's ClientApplication
                from msal import PublicClientApplication
                app = PublicClientApplication("my_client_id", authority=my_authority, ...)

        :param bool validate_authority: (optional) Turns authority validation
            on or off. This parameter defaults to True.
        :param TokenCache token_cache:
            Sets the token cache used by this ClientApplication instance.
            By default, an in-memory cache will be created and used.
        :param http_client: (optional)
            Your implementation of abstract class HttpClient <msal.oauth2cli.http.http_client>
            Defaults to a requests session instance.
            Since MSAL 1.11.0, the default session would be configured
            to attempt one retry on connection error.
            If you are providing your own http_client,
            it will be your http_client's duty to decide whether to perform retry.

        :param verify: (optional)
            It will be passed to the
            `verify parameter in the underlying requests library
            <http://docs.python-requests.org/en/v2.9.1/user/advanced/#ssl-cert-verification>`_
            This does not apply if you have chosen to pass your own Http client
        :param proxies: (optional)
            It will be passed to the
            `proxies parameter in the underlying requests library
            <http://docs.python-requests.org/en/v2.9.1/user/advanced/#proxies>`_
            This does not apply if you have chosen to pass your own Http client
        :param timeout: (optional)
            It will be passed to the
            `timeout parameter in the underlying requests library
            <http://docs.python-requests.org/en/v2.9.1/user/advanced/#timeouts>`_
            This does not apply if you have chosen to pass your own Http client
        :param app_name: (optional)
            You can provide your application name for Microsoft telemetry purposes.
            Default value is None, means it will not be passed to Microsoft.
        :param app_version: (optional)
            You can provide your application version for Microsoft telemetry purposes.
            Default value is None, means it will not be passed to Microsoft.
        :param list[str] client_capabilities: (optional)
            Allows configuration of one or more client capabilities, e.g. ["CP1"].

            Client capability is meant to inform the Microsoft identity platform
            (STS) what this client is capable for,
            so STS can decide to turn on certain features.
            For example, if client is capable to handle *claims challenge*,
            STS may issue
            `Continuous Access Evaluation (CAE) <https://learn.microsoft.com/entra/identity/conditional-access/concept-continuous-access-evaluation>`_
            access tokens to resources,
            knowing that when the resource emits a *claims challenge*
            the client will be able to handle those challenges.

            Implementation details:
            Client capability is implemented using "claims" parameter on the wire,
            for now.
            MSAL will combine them into
            `claims parameter <https://openid.net/specs/openid-connect-core-1_0-final.html#ClaimsParameter>`_
            which you will later provide via one of the acquire-token request.

        :param str azure_region: (optional)
            Instructs MSAL to use the Entra regional token service. This legacy feature is only available to
            first-party applications. Only ``acquire_token_for_client()`` is supported.

            Supports 4 values:

            1. ``azure_region=None`` - This default value means no region is configured.
               MSAL will use the region defined in env var ``MSAL_FORCE_REGION``.
            2. ``azure_region="some_region"`` - meaning the specified region is used.
            3. ``azure_region=True`` - meaning
               MSAL will try to auto-detect the region. This is not recommended.
            4. ``azure_region=False`` - meaning MSAL will use no region.

            .. note::
                Region auto-discovery has been tested on VMs and on Azure Functions. It is unreliable.
                Applications using this option should configure a short timeout.

                For more details and for the values of the region string
                see https://learn.microsoft.com/entra/msal/dotnet/resources/region-discovery-troubleshooting

            New in version 1.12.0.

        :param list[str] exclude_scopes: (optional)
            Historically MSAL hardcodes `offline_access` scope,
            which would allow your app to have prolonged access to user's data.
            If that is unnecessary or undesirable for your app,
            now you can use this parameter to supply an exclusion list of scopes,
            such as ``exclude_scopes = ["offline_access"]``.

        :param dict http_cache:
            MSAL has long been caching tokens in the ``token_cache``.
            Recently, MSAL also introduced a concept of ``http_cache``,
            by automatically caching some finite amount of non-token http responses,
            so that *long-lived*
            ``PublicClientApplication`` and ``ConfidentialClientApplication``
            would be more performant and responsive in some situations.

            This ``http_cache`` parameter accepts any dict-like object.
            If not provided, MSAL will use an in-memory dict.

            If your app is a command-line app (CLI),
            you would want to persist your http_cache across different CLI runs.
            The persisted file's format may change due to, but not limited to,
            `unstable protocol <https://docs.python.org/3/library/pickle.html#data-stream-format>`_,
            so your implementation shall tolerate unexpected loading errors.
            The following recipe shows a way to do so::

                # Just add the following lines at the beginning of your CLI script
                import sys, atexit, pickle, logging
                http_cache_filename = sys.argv[0] + ".http_cache"
                try:
                    with open(http_cache_filename, "rb") as f:
                        persisted_http_cache = pickle.load(f)  # Take a snapshot
                except (
                        FileNotFoundError,  # Or IOError in Python 2
                        pickle.UnpicklingError,  # A corrupted http cache file
                        AttributeError,  # Cache created by a different version of MSAL
                        ):
                    persisted_http_cache = {}  # Recover by starting afresh
                except:  # Unexpected exceptions
                    logging.exception("You may want to debug this")
                    persisted_http_cache = {}  # Recover by starting afresh
                atexit.register(lambda: pickle.dump(
                    # When exit, flush it back to the file.
                    # It may occasionally overwrite another process's concurrent write,
                    # but that is fine. Subsequent runs will reach eventual consistency.
                    persisted_http_cache, open(http_cache_filename, "wb")))

                # And then you can implement your app as you normally would
                app = msal.PublicClientApplication(
                    "your_client_id",
                    ...,
                    http_cache=persisted_http_cache,  # Utilize persisted_http_cache
                    ...,
                    #token_cache=...,  # You may combine the old token_cache trick
                        # Please refer to token_cache recipe at
                        # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
                    )
                app.acquire_token_interactive(["your", "scope"], ...)

            Content inside ``http_cache`` are cheap to obtain.
            There is no need to share them among different apps.

            Content inside ``http_cache`` will contain no tokens nor
            Personally Identifiable Information (PII). Encryption is unnecessary.

            New in version 1.16.0.

        :param boolean instance_discovery:
            Historically, MSAL would connect to a central endpoint located at
            ``https://login.microsoftonline.com`` to acquire some metadata,
            especially when using an unfamiliar authority.
            This behavior is known as Instance Discovery.

            This parameter defaults to None, which enables the Instance Discovery.

            If you know some authorities which you allow MSAL to operate with as-is,
            without involving any Instance Discovery, the recommended pattern is::

                known_authorities = frozenset([  # Treat your known authorities as const
                    "https://contoso.com/adfs", "https://login.azs/foo"])
                ...
                authority = "https://contoso.com/adfs"  # Assuming your app will use this
                app1 = PublicClientApplication(
                    "client_id",
                    authority=authority,
                    # Conditionally disable Instance Discovery for known authorities
                    instance_discovery=authority not in known_authorities,
                    )

            If you do not know some authorities beforehand,
            yet still want MSAL to accept any authority that you will provide,
            you can use a ``False`` to unconditionally disable Instance Discovery.

            New in version 1.19.0.

        :param boolean allow_broker:
            Deprecated. Please use ``enable_broker_on_windows`` instead.

        :param boolean enable_pii_log:
            When enabled, logs may include PII (Personal Identifiable Information).
            This can be useful in troubleshooting broker behaviors.
            The default behavior is False.

            New in version 1.24.0.

        :param str oidc_authority:
            *Added in version 1.28.0*:
            It is a URL that identifies an OpenID Connect (OIDC) authority of
            the format ``https://contoso.com/tenant``.
            MSAL will append ".well-known/openid-configuration" to the authority
            and retrieve the OIDC metadata from there, to figure out the endpoints.

            Note: Broker will NOT be used for OIDC authority.

        :raises ValueError:
            When ``exclude_scopes`` is truthy but not a list, or contains "openid";
            when both ``authority`` and ``oidc_authority`` are provided;
            or when constructing the authority raises it.
        """
        self.client_id = client_id
        self.client_credential = client_credential
        self.client_claims = client_claims
        self._client_capabilities = client_capabilities
        self._instance_discovery = instance_discovery

        if exclude_scopes and not isinstance(exclude_scopes, list):
            raise ValueError(
                "Invalid exclude_scopes={}. It need to be a list of strings.".format(
                repr(exclude_scopes)))
        self._exclude_scopes = frozenset(exclude_scopes or [])
        if "openid" in self._exclude_scopes:
            raise ValueError(
                'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format(
                repr(exclude_scopes)))

        if http_client:
            self.http_client = http_client
        else:
            import requests  # Lazy load

            self.http_client = requests.Session()
            self.http_client.verify = verify
            self.http_client.proxies = proxies
            # Requests does not support session-wide timeout
            # But you can patch that (https://github.com/psf/requests/issues/3341):
            self.http_client.request = functools.partial(
                self.http_client.request, timeout=timeout)

            # Enable a minimal retry. Better than nothing.
            # https://github.com/psf/requests/blob/v2.25.1/requests/adapters.py#L94-L108
            a = requests.adapters.HTTPAdapter(max_retries=1)
            self.http_client.mount("http://", a)
            self.http_client.mount("https://", a)
        # Whichever http client was chosen above gets wrapped here
        self.http_client = ThrottledHttpClient(
            self.http_client,
            http_cache=http_cache,
            default_throttle_time=60
                # The default value 60 was recommended mainly for PCA at the end of
                # https://identitydivision.visualstudio.com/devex/_git/AuthLibrariesApiReview?version=GBdev&path=%2FService%20protection%2FIntial%20set%20of%20protection%20measures.md&_a=preview
                if isinstance(self, PublicClientApplication) else 5,
            )

        self.app_name = app_name
        self.app_version = app_version

        # Here the self.authority will not be the same type as authority in input
        if oidc_authority and authority:
            raise ValueError("You can not provide both authority and oidc_authority")
        if isinstance(authority, str) and urlparse(authority).path.startswith(
                "/dstsv2"):  # dSTS authority's path always starts with "/dstsv2"
            oidc_authority = authority  # So we treat it as if an oidc_authority
        try:
            authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE)
            self.authority = Authority(
                authority_to_use,
                self.http_client,
                validate_authority=validate_authority,
                instance_discovery=self._instance_discovery,
                oidc_authority_url=oidc_authority,
                )
        except ValueError:  # Those are explicit authority validation errors
            raise
        except Exception:  # The rest are typically connection errors
            if validate_authority and not oidc_authority and (
                    azure_region  # Opted in to use region
                    or (azure_region is None and os.getenv("MSAL_FORCE_REGION"))  # Will use region
                    ):
                # Since caller opts in to use region, here we tolerate connection
                # errors happened during authority validation at non-region endpoint
                self.authority = Authority(
                    authority_to_use,
                    self.http_client,
                    instance_discovery=False,
                    )
            else:
                raise

        # _decide_broker() reads self.client_credential and self.authority,
        # so it has to run after both of them are set.
        self._decide_broker(allow_broker, enable_pii_log)
        self.token_cache = token_cache or TokenCache()
        self._region_configured = azure_region
        self._region_detected = None
        self.client, self._regional_client = self._build_client(
            client_credential, self.authority)
        self.authority_groups = None
        # Both are handed to msal.telemetry._TelemetryContext by _build_telemetry_context()
        self._telemetry_buffer = {}
        self._telemetry_lock = Lock()
        _msal_extension_check()
680 def _decide_broker(self, allow_broker, enable_pii_log):
681 is_confidential_app = self.client_credential or isinstance(
682 self, ConfidentialClientApplication)
683 if is_confidential_app and allow_broker:
684 raise ValueError("allow_broker=True is only supported in PublicClientApplication")
685 # Historically, we chose to support ClientApplication("client_id", allow_broker=True)
686 if allow_broker:
687 warnings.warn(
688 "allow_broker is deprecated. "
689 "Please use PublicClientApplication(..., "
690 "enable_broker_on_windows=True, "
691 # No need to mention non-Windows platforms, because allow_broker is only for Windows
692 "...)",
693 DeprecationWarning)
694 opted_in_for_broker = (
695 self._enable_broker # True means Opted-in from PCA
696 or (
697 # When we started the broker project on Windows platform,
698 # the allow_broker was meant to be cross-platform. Now we realize
699 # that other platforms have different redirect_uri requirements,
700 # so the old allow_broker is deprecated and will only for Windows.
701 allow_broker and sys.platform == "win32")
702 )
703 self._enable_broker = ( # This same variable will also store the state
704 opted_in_for_broker
705 and not is_confidential_app
706 and not self.authority.is_adfs
707 and not self.authority._is_b2c
708 )
709 if self._enable_broker:
710 try:
711 _init_broker(enable_pii_log)
712 except RuntimeError:
713 self._enable_broker = False
714 logger.warning( # It is common on Mac and Linux where broker is not built-in
715 "Broker is unavailable on this platform. "
716 "We will fallback to non-broker.")
717 logger.debug("Broker enabled? %s", self._enable_broker)
719 def is_pop_supported(self):
720 """Returns True if this client supports Proof-of-Possession Access Token."""
721 return self._enable_broker and sys.platform in ("win32", "darwin")
def _decorate_scope(
        self, scopes,
        reserved_scope=frozenset(['openid', 'profile', 'offline_access'])):
    """Return the caller's scopes plus the reserved ones, minus excluded ones.

    :param scopes:
        A list, tuple, set or frozenset of scope strings supplied by the app.
    :param reserved_scope:
        Scopes that this library adds by itself and that callers must not pass.
    :return:
        A new list holding ``scopes`` and ``reserved_scope`` combined, with
        everything in ``self._exclude_scopes`` removed. Order is unspecified.
    :raises ValueError:
        When ``scopes`` is not one of the accepted container types, or when
        it contains any reserved scope.
    """
    if not isinstance(scopes, (list, set, frozenset, tuple)):
        raise ValueError("The input scopes should be a list, tuple, or set")
    scope_set = set(scopes)  # Input scopes is typically a list. Copy it to a set.
    if scope_set & reserved_scope:
        # These scopes are reserved for the API to provide good experience.
        # We could make the developer pass these and then if they do they will
        # come back asking why they don't see refresh token or user information.
        raise ValueError(
            "You cannot use any scope value that is reserved.\n"
            "Your input: {}\n"
            "The reserved list: {}".format(list(scope_set), list(reserved_scope)))

    # client_id can also be used as a scope in B2C
    decorated = scope_set | reserved_scope
    decorated -= self._exclude_scopes
    return list(decorated)
def _build_telemetry_context(
        self, api_id, correlation_id=None, refresh_reason=None):
    """Create a telemetry context for one API call.

    The context is given this app's telemetry buffer and lock, the
    ``api_id`` of the call, and the optional ``correlation_id`` and
    ``refresh_reason``.
    """
    context_factory = msal.telemetry._TelemetryContext
    return context_factory(
        self._telemetry_buffer,
        self._telemetry_lock,
        api_id,
        correlation_id=correlation_id,
        refresh_reason=refresh_reason,
    )
def _get_regional_authority(self, central_authority) -> Optional[Authority]:
    """Derive the regional counterpart of ``central_authority``, if any.

    :param central_authority:
        The already-validated authority whose ``instance`` and ``tenant``
        are used to compose the regional authority URL.
    :return:
        An ``Authority`` on the regional host, or ``None`` when the user
        opted out of ESTS-R or no region could be determined.
    """
    if self._region_configured is False:  # User opts out of ESTS-R
        return None  # Short circuit to completely bypass region detection

    if self._region_configured is None:  # User did not make an ESTS-R choice
        self._region_configured = os.getenv("MSAL_FORCE_REGION") or None

    if not self._region_detected:
        # The http client is only handed to detection when a region choice exists
        detection_client = (
            None if self._region_configured is None else self.http_client)
        self._region_detected = _detect_region(detection_client)

    configured = self._region_configured
    detected = self._region_detected
    discovery_requested = configured == self.ATTEMPT_REGION_DISCOVERY
    if not discovery_requested and configured != detected:
        logger.warning('Region configured ({}) != region detected ({})'.format(
            repr(configured), repr(detected)))

    # A None configured value is retained here, i.e. opted out
    region_to_use = detected if discovery_requested else configured
    logger.debug('Region to be used: {}'.format(repr(region_to_use)))
    if not region_to_use:
        return None

    # The list came from point 3 of the algorithm section in this internal doc
    # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PinAuthToRegion/AAD%20SDK%20Proposal%20to%20Pin%20Auth%20to%20region.md&anchor=algorithm&_a=preview
    hosts_sharing_regional_domain = (
        "login.microsoftonline.com",
        "login.microsoft.com",
        "login.windows.net",
        "sts.windows.net",
    )
    if central_authority.instance in hosts_sharing_regional_domain:
        regional_host = "{}.login.microsoft.com".format(region_to_use)
    else:
        regional_host = "{}.{}".format(region_to_use, central_authority.instance)
    return Authority(  # The central_authority has already been validated
        "https://{}/{}".format(regional_host, central_authority.tenant),
        self.http_client,
        instance_discovery=False,
    )
def _build_client(self, client_credential, authority, skip_regional_client=False):
    """Build the central OAuth2 client and, when applicable, a regional one.

    :param client_credential:
        Either a dict, or any other value which is sent as ``client_secret``.
        A dict is treated as an assertion-based credential:

        * a non-empty ``client_assertion`` entry is used directly;
        * otherwise a private key is taken from ``private_key_pfx_path`` or
          ``private_key`` (with optional ``passphrase``), thumbprints come
          from the pfx, from ``thumbprint``, or from ``public_certificate``,
          and a regenerative JWT assertion is created.
    :param authority:
        The central authority. Its endpoints configure the central client,
        and its ``instance`` is the environment recorded with cached tokens
        of both clients.
    :param skip_regional_client:
        When truthy, no regional client is attempted.
    :return:
        A ``(central_client, regional_client)`` tuple. ``regional_client`` is
        ``None`` unless a credential exists, it was not skipped, and
        ``_get_regional_authority()`` yielded an authority.
    :raises ValueError:
        When a dict credential has neither a usable key source nor a
        usable thumbprint.
    """
    client_assertion = None
    client_assertion_type = None

    default_headers = {
        "x-client-sku": SKU, "x-client-ver": __version__,
        "x-client-os": sys.platform,
        "x-ms-lib-capability": "retry-after, h429",
    }
    if self.app_name:
        default_headers['x-app-name'] = self.app_name
    if self.app_version:
        default_headers['x-app-ver'] = self.app_version
    default_body = {"client_info": 1}

    if not isinstance(client_credential, dict):
        default_body['client_secret'] = client_credential
    else:
        client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT
        # Entries are read with .get("...") rather than tested with "in",
        # so that an empty string coming from an empty ENV VAR is ignored.
        if client_credential.get("client_assertion"):
            client_assertion = client_credential['client_assertion']
        else:
            headers = {}
            sha1_thumbprint = sha256_thumbprint = None
            passphrase = client_credential.get("passphrase")
            passphrase_bytes = _str2bytes(passphrase) if passphrase else None
            public_certificate = client_credential.get("public_certificate")
            pfx_path = client_credential.get("private_key_pfx_path")
            pem_private_key = client_credential.get("private_key")

            if pfx_path:
                private_key, sha256_thumbprint, sha1_thumbprint, x5c = _parse_pfx(
                    pfx_path, passphrase_bytes)
                if public_certificate is True and x5c:
                    headers["x5c"] = x5c
            elif pem_private_key:  # PEM blob
                # Handles both encrypted and unencrypted keys
                if passphrase_bytes:
                    private_key = _load_private_key_from_pem_str(
                        pem_private_key, passphrase_bytes)
                else:
                    private_key = pem_private_key

                # Determine thumbprints based on what's provided
                manual_thumbprint = client_credential.get("thumbprint")
                if manual_thumbprint:
                    # User provided a thumbprint - use it as SHA-1 (legacy/manual approach)
                    sha1_thumbprint = manual_thumbprint
                    sha256_thumbprint = None
                elif isinstance(public_certificate, str):
                    # No thumbprint provided, but there is a certificate to calculate them from
                    from cryptography import x509
                    cert = x509.load_pem_x509_certificate(
                        _str2bytes(public_certificate))
                    sha256_thumbprint, sha1_thumbprint, headers["x5c"] = (
                        _extract_cert_and_thumbprints(cert))
                else:
                    raise ValueError(
                        "You must provide either 'thumbprint' or 'public_certificate' "
                        "from which the thumbprint can be calculated.")
            else:
                raise ValueError(
                    "client_credential needs to follow this format "
                    "https://msal-python.readthedocs.io/en/latest/#msal.ClientApplication.params.client_credential")

            # No x5c so far (e.g. the .pfx file contains no certificate):
            # then a string public_certificate is treated as PEM content.
            if "x5c" not in headers and isinstance(public_certificate, str):
                headers["x5c"] = extract_certs(public_certificate)

            if sha256_thumbprint and not authority.is_adfs:
                assertion_params = {
                    "algorithm": "PS256", "sha256_thumbprint": sha256_thumbprint,
                }
            elif sha1_thumbprint:  # Fall back
                assertion_params = {
                    "algorithm": "RS256", "sha1_thumbprint": sha1_thumbprint,
                }
            else:
                raise ValueError("You shall provide a thumbprint in SHA1.")
            assertion = JwtAssertionCreator(
                private_key, headers=headers, **assertion_params)
            client_assertion = assertion.create_regenerative_assertion(
                audience=authority.token_endpoint, issuer=self.client_id,
                additional_claims=self.client_claims or {})

    def _client_for(endpoint_authority):
        # Both clients share everything except the endpoints they talk to;
        # cached tokens are always filed under the central authority's instance.
        configuration = {
            "authorization_endpoint": endpoint_authority.authorization_endpoint,
            "token_endpoint": endpoint_authority.token_endpoint,
            "device_authorization_endpoint":
                endpoint_authority.device_authorization_endpoint,
        }
        return _ClientWithCcsRoutingInfo(
            configuration,
            self.client_id,
            http_client=self.http_client,
            default_headers=default_headers,
            default_body=default_body,
            client_assertion=client_assertion,
            client_assertion_type=client_assertion_type,
            on_obtaining_tokens=lambda event: self.token_cache.add(dict(
                event, environment=authority.instance)),
            on_removing_rt=self.token_cache.remove_rt,
            on_updating_rt=self.token_cache.update_rt)

    central_client = _client_for(authority)

    regional_client = None
    # Currently regional endpoint only serves some CCA flows
    if client_credential and not skip_regional_client:
        regional_authority = self._get_regional_authority(authority)
        if regional_authority:
            regional_client = _client_for(regional_authority)
    return central_client, regional_client
def initiate_auth_code_flow(
        self,
        scopes,  # type: list[str]
        redirect_uri=None,
        state=None,  # Recommended by OAuth2 for CSRF protection
        prompt=None,
        login_hint=None,  # type: Optional[str]
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        max_age=None,
        response_mode=None,  # type: Optional[str]
        ):
    """Begin an auth code flow.

    When the response later arrives at your redirect_uri, pass it together
    with the dict returned here to :func:`~acquire_token_by_auth_code_flow()`
    to complete the authentication/authorization.

    :param list scopes:
        A list of case-sensitive strings.
    :param str redirect_uri:
        Optional. When omitted, the server uses the pre-registered one.
    :param str state:
        An opaque value the client uses to maintain state between the
        request and the callback.
        When absent, this library generates one internally.
    :param str prompt:
        No prompt value is sent by default, not even the string ``"none"``;
        a value must be specified explicitly.
        Valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.

    :param str login_hint:
        Optional. Identifier of the user. Generally a User Principal Name (UPN).
    :param domain_hint:
        Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
        When included, the email-based discovery step on the sign-in page is
        skipped, giving a slightly more streamlined user experience.
        More information on possible values is available in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.

    :param claims_challenge:
        Merged with this app's client capabilities into the ``claims``
        request parameter, and also stored, unchanged, in the returned dict.

    :param int max_age:
        OPTIONAL. Maximum Authentication Age.
        The allowable elapsed time, in seconds, since the End-User was last
        actively authenticated. When more time than that has elapsed,
        Microsoft identity platform will actively re-authenticate the End-User.

        MSAL Python will also automatically validate the auth_time in ID token.

        New in version 1.15.

    :param str response_mode:
        OPTIONAL. The method with which response parameters should be returned.
        The default is equivalent to ``query``, which is still secure enough in MSAL Python
        (because MSAL Python does not transfer tokens via query parameter in the first place).
        For even better security, ``form_post`` is recommended.
        In "form_post" mode, response parameters are encoded as HTML form
        values, transmitted via HTTP POST and encoded in the body using the
        application/x-www-form-urlencoded format.
        Valid values are "form_post" for HTTP POST to the callback URI, or
        "query" (the default) for HTTP GET with parameters in the query string.
        More information on possible values
        `here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>`
        and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>`

    :return:
        The auth code flow. It is a dict in this form::

            {
                "auth_uri": "https://...",  // Guide user to visit this
                "state": "...",  // You may choose to verify it by yourself,
                                 // or just let acquire_token_by_auth_code_flow()
                                 // do that for you.
                "...": "...",  // Everything else are reserved and internal
            }

        The caller is expected to:

        1. somehow store this content, typically inside the current session,
        2. guide the end user (i.e. resource owner) to visit that auth_uri,
        3. and then relay this dict and subsequent auth response to
           :func:`~acquire_token_by_auth_code_flow()`.
    """
    endpoints = {"authorization_endpoint": self.authority.authorization_endpoint}
    oauth2_client = _ClientWithCcsRoutingInfo(
        endpoints, self.client_id, http_client=self.http_client)
    decorated_scopes = self._decorate_scope(scopes)
    merged_claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)
    # Keyword order is kept stable on purpose; it is forwarded as-is
    flow = oauth2_client.initiate_auth_code_flow(
        redirect_uri=redirect_uri,
        state=state,
        login_hint=login_hint,
        prompt=prompt,
        scope=decorated_scopes,
        domain_hint=domain_hint,
        claims=merged_claims,
        max_age=max_age,
        response_mode=response_mode,
    )
    # Remembered so that the redeeming step can send the same challenge again
    flow["claims_challenge"] = claims_challenge
    return flow
def get_authorization_request_url(
        self,
        scopes,  # type: list[str]
        login_hint=None,  # type: Optional[str]
        state=None,  # Recommended by OAuth2 for CSRF protection
        redirect_uri=None,
        response_type="code",  # Could be "token" if you use Implicit Grant
        prompt=None,
        nonce=None,
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        **kwargs):
    """Construct the URL that starts an Authorization Code Grant.

    Deprecated: a ``DeprecationWarning`` points callers to
    :func:`~initiate_auth_code_flow` instead.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param str state: Recommended by OAuth2 for CSRF protection.
    :param str login_hint:
        Identifier of the user. Generally a User Principal Name (UPN).
    :param str redirect_uri:
        Address to return to upon receiving a response from the authority.
    :param str response_type:
        Default value is "code" for an OAuth2 Authorization Code grant.

        Other content such as "id_token" or "token" would trigger an
        Implicit Grant, but that is
        `not recommended <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-implicit-grant-flow#is-the-implicit-grant-suitable-for-my-app>`_.

    :param str prompt:
        No prompt value is sent by default, not even the string ``"none"``;
        a value must be specified explicitly.
        Valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.
    :param nonce:
        A cryptographically random value used to mitigate replay attacks. See also
        `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
    :param domain_hint:
        Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
        When included, the email-based discovery step on the sign-in page is
        skipped, giving a slightly more streamlined user experience.
        More information on possible values is available in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: The authorization url as a string.
    """
    authority = kwargs.pop("authority", None)  # Historically we support this
    if authority:
        warnings.warn(
            "We haven't decided if this method will accept authority parameter")
        # Multi-tenant app can use new authority on demand
        the_authority = Authority(
            authority,
            self.http_client,
            instance_discovery=self._instance_discovery,
        )
    else:
        # The previous implementation is, it will use self.authority by default.
        the_authority = self.authority

    endpoints = {"authorization_endpoint": the_authority.authorization_endpoint}
    oauth2_client = _ClientWithCcsRoutingInfo(
        endpoints, self.client_id, http_client=self.http_client)
    warnings.warn(
        "Change your get_authorization_request_url() "
        "to initiate_auth_code_flow()", DeprecationWarning)
    # Warnings raised while building the URL are recorded, not shown
    with warnings.catch_warnings(record=True):
        decorated_scopes = self._decorate_scope(scopes)
        merged_claims = _merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge)
        return oauth2_client.build_auth_request_uri(
            response_type=response_type,
            redirect_uri=redirect_uri,
            state=state,
            login_hint=login_hint,
            prompt=prompt,
            scope=decorated_scopes,
            nonce=nonce,
            domain_hint=domain_hint,
            claims=merged_claims,
        )
def acquire_token_by_auth_code_flow(
        self, auth_code_flow, auth_response, scopes=None, **kwargs):
    """Validate the auth response being redirected back, and obtain tokens.

    It automatically provides nonce protection.

    :param dict auth_code_flow:
        The same dict returned by :func:`~initiate_auth_code_flow()`.
        Its ``"claims_challenge"`` entry, if any, is popped by this call.
    :param dict auth_response:
        A dict of the query string received from auth server.
    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).

        Most of the time, you can leave it empty.

        If you requested user consent for multiple resources, you will
        need to provide here a subset of what you required in
        :func:`~initiate_auth_code_flow()`.

        OAuth2 was designed mostly for singleton services,
        where tokens are always meant for the same resource and the only
        changes are in the scopes.
        In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
        An authorization code can be requested for multiple resources,
        but when it is redeemed, the token is for only one intended
        recipient, called audience.
        So the developer needs to specify a scope, so that the token
        is restricted to the corresponding audience.

    :return:
        * A dict containing "access_token" and/or "id_token", among others,
          depends on what scope was used.
          (See https://tools.ietf.org/html/rfc6749#section-5.1)
        * A dict containing "error", optionally "error_description", "error_uri".
          (It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_
          or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_)
        * Most client-side data error would result in ValueError exception.
          So the usage pattern could be without any protocol details::

            def authorize():  # A controller in a web app
                try:
                    result = msal_app.acquire_token_by_auth_code_flow(
                        session.get("flow", {}), request.args)
                    if "error" in result:
                        return render_template("error.html", result)
                    use(result)  # Token(s) are available in result and cache
                except ValueError:  # Usually caused by CSRF
                    pass  # Simply ignore them
                return redirect(url_for("index"))
    """
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)

    redeem = self.client.obtain_token_by_auth_code_flow
    decorated_scopes = self._decorate_scope(scopes) if scopes else None
    request_headers = telemetry_context.generate_headers()
    # "data" must leave kwargs before kwargs is forwarded below
    request_data = dict(
        kwargs.pop("data", {}),
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities,
            auth_code_flow.pop("claims_challenge", None)))
    response = _clean_up(redeem(
        auth_code_flow,
        auth_response,
        scope=decorated_scopes,
        headers=request_headers,
        data=request_data,
        **kwargs))

    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response
def acquire_token_by_authorization_code(
        self,
        code,
        scopes,  # Syntactically required. STS accepts empty value though.
        redirect_uri=None,
            # REQUIRED, if the "redirect_uri" parameter was included in the
            # authorization request as described in Section 4.1.1, and their
            # values MUST be identical.
        nonce=None,
        claims_challenge=None,
        **kwargs):
    """The second half of the Authorization Code Grant.

    Deprecated: a ``DeprecationWarning`` points callers to
    :func:`~acquire_token_by_auth_code_flow` instead.

    :param code: The authorization code returned from Authorization Server.
    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).

        If you requested user consent for multiple resources, you will
        typically want to provide here a subset of what you required in AuthCode.

        OAuth2 was designed mostly for singleton services,
        where tokens are always meant for the same resource and the only
        changes are in the scopes.
        In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
        An authorization code can be requested for multiple resources,
        but when it is redeemed, the token is for only one intended
        recipient, called audience.
        So the developer needs to specify a scope, so that the token
        is restricted to the corresponding audience.

    :param nonce:
        If you provided a nonce when calling :func:`get_authorization_request_url`,
        the same nonce should also be provided here, so that it gets validated.
        An exception will be raised if the nonce in id token mismatches.

    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    # If scope is absent on the wire, STS will give you a token associated
    # to the FIRST scope sent during the authorization request.
    # So in theory, you can omit scope here when you were working with only
    # one scope. But, MSAL decorates your scope anyway, so they are never
    # really empty.
    assert isinstance(scopes, list), "Invalid parameter type"
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    warnings.warn(
        "Change your acquire_token_by_authorization_code() "
        "to acquire_token_by_auth_code_flow()", DeprecationWarning)
    # Warnings raised from here on are recorded, not shown
    with warnings.catch_warnings(record=True):
        telemetry_context = self._build_telemetry_context(
            self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)

        redeem = self.client.obtain_token_by_authorization_code
        decorated_scopes = self._decorate_scope(scopes)
        request_headers = telemetry_context.generate_headers()
        # "data" must leave kwargs before kwargs is forwarded below
        request_data = dict(
            kwargs.pop("data", {}),
            claims=_merge_claims_challenge_and_capabilities(
                self._client_capabilities, claims_challenge))
        response = _clean_up(redeem(
            code,
            redirect_uri=redirect_uri,
            scope=decorated_scopes,
            headers=request_headers,
            data=request_data,
            nonce=nonce,
            **kwargs))

        if "access_token" in response:
            response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
        telemetry_context.update_telemetry(response)
        return response
def get_accounts(self, username=None):
    """Get a list of accounts which previously signed in, i.e. exists in cache.

    An account can later be used in :func:`~acquire_token_silent`
    to find its tokens.

    Accounts are looked up under this authority's instance first; only when
    none is found are the aliases of that instance tried, one by one, until
    one of them yields accounts.

    :param username:
        Filter accounts with this username only. Case insensitive.
        Accounts that carry no username never match a filter.
    :return: A list of account objects.
        Each account is a dict. For now, we only document its "username" field.
        Your app can choose to display those information to end user,
        and allow user to choose one of his/her accounts to proceed.
    """
    accounts = self._find_msal_accounts(environment=self.authority.instance)
    if not accounts:  # Now try other aliases of this authority instance
        for alias in self._get_authority_aliases(self.authority.instance):
            accounts = self._find_msal_accounts(environment=alias)
            if accounts:
                break
    if username:
        # Federated account["username"] from AAD could contain mixed case
        lowercase_username = username.lower()
        # An account cached without 'profile' scope may have no username
        # (missing or None); treat it as a non-match rather than crash.
        accounts = [
            a for a in accounts
            if (a.get("username") or "").lower() == lowercase_username]
        if not accounts:
            logger.debug((  # This would also happen when the cache is empty
                "get_accounts(username='{}') finds no account. "
                "If tokens were acquired without 'profile' scope, "
                "they would contain no username for filtering. "
                "Consider calling get_accounts(username=None) instead."
                ).format(username))
    # Does not further filter by existing RTs here. It probably won't matter.
    # Because in most cases Accounts and RTs co-exist.
    # Even in the rare case when an RT is revoked and then removed,
    # acquire_token_silent() would then yield no result,
    # apps would fall back to other acquire methods. This is the standard pattern.
    return accounts
def _find_msal_accounts(self, environment):
    """List the cached accounts of ``environment``, one per home account.

    Only ADFS and MSSTS accounts are considered, plus Cloud Shell ones when
    running in Cloud Shell. When several cache entries share a
    ``home_account_id``, the last one seen supplies the returned fields.
    """
    wanted_authority_types = [
        TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS]
    if _is_running_in_cloud_shell():
        wanted_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL)

    exposed_fields = (
        # These are minimal amount of non-tenant-specific account info
        "home_account_id",
        "environment",
        "username",
        "account_source",
        # The following fields for backward compatibility, for now
        "authority_type",
        "local_account_id",  # Tenant-specific
        "realm",  # Tenant-specific
    )
    by_home_account = {}  # Grouped by home tenant's id
    cached_accounts = self.token_cache.search(
        TokenCache.CredentialType.ACCOUNT,
        query={"environment": environment})
    for entry in cached_accounts:
        if entry["authority_type"] not in wanted_authority_types:
            continue
        by_home_account[entry.get("home_account_id")] = {
            field: entry.get(field) for field in exposed_fields}
    return list(by_home_account.values())
def _get_instance_metadata(self):  # This exists so it can be mocked in unit test
    """Fetch the instance discovery document and return its ``metadata`` entry.

    An HTTP error status surfaces through ``raise_for_status()``.
    """
    # TBD: We may extend this to use self._instance_discovery endpoint
    discovery_url = (
        "https://login.microsoftonline.com/common/discovery/instance"
        "?api-version=1.1"
        "&authorization_endpoint="
        "https://login.microsoftonline.com/common/oauth2/authorize"
    )
    response = self.http_client.get(
        discovery_url, headers={'Accept': 'application/json'})
    response.raise_for_status()
    document = json.loads(response.text)
    return document['metadata']
def _get_authority_aliases(self, instance):
    """Return the other host names in the same alias group as ``instance``.

    The alias groups are fetched through ``_get_instance_metadata()`` the
    first time they are needed and kept in ``self.authority_groups``.
    An empty list is returned when instance discovery is turned off, when
    the authority is known to the developer, or when no group contains
    ``instance``.
    """
    if self._instance_discovery is False:
        return []
    # An ADFS/B2C/known_authority_hosts situation may not reach the
    # central endpoint, so it is skipped.
    if self.authority._is_known_to_developer:
        return []

    if not self.authority_groups:
        alias_groups = []
        for entry in self._get_instance_metadata():
            alias_groups.append(set(entry['aliases']))
        self.authority_groups = alias_groups

    own_group = next(
        (group for group in self.authority_groups if instance in group), None)
    if own_group is None:
        return []
    return [alias for alias in own_group if alias != instance]
def remove_account(self, account):
    """Sign me out and forget me from token cache.

    With the broker enabled, a silent broker sign-out is attempted first
    for the account's ``local_account_id``; an error it reports is only
    logged. The account is then removed from the token cache in any case.
    """
    if self._enable_broker:
        from .broker import _signout_silently
        signout_error = _signout_silently(
            self.client_id, account["local_account_id"])
        if signout_error:
            logger.debug("_signout_silently() returns error: %s", signout_error)
    # Broker sign-out has been attempted, even if the _forget_me() below throws.
    self._forget_me(account)
def _sign_out(self, home_account):
    """Remove all relevant RTs and ATs of ``home_account`` from token cache.

    Refresh tokens are removed when they belong to this client id or to
    this app's family; access tokens are removed without an app ownership check.
    """
    owned_by_home_account = {  # realm-independent
        "environment": home_account["environment"],
        "home_account_id": home_account["home_account_id"],
    }
    app_metadata = self._get_app_metadata(home_account["environment"])

    def _is_ours(rt):
        # Do RT's app ownership check as a precaution, in case family apps
        # and 3rd-party apps share same token cache, although they should not.
        if rt["client_id"] == self.client_id:
            return True
        # Now let's settle family business
        return (
            app_metadata.get("family_id")
            and rt.get("family_id") == app_metadata["family_id"])

    # RTs/FRTs are realm-independent. They are collected into a static list
    # first, to avoid changing self.token_cache while it is being iterated.
    doomed_rts = []
    for rt in self.token_cache.search(
            TokenCache.CredentialType.REFRESH_TOKEN, query=owned_by_home_account):
        if _is_ours(rt):
            doomed_rts.append(rt)
    for rt in doomed_rts:
        self.token_cache.remove_rt(rt)

    # ATs are removed regardless of realm, b/c the realm-independent RTs
    # are gone anyway. Again a static list, for the same reason as above.
    doomed_ats = list(self.token_cache.search(
        TokenCache.CredentialType.ACCESS_TOKEN, query=owned_by_home_account))
    for at in doomed_ats:
        # To avoid the complexity of locating sibling family app's AT,
        # the AT's app ownership check is skipped.
        # It means ATs for other apps will also be removed, it is OK because:
        # * non-family apps are not supposed to share token cache to begin with;
        # * Even if it happens, we keep other app's RT already, so SSO still works
        self.token_cache.remove_at(at)
def _forget_me(self, home_account):
    """Sign ``home_account`` out, then also drop its ID tokens and accounts.

    It implies ``_sign_out()``, and afterwards removes every cached ID token
    and account entry of that home account, regardless of realm.
    """
    self._sign_out(home_account)
    owned_by_home_account = {  # realm-independent
        "environment": home_account["environment"],
        "home_account_id": home_account["home_account_id"],
    }
    # Each search result is copied into a static list first,
    # to avoid changing self.token_cache while it is being iterated
    cached_id_tokens = list(self.token_cache.search(
        TokenCache.CredentialType.ID_TOKEN, query=owned_by_home_account))
    for id_token in cached_id_tokens:
        self.token_cache.remove_idt(id_token)

    cached_accounts = list(self.token_cache.search(
        TokenCache.CredentialType.ACCOUNT, query=owned_by_home_account))
    for cached_account in cached_accounts:
        self.token_cache.remove_account(cached_account)
def _acquire_token_by_cloud_shell(self, scopes, data=None):
    """Obtain a token through Cloud Shell and cache it when there is no error.

    :param scopes: Scopes to request; also cached when the response names none.
    :param data: Optional extra request data, forwarded and cached with the token.
    :return:
        The response of ``_obtain_token()``. When it holds an access token,
        it is tagged with the broker token source.
    """
    from .cloudshell import _obtain_token
    response = _obtain_token(
        self.http_client, scopes, client_id=self.client_id, data=data)
    if "error" not in response:
        if "scope" in response:
            granted_scopes = response["scope"].split()
        else:
            granted_scopes = scopes
        cache_event = dict(
            client_id=self.client_id,
            scope=granted_scopes,
            token_endpoint=self.authority.token_endpoint,
            response=response,
            data=data or {},
            authority_type=_AUTHORITY_TYPE_CLOUDSHELL,
        )
        self.token_cache.add(cache_event)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
    return response
def acquire_token_silent(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Acquire an access token for given account, without user interaction.

    It takes the same parameters as :func:`~acquire_token_silent_with_error`.
    What differs is the return value:
    this method folds "cache is empty" and "refresh failed"
    into a single outcome, `None`.
    If your app does not care about the exact token refresh error during
    token cache look-up, then this method is easier and recommended.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key,
          if cache lookup succeeded.
        - None when cache lookup does not yield a token.
    """
    if not account:
        return None  # A backward-compatible NO-OP to drop the account=None usage
    outcome = _clean_up(self._acquire_token_silent_with_error(
        scopes, account, authority=authority, force_refresh=force_refresh,
        claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs))
    if not outcome or "error" in outcome:
        return None
    return outcome
def acquire_token_silent_with_error(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Silently acquire an access token for the given account.

    The token comes either from a valid access token found in the cache,
    or from redeeming a valid cached refresh token for a new access token.

    Unlike :func:`~acquire_token_silent`, this method tells an empty cache
    apart from a token refresh error.
    Use it when your app cares about the exact refresh error;
    otherwise :func:`~acquire_token_silent` is recommended.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param account: (Required)
        One of the account objects returned by :func:`~get_accounts`.
        Starting from MSAL Python 1.23,
        a ``None`` input is a NO-OP that always returns ``None``.
    :param force_refresh:
        If True, the Access Token look-up is skipped,
        and a Refresh Token is used to obtain a new Access Token.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.
    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key,
          if cache lookup succeeded.
        - None when there is simply no token in the cache.
        - A dict containing an "error" key, when token refresh failed.
    """
    if account:
        return _clean_up(self._acquire_token_silent_with_error(
            scopes, account, authority=authority, force_refresh=force_refresh,
            claims_challenge=claims_challenge, auth_scheme=auth_scheme,
            **kwargs))
    # Backward-compatible NO-OP that retires the account=None usage
    return None
def _acquire_token_silent_with_error(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Try the app's own authority first, then each of its aliases.

    :return:
        - The first result that contains no "error" key.
        - Otherwise the most recent non-empty error result, with a
          "classification" entry added when it carries a "suberror".
        - None when no attempt yielded anything.
    """
    assert isinstance(scopes, list), "Invalid parameter type"
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    correlation_id = msal.telemetry._get_new_correlation_id()
    if authority:
        # The authority argument only triggers this warning;
        # every attempt below is based on self.authority.
        warnings.warn("We haven't decided how/if this method will accept authority parameter")

    def attempt(authority_to_use):
        # All attempts share one correlation id and the same options
        return self._acquire_token_silent_from_cache_and_possibly_refresh_it(
            scopes, account, authority_to_use, force_refresh=force_refresh,
            claims_challenge=claims_challenge,
            correlation_id=correlation_id,
            auth_scheme=auth_scheme,
            **kwargs)

    outcome = attempt(self.authority)
    if outcome and "error" not in outcome:
        return outcome
    last_failure = outcome
    for alias in self._get_authority_aliases(self.authority.instance):
        alias_refresh_tokens = list(self.token_cache.search(  # A list, to test emptiness
            self.token_cache.CredentialType.REFRESH_TOKEN,
            # MUST NOT filter by scopes (target), because:
            # 1. AAD RTs are scope-independent;
            # 2. therefore target is optional per schema;
            query={"environment": alias}))
        if not alias_refresh_tokens:
            continue  # No RT under this alias, so skip the heavy weight logic
        alias_authority = Authority(
            "https://" + alias + "/" + self.authority.tenant,
            self.http_client,
            instance_discovery=False,
            )
        outcome = attempt(alias_authority)
        if not outcome:
            continue
        if "error" not in outcome:
            return outcome
        last_failure = outcome
    if last_failure and last_failure.get("suberror"):
        suberror = last_failure["suberror"]
        suppressed_suberrors = {  # These are classified as "", per #57
            "bad_token",
            "token_expired",
            "protection_policy_required",
            "client_mismatch",
            "device_authentication_failed",
            }
        last_failure["classification"] = (
            "" if suberror in suppressed_suberrors else suberror)
    return last_failure
def _acquire_token_silent_from_cache_and_possibly_refresh_it(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority,  # This can be different than self.authority
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        correlation_id=None,
        http_exceptions=None,
        auth_scheme=None,
        **kwargs):
    """Return a cached access token, or obtain a fresh one when needed.

    This internal method has two calling patterns:
    a non-empty ``account`` means "find a token for that user",
    while ``account=None`` means "find a token for the current app".

    :param http_exceptions:
        Optional tuple of exception types to be treated as network errors.
        Defaults to ``(requests.exceptions.RequestException,)``.
    :return:
        - A cached access token (as a response-like dict) which is still good.
        - Otherwise the outcome of the broker, Cloud Shell,
          refresh token or client credential attempt.
        - When that attempt yields an error (or a network exception)
          while a still-valid but aging cached token is at hand,
          the cached token is returned as a fallback.
    :raises ValueError: when ``auth_scheme`` is given but not usable in this flow.
    """
    access_token_from_cache = None
    if force_refresh or claims_challenge or auth_scheme:
        # The AT cache is bypassed in all these cases
        refresh_reason = msal.telemetry.FORCE_REFRESH  # TODO: It could also mean claims_challenge
    else:  # Then attempt AT cache
        at_query = {
            "client_id": self.client_id,
            "environment": authority.instance,
            "realm": authority.tenant,
            "home_account_id": (account or {}).get("home_account_id"),
            }
        bound_key_id = kwargs.get("data", {}).get("key_id")
        if bound_key_id:  # Some token types (SSH-certs, POP) are bound to a key
            at_query["key_id"] = bound_key_id
        now = time.time()
        refresh_reason = msal.telemetry.AT_ABSENT
        # search() is consumed lazily, so a cache hit ends the search early.
        # This loop only looks for a token; it never deletes one.
        # search() holds a lock during the loop, which is fine for a fast loop.
        for cached in self.token_cache.search(
                self.token_cache.CredentialType.ACCESS_TOKEN,
                target=scopes,
                query=at_query,
                ):
            remaining = int(cached["expires_on"]) - now
            if remaining < 5*60:  # Less than 5 minutes left counts as expired
                refresh_reason = msal.telemetry.AT_EXPIRED
                continue  # No need to remove it, it will be overwritten
            logger.debug("Cache hit an AT")
            access_token_from_cache = {  # Shaped like a real token response
                "access_token": cached["secret"],
                "token_type": cached.get("token_type", "Bearer"),
                "expires_in": int(remaining),  # OAuth2 specs defines it as int
                self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
                }
            if "refresh_on" in cached:
                access_token_from_cache["refresh_on"] = int(cached["refresh_on"])
                if int(cached["refresh_on"]) < now:  # The token is aging
                    refresh_reason = msal.telemetry.AT_AGING
                    break  # Go refresh, keeping this token as a fallback
            self._build_telemetry_context(-1).hit_an_access_token()
            return access_token_from_cache  # It is still good as new
    assert refresh_reason, "It should have been established at this point"
    if not http_exceptions:  # It can be a tuple of exceptions
        # The exact HTTP exceptions are transportation-layer dependent
        from requests.exceptions import RequestException  # Lazy load
        http_exceptions = (RequestException,)
    try:
        data = kwargs.get("data", {})
        if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL:
            if auth_scheme:
                raise ValueError("auth_scheme is not supported in Cloud Shell")
            return self._acquire_token_by_cloud_shell(scopes, data=data)

        is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(
            data.get("token_type"), auth_scheme)

        may_use_broker = (
            self._enable_broker
            and account
            and account.get("account_source") in (
                _GRANT_TYPE_BROKER,  # Broker successfully established this account previously.
                None,  # Unknown data from older MSAL. Broker might still work.
                )
            and (sys.platform in ("win32", "darwin")
                 or not is_ssh_cert_or_pop_request)
            )
        if may_use_broker:
            from .broker import _acquire_token_silently
            broker_response = _acquire_token_silently(
                "https://{}/{}".format(self.authority.instance, self.authority.tenant),
                self.client_id,
                account["local_account_id"],
                scopes,
                claims=_merge_claims_challenge_and_capabilities(
                    self._client_capabilities, claims_challenge),
                correlation_id=correlation_id,
                auth_scheme=auth_scheme,
                **data)
            if broker_response:  # Broker provides a decisive outcome
                established_by_broker = account.get(
                    "account_source") == _GRANT_TYPE_BROKER
                succeeded_just_now = "error" not in broker_response
                if established_by_broker or succeeded_just_now:
                    return self._process_broker_response(
                        broker_response, scopes, data)

        if auth_scheme:
            raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
        if account:
            outcome = self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
                authority, self._decorate_scope(scopes), account,
                refresh_reason=refresh_reason, claims_challenge=claims_challenge,
                correlation_id=correlation_id,
                **kwargs)
        else:  # The caller is acquire_token_for_client()
            outcome = self._acquire_token_for_client(
                scopes, refresh_reason, claims_challenge=claims_challenge,
                **kwargs)
        if outcome and "access_token" in outcome:
            outcome[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
        if (outcome and "error" not in outcome) or (not access_token_from_cache):
            return outcome
    except http_exceptions:
        # Typically network error. Potential AAD outage?
        if not access_token_from_cache:  # There is nothing to fall back to
            raise  # So the exception bubbles up
    return access_token_from_cache
def _process_broker_response(self, response, scopes, data):
    """Cache a successful broker response, then return its cleaned-up form.

    :param dict response: The response received from the broker.
    :param list[str] scopes:
        Recorded in the cache when the response carries no "scope" value.
    :param dict data: The request data, recorded in the cache as-is.
    :return: The outcome of ``_clean_up(response)``.
    """
    if "error" in response:
        return _clean_up(response)  # Errors are neither cached nor tagged
    cache_event = {
        "client_id": self.client_id,
        "scope": response["scope"].split() if "scope" in response else scopes,
        "token_endpoint": self.authority.token_endpoint,
        "response": response,
        "data": data,
        "_account_id": response["_account_id"],
        "environment": self.authority.instance,  # Be consistent with non-broker flows
        # A pseudo grant type for TokenCache to mark account_source as broker
        "grant_type": _GRANT_TYPE_BROKER,
        }
    self.token_cache.add(cache_event)
    response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
    return _clean_up(response)
def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
        self, authority, scopes, account, **kwargs):
    """Redeem a family refresh token if applicable, else this app's own one.

    :return:
        The first attempt's result which contains no "error" key.
        Otherwise the result of the attempt with this app's own RT or,
        when that one is empty, the result of the attempt made with
        the RT of the family recorded in app metadata (which may be None).
    """
    base_query = {
        "environment": authority.instance,
        "home_account_id": (account or {}).get("home_account_id"),
        # No "realm" here, because AAD RTs are tenant-independent
        }
    app_metadata = self._get_app_metadata(authority.instance)
    redeem = self._acquire_token_silent_by_finding_specific_refresh_token
    if not app_metadata:  # Meaning this app is now used for the first time.
        # When/if we have a way to directly detect current app's family,
        # we'll rewrite this block, to support multiple families.
        # For now, we try existing RTs (*). If it works, we are in that family.
        # (*) RTs of a different app/family are not supposed to be
        # shared with or accessible by us in the first place.
        probe = redeem(
            authority, scopes,
            dict(base_query, family_id="1"),  # A hack, we have only 1 family for now
            rt_remover=lambda rt_item: None,  # NO-OP b/c RTs are likely not mine
            # Break loop when app not in family.
            # Based on an AAD-only behavior mentioned in internal doc here
            # https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595
            break_condition=lambda response:
                "client_mismatch" in response.get("error_additional_info", []),
            **kwargs)
        if probe and "error" not in probe:
            return probe
    family_outcome = None
    if app_metadata.get("family_id"):  # Meaning this app belongs to this family
        family_outcome = redeem(
            authority, scopes,
            dict(base_query, family_id=app_metadata["family_id"]),
            **kwargs)
        if family_outcome and "error" not in family_outcome:
            return family_outcome
    # Either this app is an orphan, so we will naturally use its own RT;
    # or all attempts above have failed, so we fall back to non-foci behavior.
    own_outcome = redeem(
        authority, scopes, dict(base_query, client_id=self.client_id),
        **kwargs)
    return own_outcome or family_outcome
def _get_app_metadata(self, environment):
    """Look up this app's metadata for ``environment`` in the token cache.

    :return: Whatever the token cache returns; it is asked to use ``{}``
        as the default.
    """
    lookup = self.token_cache._get_app_metadata
    return lookup(
        environment=environment,
        client_id=self.client_id,
        default={},
    )
def _acquire_token_silent_by_finding_specific_refresh_token(
        self, authority, scopes, query,
        rt_remover=None, break_condition=lambda response: False,
        refresh_reason=None, correlation_id=None, claims_challenge=None,
        **kwargs):
    """Redeem the cached refresh tokens matching ``query``, newest first.

    :param authority: The authority whose client is used for the redemption.
    :param scopes: The scopes to request.
    :param dict query: The token cache query which selects the candidate RTs.
    :param rt_remover:
        Accepted for interface compatibility; it is not used here,
        because RT removal is disabled (see ``on_removing_rt`` below).
    :param break_condition:
        A callable receiving an error response. A true return value
        stops the iteration over the remaining RTs.
    :param refresh_reason: Passed to the telemetry context.
    :param correlation_id: Passed to the telemetry context.
    :param claims_challenge: Merged with client capabilities into "claims".
    :param kwargs:
        Forwarded to ``obtain_token_by_refresh_token()``.
        An optional ``data`` dict in it is sent with EVERY attempt.
    :return:
        - None when the cache holds no matching RT.
        - The first response containing no "error" key.
        - Otherwise the latest error response.
    """
    matches = list(self.token_cache.search(  # We want a list to test emptiness
        self.token_cache.CredentialType.REFRESH_TOKEN,
        # target=scopes,  # AAD RTs are scope-independent
        query=query))
    logger.debug("Found %d RTs matching %s", len(matches), {
        k: _pii_less_home_account_id(v) if k == "home_account_id" and v else v
        for k, v in query.items()
        })

    response = None  # A distinguishable value to mean cache is empty
    if not matches:  # Then exit early to avoid expensive operations
        return response
    client, _ = self._build_client(
        # Potentially expensive if building regional client
        self.client_credential, authority, skip_regional_client=True)
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_SILENT_ID,
        correlation_id=correlation_id, refresh_reason=refresh_reason)
    # Extract "data" exactly once, BEFORE the loop. Popping it inside the loop
    # would hand the caller's data to the first RT attempt only, and every
    # subsequent attempt would silently go out without it.
    extra_data = kwargs.pop("data", {})
    for entry in sorted(  # Since unfit RTs would not be aggressively removed,
            # we start from newer RTs which are more likely fit.
            matches,
            key=lambda e: int(e.get("last_modification_time", "0")),
            reverse=True):
        logger.debug("Cache attempts an RT")
        headers = telemetry_context.generate_headers()
        if query.get("home_account_id"):  # Then use it as CCS Routing info
            headers["X-AnchorMailbox"] = "Oid:{}".format(  # case-insensitive value
                query["home_account_id"].replace(".", "@"))
        response = client.obtain_token_by_refresh_token(
            entry, rt_getter=lambda token_item: token_item["secret"],
            on_removing_rt=lambda rt_item: None,  # Disable RT removal,
                # because an invalid_grant could be caused by new MFA policy,
                # the RT could still be useful for other MFA-less scope or tenant
            on_obtaining_tokens=lambda event: self.token_cache.add(dict(
                event,
                environment=authority.instance,
                skip_account_creation=True,  # To honor a concurrent remove_account()
                )),
            scope=scopes,
            headers=headers,
            data=dict(  # A fresh dict per attempt; extra_data stays untouched
                extra_data,
                claims=_merge_claims_challenge_and_capabilities(
                    self._client_capabilities, claims_challenge)),
            **kwargs)
        telemetry_context.update_telemetry(response)
        if "error" not in response:
            return response
        logger.debug(
            "Refresh failed. %s: %s",
            response.get("error"), response.get("error_description"))
        if break_condition(response):
            break
    return response  # Returns the latest error (if any), or just None
def _validate_ssh_cert_input_data(self, data):
    """Ensure that an SSH certificate request carries its mandatory fields.

    Data whose "token_type" is not "ssh-cert" is accepted without checks.

    :param dict data: The extra request data provided by the caller.
    :raises ValueError: when "req_cnf" or "key_id" is missing or empty
        in an "ssh-cert" request.
    """
    if data.get("token_type") != "ssh-cert":
        return  # Nothing to validate for other token types
    if not data.get("req_cnf"):
        raise ValueError(
            "When requesting an SSH certificate, "
            "you must include a string parameter named 'req_cnf' "
            "containing the public key in JWK format "
            "(https://tools.ietf.org/html/rfc7517).")
    if not data.get("key_id"):
        raise ValueError(
            "When requesting an SSH certificate, "
            "you must include a string parameter named 'key_id' "
            "which identifies the key in the 'req_cnf' argument.")
def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
    """Acquire token(s) by redeeming a refresh token (RT) obtained from elsewhere.

    This method exists for migration only: you hold old RTs which were
    obtained outside of MSAL, and you want to bring them into MSAL.
    Calling this method results in new tokens automatically storing into MSAL.

    Apps which already use MSAL do NOT need this method.
    MSAL maintains RT automatically inside its token cache,
    and an access token can be retrieved
    when you call :func:`~acquire_token_silent`.

    :param str refresh_token: The old refresh token, as a string.

    :param list scopes:
        The scopes associated with this old RT.
        Each scope needs to be in the Microsoft identity platform (v2) format.
        See `Scopes not resources <https://docs.microsoft.com/en-us/azure/active-directory/develop/migrate-python-adal-msal#scopes-not-resources>`_.

    :return:
        * A dict contains "error" and some other keys, when error happened.
        * A dict contains no "error" key means migration was successful.
    """
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN,
        refresh_reason=msal.telemetry.FORCE_REFRESH)
    raw_response = self.client.obtain_token_by_refresh_token(
        refresh_token,
        scope=self._decorate_scope(scopes),
        headers=telemetry_context.generate_headers(),
        rt_getter=lambda rt: rt,  # The input is already the RT string itself
        on_updating_rt=False,
        on_removing_rt=lambda rt_item: None,  # No OP
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response
def acquire_token_by_username_password(
        self, username, password, scopes, claims_challenge=None,
        # Note: We shouldn't need to surface enable_msa_passthrough,
        # because this ROPC won't work with MSA account anyway.
        auth_scheme=None,
        **kwargs):
    """Gets a token for a given resource via user credentials.

    The constraints of the Username Password Flow are described here:
    https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication

    :param str username: Typically a UPN in the form of an email address.
    :param str password: The password.
    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".

    [Deprecated] This API is deprecated for public client flows and will be
    removed in a future release. Use a more secure flow instead.
    Migration guide: https://aka.ms/msal-ropc-migration

    """
    if not (self.client_credential
            or isinstance(self, ConfidentialClientApplication)):
        # Only public client usage of this flow is deprecated
        warnings.warn("""This API has been deprecated for public client flows, please use a more secure flow.
            See https://aka.ms/msal-ropc-migration for migration guidance""", DeprecationWarning)
    claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)
    if self._enable_broker and sys.platform in ("win32", "darwin"):
        from .broker import _signin_silently
        skip_authority_validation = (
            self.authority._is_known_to_developer
            or self._instance_discovery is False)
        broker_response = _signin_silently(
            "https://{}/{}".format(self.authority.instance, self.authority.tenant),
            self.client_id,
            scopes,  # Decorated scopes won't work due to offline_access
            MSALRuntime_Username=username,
            MSALRuntime_Password=password,
            validateAuthority="no" if skip_authority_validation else None,
            claims=claims,
            auth_scheme=auth_scheme,
            )
        return self._process_broker_response(
            broker_response, scopes, kwargs.get("data", {}))

    if auth_scheme:
        raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
    scopes = self._decorate_scope(scopes)
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
    headers = telemetry_context.generate_headers()
    data = dict(kwargs.pop("data", {}), claims=claims)
    response = None
    if not self.authority.is_adfs:
        user_realm_result = self.authority.user_realm_discovery(
            username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
        is_federated = user_realm_result.get("account_type") == "Federated"
        if is_federated:
            response = _clean_up(self._acquire_token_by_username_password_federated(
                user_realm_result, username, password, scopes=scopes,
                data=data,
                headers=headers, **kwargs))
    if response is None:  # Either ADFS or not federated
        response = _clean_up(self.client.obtain_token_by_username_password(
            username, password, scope=scopes,
            headers=headers,
            data=data,
            **kwargs))
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response
def _acquire_token_by_username_password_federated(
        self, user_realm_result, username, password, scopes=None, **kwargs):
    """Sign in a federated user via WS-Trust, then redeem the SAML assertion.

    :param dict user_realm_result:
        The outcome of user realm discovery. This method reads its
        "federation_metadata_url", "cloud_audience_urn" and
        "federation_active_auth_url" entries.
    :param str username: The user name sent to the WS-Trust endpoint.
    :param str password: The password sent to the WS-Trust endpoint.
    :param scopes: The scopes to request when redeeming the assertion.
    :param kwargs: Forwarded to ``self.client.obtain_token_by_assertion()``.
    :return: The outcome of ``self.client.obtain_token_by_assertion()``.
    :raises ValueError: when MEX yields no WS-Trust endpoint.
    :raises RuntimeError: when the WS-Trust response lacks a token or its type,
        or when the token type is not a known SAML type.
    """
    wstrust_endpoint = {}
    if user_realm_result.get("federation_metadata_url"):
        wstrust_endpoint = mex_send_request(
            user_realm_result["federation_metadata_url"],
            self.http_client)
        if wstrust_endpoint is None:
            raise ValueError("Unable to find wstrust endpoint from MEX. "
                "This typically happens when attempting MSA accounts. "
                "More details available here. "
                "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication")
    logger.debug("wstrust_endpoint = %s", wstrust_endpoint)
    wstrust_result = wst_send_request(
        username, password,
        user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"),
        wstrust_endpoint.get("address",
            # Fallback to an AAD supplied endpoint
            user_realm_result.get("federation_active_auth_url")),
        wstrust_endpoint.get("action"), self.http_client)
    if not ("token" in wstrust_result and "type" in wstrust_result):
        raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result)
    GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
    grant_type = {
        SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1,
        SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2,
        WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1,
        WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2
        }.get(wstrust_result.get("type"))
    if not grant_type:
        # Exceptions do not interpolate logging-style arguments,
        # so the message is formatted explicitly here.
        raise RuntimeError(
            "RSTR returned unknown token type: %s" % wstrust_result.get("type"))
    self.client.grant_assertion_encoders.setdefault(  # Register a non-standard type
        grant_type, self.client.encode_saml_assertion)
    return self.client.obtain_token_by_assertion(
        wstrust_result["token"], grant_type, scope=scopes,
        on_obtaining_tokens=lambda event: self.token_cache.add(dict(
            event,
            environment=self.authority.instance,
            username=username,  # Useful in case IDT contains no such info
            )),
        **kwargs)
1957class PublicClientApplication(ClientApplication): # browser app or mobile app
1959 DEVICE_FLOW_CORRELATION_ID = "_correlation_id"
1960 CONSOLE_WINDOW_HANDLE = object()
def __init__(
        self, client_id, client_credential=None,
        *,
        enable_broker_on_windows=None,
        enable_broker_on_mac=None,
        enable_broker_on_linux=None,
        enable_broker_on_wsl=None,
        **kwargs):
    """Same as :func:`ClientApplication.__init__`,
    except that ``client_credential`` parameter shall remain ``None``.

    .. note::

        **What is a broker, and why use it?**

        A broker is a component installed on your device.
        It implicitly gives your device an identity. With a broker,
        your device becomes a factor that can satisfy MFA (Multi-factor authentication).
        That factor becomes mandatory
        once a tenant's admin enables a corresponding Conditional Access (CA) policy.
        The presence of a broker lets Microsoft identity platform
        have higher confidence that the tokens are being issued to your device,
        which is more secure.

        As an additional benefit,
        a broker runs as a long-lived process with your device's OS,
        and maintains its own cache,
        so that your broker-enabled apps (even a CLI)
        could automatically SSO from a previously established signed-in session.

        **How to opt in to use broker?**

        1. You can set any combination of the following opt-in parameters to true:

        +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
        | Opt-in flag              | If app will run on                | App has registered this as a Desktop platform redirect URI in Azure Portal         |
        +==========================+===================================+====================================================================================+
        | enable_broker_on_windows | Windows 10+                       | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id                            |
        +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
        | enable_broker_on_wsl     | WSL                               | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id                            |
        +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
        | enable_broker_on_mac     | Mac with Company Portal installed | msauth.com.msauth.unsignedapp://auth                                               |
        +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+
        | enable_broker_on_linux   | Linux with Intune installed       | ``https://login.microsoftonline.com/common/oauth2/nativeclient`` (MUST be enabled) |
        +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+

        2. Install broker dependency,
           e.g. ``pip install msal[broker]>=1.33,<2``.

        3. Test with ``acquire_token_interactive()`` and ``acquire_token_silent()``.

        **The fallback behaviors of MSAL Python's broker support**

        MSAL will either error out, or silently fallback to non-broker flows.

        1. MSAL will ignore the `enable_broker_...` and bypass broker
           on those auth flows that are known to be NOT supported by broker.
           This includes ADFS, B2C, etc..
           For other "could-use-broker" scenarios, please see below.
        2. MSAL errors out when app developer opted-in to use broker
           but a direct dependency "mid-tier" package is not installed.
           Error message guides app developer to declare the correct dependency
           ``msal[broker]``.
           We error out here because the error is actionable to app developers.
        3. MSAL silently "deactivates" the broker and fallback to non-broker,
           when opted-in, dependency installed yet failed to initialize.
           We anticipate this would happen on a device whose OS is too old
           or the underlying broker component is somehow unavailable.
           There is not much an app developer or the end user can do here.
           Eventually, the conditional access policy shall
           force the user to switch to a different device.
        4. MSAL errors out when broker is opted in, installed, initialized,
           but subsequent token request(s) failed.

    :param boolean enable_broker_on_windows:
        This setting is only effective if your app is running on Windows 10+.
        This parameter defaults to None, which means MSAL will not utilize a broker.

        New in MSAL Python 1.25.0.

    :param boolean enable_broker_on_mac:
        This setting is only effective if your app is running on Mac.
        This parameter defaults to None, which means MSAL will not utilize a broker.

        New in MSAL Python 1.31.0.

    :param boolean enable_broker_on_linux:
        This setting is only effective if your app is running on Linux, including WSL.
        This parameter defaults to None, which means MSAL will not utilize a broker.

        New in MSAL Python 1.33.0.

    :param boolean enable_broker_on_wsl:
        This setting is only effective if your app is running on WSL.
        This parameter defaults to None, which means MSAL will not utilize a broker.

        New in MSAL Python 1.33.0.
    """
    if client_credential is not None:
        raise ValueError("Public Client should not possess credentials")

    # An opt-in flag only counts on its own platform.
    # The checks run in order and stop at the first match,
    # so is_wsl() is consulted last, and only when its flag is set.
    if enable_broker_on_windows and sys.platform == "win32":
        broker_enabled = True
    elif enable_broker_on_mac and sys.platform == "darwin":
        broker_enabled = True
    elif enable_broker_on_linux and sys.platform == "linux":
        broker_enabled = True
    else:
        broker_enabled = bool(enable_broker_on_wsl and is_wsl())
    self._enable_broker = broker_enabled  # Assigned before the base initializer runs

    super(PublicClientApplication, self).__init__(
        client_id, client_credential=None, **kwargs)
def acquire_token_interactive(
        self,
        scopes,  # type: list[str]
        prompt=None,
        login_hint=None,  # type: Optional[str]
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        timeout=None,
        port=None,
        extra_scopes_to_consent=None,
        max_age=None,
        parent_window_handle=None,
        on_before_launching_ui=None,
        auth_scheme=None,
        **kwargs):
    """Acquire token interactively, i.e. via a local browser or the broker.

    Prerequisite: In Azure Portal, configure the Redirect URI of your
    "Mobile and Desktop application" as ``http://localhost``.
    If you opt in to use broker during ``PublicClientApplication`` creation,
    your app also needs this Redirect URI:
    ``ms-appx-web://Microsoft.AAD.BrokerPlugin/YOUR_CLIENT_ID``

    When running inside Cloud Shell with ``prompt="none"``, the token is
    obtained silently via Cloud Shell and ``on_before_launching_ui``
    is not invoked.

    :param list scopes:
        It is a list of case-sensitive strings.
    :param str prompt:
        By default, no prompt value will be sent, not even string ``"none"``.
        You will have to specify a value explicitly.
        Its valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.
    :param str login_hint:
        Optional. Identifier of the user. Generally a User Principal Name (UPN).
    :param domain_hint:
        Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
        If included, it will skip the email-based discovery process that user goes
        through on the sign-in page, leading to a slightly more streamlined user experience.
        More information on possible values available in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.

    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :param int timeout:
        This method will block the current thread.
        This parameter specifies the timeout value in seconds.
        Default value ``None`` means wait indefinitely.

    :param int port:
        The port to be used to listen to an incoming auth response.
        By default we will use a system-allocated port.
        (The rest of the redirect_uri is hard coded as ``http://localhost``.)

    :param list extra_scopes_to_consent:
        "Extra scopes to consent" is a concept only available in Microsoft Entra.
        It refers to other resources you might want to prompt to consent for,
        in the same interaction, but for which you won't get back a
        token in this particular operation.
        It is ignored (with a warning) when the broker is used.

    :param int max_age:
        OPTIONAL. Maximum Authentication Age.
        Specifies the allowable elapsed time in seconds
        since the last time the End-User was actively authenticated.
        If the elapsed time is greater than this value,
        Microsoft identity platform will actively re-authenticate the End-User.

        MSAL Python will also automatically validate the auth_time in ID token.

        New in version 1.15.

    :param int parent_window_handle:
        OPTIONAL.

        * If your app does not opt in to use broker,
          you do not need to provide a ``parent_window_handle`` here.

        * If your app opts in to use broker,
          ``parent_window_handle`` is required.

          - If your app is a GUI app running on Windows or Mac system,
            you are required to also provide its window handle,
            so that the sign-in window will pop up on top of your window.
          - If your app is a console app running on Windows or Mac system,
            you can use a placeholder
            ``PublicClientApplication.CONSOLE_WINDOW_HANDLE``.

        Most Python scripts are console apps.

        New in version 1.20.0.

    :param function on_before_launching_ui:
        A callback with the form of
        ``lambda ui="xyz", **kwargs: print("A {} will be launched".format(ui))``,
        where ``ui`` will be either "browser" or "broker".
        You can use it to inform your end user to expect a pop-up window.

        New in version 1.20.0.

    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :raises ValueError:
        When the broker is going to be used but ``parent_window_handle``
        is ``None``, or when an ``auth_scheme`` is supplied but the request
        ends up on the browser-based path.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key.
        - A dict containing an "error" key, when the token request failed.
    """
    data = kwargs.pop("data", {})
    # "enable_msa_passthrough" is kept as a hidden param, for now,
    # and it MUST be removed from kwargs (kwargs is forwarded further below).
    # OPTIONAL. MSA-Passthrough is a legacy configuration,
    # needed by a small amount of Microsoft first-party apps,
    # which would login MSA accounts via ".../organizations" authority.
    # If your app belongs to this category, AND you are enabling broker,
    # you would want to enable this flag. Default value is False.
    # More background of MSA-PT is available from this internal docs:
    # https://microsoft.sharepoint.com/:w:/t/Identity-DevEx/EatIUauX3c9Ctw1l7AQ6iM8B5CeBZxc58eoQCE0IuZ0VFw?e=tgc3jP&CID=39c853be-76ea-79d7-ee73-f1b2706ede05
    msa_passthrough_requested = kwargs.pop("enable_msa_passthrough", False)
    enable_msa_passthrough = (
        msa_passthrough_requested
        # Work around a known issue as of PyMsalRuntime 0.8
        and data.get("token_type") != "ssh-cert")

    self._validate_ssh_cert_input_data(data)
    is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(
        data.get("token_type"), auth_scheme)

    # Fall back to a no-op so the callback can be invoked unconditionally
    notify_before_ui = on_before_launching_ui or (lambda **_ignored: None)

    if _is_running_in_cloud_shell() and prompt == "none":
        # Note: _acquire_token_by_cloud_shell() is always silent,
        # so we would not fire on_before_launching_ui()
        return self._acquire_token_by_cloud_shell(scopes, data=data)

    claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)

    # On platforms other than Windows/Mac, ssh-cert and POP requests
    # bypass the broker and continue to the code below.
    goes_through_broker = self._enable_broker and (
        sys.platform in ("win32", "darwin")
        or not is_ssh_cert_or_pop_request)
    if goes_through_broker:
        if parent_window_handle is None:
            raise ValueError(
                "parent_window_handle is required when you opted into using broker. "
                "You need to provide the window handle of your GUI application, "
                "or use msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE "
                "when and only when your application is a console app.")
        if extra_scopes_to_consent:
            logger.warning(
                "Ignoring parameter extra_scopes_to_consent, "
                "which is not supported by broker")
        broker_response = self._acquire_token_interactive_via_broker(
            scopes,
            parent_window_handle,
            enable_msa_passthrough,
            claims,
            data,
            notify_before_ui,
            auth_scheme,
            prompt=prompt,
            login_hint=login_hint,
            max_age=max_age,
        )
        return self._process_broker_response(broker_response, scopes, data)

    # From here on it is the browser-based flow, which accepts no auth_scheme
    if (isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme)
            and sys.platform == "linux"):
        raise ValueError("POP is not supported on Linux")
    if auth_scheme:
        raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)

    notify_before_ui(ui="browser")
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_INTERACTIVE)
    decorated_scope = self._decorate_scope(scopes) if scopes else None
    # Hardcode the host, for now. AAD portal rejects 127.0.0.1 anyway
    redirect_uri = "http://localhost:{port}".format(port=port or 0)
    raw_response = self.client.obtain_token_by_browser(
        scope=decorated_scope,
        extra_scope_to_consent=extra_scopes_to_consent,
        redirect_uri=redirect_uri,
        prompt=prompt,
        login_hint=login_hint,
        max_age=max_age,
        timeout=timeout,
        auth_params={
            "claims": claims,
            "domain_hint": domain_hint,
        },
        data=dict(data, claims=claims),
        headers=telemetry_context.generate_headers(),
        browser_name=_preferred_browser(),
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        # Tag where this token came from
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response
def _acquire_token_interactive_via_broker(
        self,
        scopes,  # type: list[str]
        parent_window_handle,  # type: int
        enable_msa_passthrough,  # type: boolean
        claims,  # type: str
        data,  # type: dict
        on_before_launching_ui,  # type: callable
        auth_scheme,  # type: object
        prompt=None,
        login_hint=None,  # type: Optional[str]
        max_age=None,
        **kwargs):
    """Acquire a token through the broker, mimicking OIDC prompt behaviors.

    Up to three broker calls are attempted, in this order:

    1. ``_acquire_token_silently()``, only when ``login_hint`` is given,
       ``prompt`` is not ``"select_account"``, and ``login_hint`` matches
       exactly one account from :func:`get_accounts`.
       A non-empty response without an "error" key is returned right away.
    2. ``_signin_silently()``, only when ``prompt`` is ``"none"`` or falsy.
       With ``prompt="none"`` its outcome is final: the response is
       returned, or a ``broker_error`` dict if the token belongs to an
       account other than ``login_hint``.
       With a falsy ``prompt``, the response is returned unless it is for
       the wrong account or carries a recoverable broker status.
    3. ``_signin_interactively()``, as the fallback for everything else.
       ``on_before_launching_ui(ui="broker")`` is invoked right before it.

    :param scopes: Scopes passed to the broker calls as is.
    :param parent_window_handle:
        Window handle for the interactive sign-in. When it is
        ``self.CONSOLE_WINDOW_HANDLE``, ``None`` is passed to the broker.
    :param enable_msa_passthrough:
        Forwarded as ``enable_msa_pt`` to the sign-in calls.
    :param claims: Forwarded as ``claims`` to every broker call.
    :param data: Expanded as extra keyword arguments of every broker call.
    :param on_before_launching_ui:
        Callable, invoked with ``ui="broker"`` before the interactive call.
    :param auth_scheme: Forwarded as ``auth_scheme`` to every broker call.
    :param prompt: Selects which of the broker calls above are attempted.
    :param login_hint: Username used for account lookup and account check.
    :param max_age: Forwarded as ``max_age`` to the sign-in calls.
    :param kwargs:
        Only ``welcome_template`` is read here (it is debug-logged).
    :return:
        Whatever the chosen broker function returns
        (presumably a dict-like response -- verify against ``msal.broker``),
        or a dict with "error" and "error_description" keys for the
        wrong-account case under ``prompt="none"``.
    """
    # Imported lazily, inside the method rather than at module level
    from .broker import _signin_interactively, _signin_silently, _acquire_token_silently
    if "welcome_template" in kwargs:
        logger.debug(kwargs["welcome_template"])  # Experimental
    authority = "https://{}/{}".format(
        self.authority.instance, self.authority.tenant)
    # "no" is sent when the authority is known to the developer or when
    # instance discovery has been explicitly disabled; otherwise None.
    validate_authority = "no" if (
        self.authority._is_known_to_developer
        or self._instance_discovery is False) else None
    # Calls different broker methods to mimic the OIDC behaviors
    if login_hint and prompt != "select_account":  # OIDC prompts when the user did not sign in
        accounts = self.get_accounts(username=login_hint)
        if len(accounts) == 1:  # Unambiguously proceed with this account
            logger.debug("Calling broker._acquire_token_silently()")
            response = _acquire_token_silently(  # When it works, it bypasses prompt
                authority,
                self.client_id,
                accounts[0]["local_account_id"],
                scopes,
                claims=claims,
                auth_scheme=auth_scheme,
                **data)
            if response and "error" not in response:
                return response
            # Otherwise fall through to the attempts below
    # login_hint is indecisive or does not exist
    if prompt == "none" or not prompt:  # Must/Can attempt _signin_silently()
        logger.debug("Calling broker._signin_silently()")
        response = _signin_silently(  # Unlike OIDC, it doesn't honor login_hint
            authority, self.client_id, scopes,
            validateAuthority=validate_authority,
            claims=claims,
            max_age=max_age,
            enable_msa_pt=enable_msa_passthrough,
            auth_scheme=auth_scheme,
            **data)
        # True when a token was obtained, a login_hint was requested, and the
        # token's "preferred_username" claim is not equal to that login_hint.
        is_wrong_account = bool(
            # _signin_silently() only gets tokens for default account,
            # but this seems to have been fixed in PyMsalRuntime 0.11.2
            "access_token" in response and login_hint
            and login_hint != response.get(
                "id_token_claims", {}).get("preferred_username"))
        wrong_account_error_message = (
            'prompt="none" will not work for login_hint="non-default-user"')
        if is_wrong_account:
            logger.debug(wrong_account_error_message)
        if prompt == "none":
            # No UI is allowed, so this outcome is final: success, the
            # broker's own error, or a synthesized wrong-account error.
            return response if not is_wrong_account else {
                "error": "broker_error",
                "error_description": wrong_account_error_message,
                }
        else:
            assert bool(prompt) is False
            from pymsalruntime import Response_Status
            recoverable_errors = frozenset([
                Response_Status.Status_AccountUnusable,
                Response_Status.Status_InteractionRequired,
                ])
            # Note: "and" binds tighter than "or", so this reads as
            # is_wrong_account or (error present and status is recoverable)
            if is_wrong_account or "error" in response and response.get(
                    "_broker_status") in recoverable_errors:
                pass  # It will fall back to the _signin_interactively()
            else:
                return response

    logger.debug("Falls back to broker._signin_interactively()")
    on_before_launching_ui(ui="broker")
    return _signin_interactively(
        authority, self.client_id, scopes,
        # The console placeholder handle is translated into None
        None if parent_window_handle is self.CONSOLE_WINDOW_HANDLE
        else parent_window_handle,
        validateAuthority=validate_authority,
        login_hint=login_hint,
        prompt=prompt,
        claims=claims,
        max_age=max_age,
        enable_msa_pt=enable_msa_passthrough,
        auth_scheme=auth_scheme,
        **data)
def initiate_device_flow(self, scopes=None, *, claims_challenge=None, **kwargs):
    """Initiate a Device Flow instance,
    which will be used in :func:`~acquire_token_by_device_flow`.

    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        Keyword-only. It is merged with this app's client capabilities
        and sent as the "claims" value of the request.
    :return: A dict representing a newly created Device Flow object.

        - A successful response would contain "user_code" key, among others
        - an error response would contain some other readable key/value pairs.
    """
    # One id is used for the request header and also stored in the flow,
    # so that the later token request can pick it up from there.
    request_id = msal.telemetry._get_new_correlation_id()
    decorated_scope = self._decorate_scope(scopes or [])
    request_headers = {msal.telemetry.CLIENT_REQUEST_ID: request_id}
    merged_claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)
    device_flow = self.client.initiate_device_flow(
        scope=decorated_scope,
        headers=request_headers,
        data={"claims": merged_claims},
        **kwargs)
    device_flow[self.DEVICE_FLOW_CORRELATION_ID] = request_id
    return device_flow
def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
    """Obtain token by a device flow object, with customizable polling effect.

    :param dict flow:
        A dict previously generated by :func:`~initiate_device_flow`.
        By default, this method's polling effect will block current thread.
        You can abort the polling loop at any time,
        by changing the value of the flow's "expires_at" key to 0.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    # Reuse the correlation id stored in the flow, if there is one
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID,
        correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID))

    # "data" must be taken out of kwargs before kwargs is forwarded below
    request_data = dict(kwargs.pop("data", {}))
    # 2018-10-4 Hack: during transition period,
    # service seemingly need both device_code and code parameter.
    request_data["code"] = flow["device_code"]
    request_data["claims"] = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)

    raw_response = self.client.obtain_token_by_device_flow(
        flow,
        data=request_data,
        headers=telemetry_context.generate_headers(),
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response
2414class ConfidentialClientApplication(ClientApplication): # server-side web app
2415 """Same as :func:`ClientApplication.__init__`,
2416 except that ``allow_broker`` parameter shall remain ``None``.
2417 """
def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
    """Acquires token for the current confidential client, not for an end user.

    Since MSAL Python 1.23, it will automatically look for token from cache,
    and only send request to Identity Provider when cache misses.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :raises ValueError: When a truthy ``force_refresh`` keyword is given.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    force_refresh_requested = kwargs.get("force_refresh")
    if force_refresh_requested:
        # We choose to disallow force_refresh
        raise ValueError(
            "Historically, this method does not support force_refresh behavior. "
            )
    # The second positional argument is None
    # (presumably the account slot, as no end user is involved -- verify
    # against _acquire_token_silent_with_error()).
    result = self._acquire_token_silent_with_error(
        scopes, None, claims_challenge=claims_challenge, **kwargs)
    return _clean_up(result)
def _acquire_token_for_client(
        self,
        scopes,
        refresh_reason,
        claims_challenge=None,
        **kwargs
        ):
    """Send a client-credential token request to the identity provider.

    :param scopes: Sent as is; this grant flow requires no scope decoration.
    :param refresh_reason: Recorded into the telemetry context.
    :param claims_challenge:
        Merged with this app's client capabilities into the "claims" value.
    :param kwargs:
        An optional "data" dict is validated and merged into the request
        data; everything else is forwarded to the underlying client.
    :return: The response of the underlying client, unmodified.
    """
    tenant = self.authority.tenant.lower()
    if tenant in ("common", "organizations"):
        warnings.warn(
            "Using /common or /organizations authority "
            "in acquire_token_for_client() is unreliable. "
            "Please use a specific tenant instead.",
            DeprecationWarning)
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason)
    # Prefer the regional client when there is one
    token_client = self._regional_client or self.client
    request_headers = telemetry_context.generate_headers()
    # "data" must be taken out of kwargs before kwargs is forwarded below
    request_data = dict(kwargs.pop("data", {}))
    request_data["claims"] = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)
    response = token_client.obtain_token_for_client(
        scope=scopes,  # This grant flow requires no scope decoration
        headers=request_headers,
        data=request_data,
        **kwargs)
    telemetry_context.update_telemetry(response)
    return response
def remove_tokens_for_client(self):
    """Remove all tokens that were previously acquired via
    :func:`~acquire_token_for_client()` for the current client.

    Access tokens are looked up under the authority's own instance name
    and under each of its aliases.
    """
    environments = [self.authority.instance] + self._get_authority_aliases(
        self.authority.instance)
    for environment in environments:
        query = {
            "client_id": self.client_id,
            "environment": environment,
            "home_account_id": None,  # These are mostly app-only tokens
            }
        # Remove ATs from a snapshot, not from the live search result
        snapshot = list(self.token_cache.search(
            TokenCache.CredentialType.ACCESS_TOKEN, query=query))
        for access_token in snapshot:
            self.token_cache.remove_at(access_token)
    # acquire_token_for_client() obtains no RTs, so we have no RT to remove
def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs):
    """Acquires token using on-behalf-of (OBO) flow.

    The current app is a middle-tier service which was called with a token
    representing an end user.
    The current app can use such token (a.k.a. a user assertion) to request
    another token to access downstream web API, on behalf of that user.
    See `detail docs here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow>`_ .

    The current middle-tier app has no user interaction to obtain consent.
    See how to gain consent upfront for your middle-tier app from this article.
    https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow#gaining-consent-for-the-middle-tier-application

    :param str user_assertion: The incoming token already received by this app
    :param list[str] scopes: Scopes required by downstream API (a resource).
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID)

    # Scope decoration is used for:
    # 1. Explicitly requesting an RT, without relying on AAD default
    #    behavior, even though it currently still issues an RT.
    # 2. Requesting an IDT (which would otherwise be unavailable)
    #    so that the calling app could use id_token_claims to implement
    #    their own cache mapping, which is likely needed in web apps.
    decorated_scope = self._decorate_scope(scopes)

    # "data" must be taken out of kwargs before kwargs is forwarded below
    request_data = dict(kwargs.pop("data", {}))
    request_data["requested_token_use"] = "on_behalf_of"
    request_data["claims"] = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)

    # The implementation is NOT based on Token Exchange (RFC 8693);
    # it bases on assertion RFC 7521.
    raw_response = self.client.obtain_token_by_assertion(
        user_assertion,
        self.client.GRANT_TYPE_JWT,  # IDTs and AAD ATs are all JWTs
        scope=decorated_scope,
        data=request_data,
        headers=telemetry_context.generate_headers(),
        # TBD: Expose a login_hint (or ccs_routing_hint) param for web app
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response