Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/application.py: 21%
531 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:20 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:20 +0000
1import functools
2import json
3import time
4try: # Python 2
5 from urlparse import urljoin
6except: # Python 3
7 from urllib.parse import urljoin
8import logging
9import sys
10import warnings
11from threading import Lock
12import os
14from .oauth2cli import Client, JwtAssertionCreator
15from .oauth2cli.oidc import decode_part
16from .authority import Authority, WORLD_WIDE
17from .mex import send_request as mex_send_request
18from .wstrust_request import send_request as wst_send_request
19from .wstrust_response import *
20from .token_cache import TokenCache, _get_username
21import msal.telemetry
22from .region import _detect_region
23from .throttled_http_client import ThrottledHttpClient
24from .cloudshell import _is_running_in_cloud_shell
27# The __init__.py will import this. Not the other way around.
28__version__ = "1.22.0" # When releasing, also check and bump our dependencies's versions if needed
30logger = logging.getLogger(__name__)
31_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL"
33def extract_certs(public_cert_content):
34 # Parses raw public certificate file contents and returns a list of strings
35 # Usage: headers = {"x5c": extract_certs(open("my_cert.pem").read())}
36 public_certificates = re.findall(
37 r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----',
38 public_cert_content, re.I)
39 if public_certificates:
40 return [cert.strip() for cert in public_certificates]
41 # The public cert tags are not found in the input,
42 # let's make best effort to exclude a private key pem file.
43 if "PRIVATE KEY" in public_cert_content:
44 raise ValueError(
45 "We expect your public key but detect a private key instead")
46 return [public_cert_content.strip()]
49def _merge_claims_challenge_and_capabilities(capabilities, claims_challenge):
50 # Represent capabilities as {"access_token": {"xms_cc": {"values": capabilities}}}
51 # and then merge/add it into incoming claims
52 if not capabilities:
53 return claims_challenge
54 claims_dict = json.loads(claims_challenge) if claims_challenge else {}
55 for key in ["access_token"]: # We could add "id_token" if we'd decide to
56 claims_dict.setdefault(key, {}).update(xms_cc={"values": capabilities})
57 return json.dumps(claims_dict)
60def _str2bytes(raw):
61 # A conversion based on duck-typing rather than six.text_type
62 try:
63 return raw.encode(encoding="utf-8")
64 except:
65 return raw
68def _clean_up(result):
69 if isinstance(result, dict):
70 return {
71 k: result[k] for k in result
72 if k != "refresh_in" # MSAL handled refresh_in, customers need not
73 and not k.startswith('_') # Skim internal properties
74 }
75 return result # It could be None
78def _preferred_browser():
79 """Register Edge and return a name suitable for subsequent webbrowser.get(...)
80 when appropriate. Otherwise return None.
81 """
82 # On Linux, only Edge will provide device-based Conditional Access support
83 if sys.platform != "linux": # On other platforms, we have no browser preference
84 return None
85 browser_path = "/usr/bin/microsoft-edge" # Use a full path owned by sys admin
86 # Note: /usr/bin/microsoft-edge, /usr/bin/microsoft-edge-stable, etc.
87 # are symlinks that point to the actual binaries which are found under
88 # /opt/microsoft/msedge/msedge or /opt/microsoft/msedge-beta/msedge.
89 # Either method can be used to detect an Edge installation.
90 user_has_no_preference = "BROWSER" not in os.environ
91 user_wont_mind_edge = "microsoft-edge" in os.environ.get("BROWSER", "") # Note:
92 # BROWSER could contain "microsoft-edge" or "/path/to/microsoft-edge".
93 # Python documentation (https://docs.python.org/3/library/webbrowser.html)
94 # does not document the name being implicitly register,
95 # so there is no public API to know whether the ENV VAR browser would work.
96 # Therefore, we would not bother examine the env var browser's type.
97 # We would just register our own Edge instance.
98 if (user_has_no_preference or user_wont_mind_edge) and os.path.exists(browser_path):
99 try:
100 import webbrowser # Lazy import. Some distro may not have this.
101 browser_name = "msal-edge" # Avoid popular name "microsoft-edge"
102 # otherwise `BROWSER="microsoft-edge"; webbrowser.get("microsoft-edge")`
103 # would return a GenericBrowser instance which won't work.
104 try:
105 registration_available = isinstance(
106 webbrowser.get(browser_name), webbrowser.BackgroundBrowser)
107 except webbrowser.Error:
108 registration_available = False
109 if not registration_available:
110 logger.debug("Register %s with %s", browser_name, browser_path)
111 # By registering our own browser instance with our own name,
112 # rather than populating a process-wide BROWSER enn var,
113 # this approach does not have side effect on non-MSAL code path.
114 webbrowser.register( # Even double-register happens to work fine
115 browser_name, None, webbrowser.BackgroundBrowser(browser_path))
116 return browser_name
117 except ImportError:
118 pass # We may still proceed
119 return None
122class _ClientWithCcsRoutingInfo(Client):
124 def initiate_auth_code_flow(self, **kwargs):
125 if kwargs.get("login_hint"): # eSTS could have utilized this as-is, but nope
126 kwargs["X-AnchorMailbox"] = "UPN:%s" % kwargs["login_hint"]
127 return super(_ClientWithCcsRoutingInfo, self).initiate_auth_code_flow(
128 client_info=1, # To be used as CSS Routing info
129 **kwargs)
131 def obtain_token_by_auth_code_flow(
132 self, auth_code_flow, auth_response, **kwargs):
133 # Note: the obtain_token_by_browser() is also covered by this
134 assert isinstance(auth_code_flow, dict) and isinstance(auth_response, dict)
135 headers = kwargs.pop("headers", {})
136 client_info = json.loads(
137 decode_part(auth_response["client_info"])
138 ) if auth_response.get("client_info") else {}
139 if "uid" in client_info and "utid" in client_info:
140 # Note: The value of X-AnchorMailbox is also case-insensitive
141 headers["X-AnchorMailbox"] = "Oid:{uid}@{utid}".format(**client_info)
142 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_auth_code_flow(
143 auth_code_flow, auth_response, headers=headers, **kwargs)
145 def obtain_token_by_username_password(self, username, password, **kwargs):
146 headers = kwargs.pop("headers", {})
147 headers["X-AnchorMailbox"] = "upn:{}".format(username)
148 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_username_password(
149 username, password, headers=headers, **kwargs)
152class ClientApplication(object):
153 ACQUIRE_TOKEN_SILENT_ID = "84"
154 ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85"
155 ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301"
156 ACQUIRE_TOKEN_ON_BEHALF_OF_ID = "523"
157 ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID = "622"
158 ACQUIRE_TOKEN_FOR_CLIENT_ID = "730"
159 ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832"
160 ACQUIRE_TOKEN_INTERACTIVE = "169"
161 GET_ACCOUNTS_ID = "902"
162 REMOVE_ACCOUNT_ID = "903"
164 ATTEMPT_REGION_DISCOVERY = True # "TryAutoDetect"
166 def __init__(
167 self, client_id,
168 client_credential=None, authority=None, validate_authority=True,
169 token_cache=None,
170 http_client=None,
171 verify=True, proxies=None, timeout=None,
172 client_claims=None, app_name=None, app_version=None,
173 client_capabilities=None,
174 azure_region=None, # Note: We choose to add this param in this base class,
175 # despite it is currently only needed by ConfidentialClientApplication.
176 # This way, it holds the same positional param place for PCA,
177 # when we would eventually want to add this feature to PCA in future.
178 exclude_scopes=None,
179 http_cache=None,
180 instance_discovery=None,
181 allow_broker=None,
182 ):
183 """Create an instance of application.
185 :param str client_id: Your app has a client_id after you register it on AAD.
187 :param Union[str, dict] client_credential:
188 For :class:`PublicClientApplication`, you simply use `None` here.
189 For :class:`ConfidentialClientApplication`,
190 it can be a string containing client secret,
191 or an X509 certificate container in this form::
193 {
194 "private_key": "...-----BEGIN PRIVATE KEY-----...",
195 "thumbprint": "A1B2C3D4E5F6...",
196 "public_certificate": "...-----BEGIN CERTIFICATE-----... (Optional. See below.)",
197 "passphrase": "Passphrase if the private_key is encrypted (Optional. Added in version 1.6.0)",
198 }
200 *Added in version 0.5.0*:
201 public_certificate (optional) is public key certificate
202 which will be sent through 'x5c' JWT header only for
203 subject name and issuer authentication to support cert auto rolls.
205 Per `specs <https://tools.ietf.org/html/rfc7515#section-4.1.6>`_,
206 "the certificate containing
207 the public key corresponding to the key used to digitally sign the
208 JWS MUST be the first certificate. This MAY be followed by
209 additional certificates, with each subsequent certificate being the
210 one used to certify the previous one."
211 However, your certificate's issuer may use a different order.
212 So, if your attempt ends up with an error AADSTS700027 -
213 "The provided signature value did not match the expected signature value",
214 you may try use only the leaf cert (in PEM/str format) instead.
216 *Added in version 1.13.0*:
217 It can also be a completely pre-signed assertion that you've assembled yourself.
218 Simply pass a container containing only the key "client_assertion", like this::
220 {
221 "client_assertion": "...a JWT with claims aud, exp, iss, jti, nbf, and sub..."
222 }
224 :param dict client_claims:
225 *Added in version 0.5.0*:
226 It is a dictionary of extra claims that would be signed by
227 by this :class:`ConfidentialClientApplication` 's private key.
228 For example, you can use {"client_ip": "x.x.x.x"}.
229 You may also override any of the following default claims::
231 {
232 "aud": the_token_endpoint,
233 "iss": self.client_id,
234 "sub": same_as_issuer,
235 "exp": now + 10_min,
236 "iat": now,
237 "jti": a_random_uuid
238 }
240 :param str authority:
241 A URL that identifies a token authority. It should be of the format
242 ``https://login.microsoftonline.com/your_tenant``
243 By default, we will use ``https://login.microsoftonline.com/common``
245 *Changed in version 1.17*: you can also use predefined constant
246 and a builder like this::
248 from msal.authority import (
249 AuthorityBuilder,
250 AZURE_US_GOVERNMENT, AZURE_CHINA, AZURE_PUBLIC)
251 my_authority = AuthorityBuilder(AZURE_PUBLIC, "contoso.onmicrosoft.com")
252 # Now you get an equivalent of
253 # "https://login.microsoftonline.com/contoso.onmicrosoft.com"
255 # You can feed such an authority to msal's ClientApplication
256 from msal import PublicClientApplication
257 app = PublicClientApplication("my_client_id", authority=my_authority, ...)
259 :param bool validate_authority: (optional) Turns authority validation
260 on or off. This parameter default to true.
261 :param TokenCache cache:
262 Sets the token cache used by this ClientApplication instance.
263 By default, an in-memory cache will be created and used.
264 :param http_client: (optional)
265 Your implementation of abstract class HttpClient <msal.oauth2cli.http.http_client>
266 Defaults to a requests session instance.
267 Since MSAL 1.11.0, the default session would be configured
268 to attempt one retry on connection error.
269 If you are providing your own http_client,
270 it will be your http_client's duty to decide whether to perform retry.
272 :param verify: (optional)
273 It will be passed to the
274 `verify parameter in the underlying requests library
275 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#ssl-cert-verification>`_
276 This does not apply if you have chosen to pass your own Http client
277 :param proxies: (optional)
278 It will be passed to the
279 `proxies parameter in the underlying requests library
280 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#proxies>`_
281 This does not apply if you have chosen to pass your own Http client
282 :param timeout: (optional)
283 It will be passed to the
284 `timeout parameter in the underlying requests library
285 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#timeouts>`_
286 This does not apply if you have chosen to pass your own Http client
287 :param app_name: (optional)
288 You can provide your application name for Microsoft telemetry purposes.
289 Default value is None, means it will not be passed to Microsoft.
290 :param app_version: (optional)
291 You can provide your application version for Microsoft telemetry purposes.
292 Default value is None, means it will not be passed to Microsoft.
293 :param list[str] client_capabilities: (optional)
294 Allows configuration of one or more client capabilities, e.g. ["CP1"].
296 Client capability is meant to inform the Microsoft identity platform
297 (STS) what this client is capable for,
298 so STS can decide to turn on certain features.
299 For example, if client is capable to handle *claims challenge*,
300 STS can then issue CAE access tokens to resources
301 knowing when the resource emits *claims challenge*
302 the client will be capable to handle.
304 Implementation details:
305 Client capability is implemented using "claims" parameter on the wire,
306 for now.
307 MSAL will combine them into
308 `claims parameter <https://openid.net/specs/openid-connect-core-1_0-final.html#ClaimsParameter>`_
309 which you will later provide via one of the acquire-token request.
311 :param str azure_region:
312 AAD provides regional endpoints for apps to opt in
313 to keep their traffic remain inside that region.
315 As of 2021 May, regional service is only available for
316 ``acquire_token_for_client()`` sent by any of the following scenarios::
318 1. An app powered by a capable MSAL
319 (MSAL Python 1.12+ will be provisioned)
321 2. An app with managed identity, which is formerly known as MSI.
322 (However MSAL Python does not support managed identity,
323 so this one does not apply.)
325 3. An app authenticated by
326 `Subject Name/Issuer (SNI) <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_.
328 4. An app which already onboard to the region's allow-list.
330 This parameter defaults to None, which means region behavior remains off.
332 App developer can opt in to a regional endpoint,
333 by provide its region name, such as "westus", "eastus2".
334 You can find a full list of regions by running
335 ``az account list-locations -o table``, or referencing to
336 `this doc <https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.management.resourcemanager.fluent.core.region?view=azure-dotnet>`_.
338 An app running inside Azure Functions and Azure VM can use a special keyword
339 ``ClientApplication.ATTEMPT_REGION_DISCOVERY`` to auto-detect region.
341 .. note::
343 Setting ``azure_region`` to non-``None`` for an app running
344 outside of Azure Function/VM could hang indefinitely.
346 You should consider opting in/out region behavior on-demand,
347 by loading ``azure_region=None`` or ``azure_region="westus"``
348 or ``azure_region=True`` (which means opt-in and auto-detect)
349 from your per-deployment configuration, and then do
350 ``app = ConfidentialClientApplication(..., azure_region=azure_region)``.
352 Alternatively, you can configure a short timeout,
353 or provide a custom http_client which has a short timeout.
354 That way, the latency would be under your control,
355 but still less performant than opting out of region feature.
357 New in version 1.12.0.
359 :param list[str] exclude_scopes: (optional)
360 Historically MSAL hardcodes `offline_access` scope,
361 which would allow your app to have prolonged access to user's data.
362 If that is unnecessary or undesirable for your app,
363 now you can use this parameter to supply an exclusion list of scopes,
364 such as ``exclude_scopes = ["offline_access"]``.
366 :param dict http_cache:
367 MSAL has long been caching tokens in the ``token_cache``.
368 Recently, MSAL also introduced a concept of ``http_cache``,
369 by automatically caching some finite amount of non-token http responses,
370 so that *long-lived*
371 ``PublicClientApplication`` and ``ConfidentialClientApplication``
372 would be more performant and responsive in some situations.
374 This ``http_cache`` parameter accepts any dict-like object.
375 If not provided, MSAL will use an in-memory dict.
377 If your app is a command-line app (CLI),
378 you would want to persist your http_cache across different CLI runs.
379 The following recipe shows a way to do so::
381 # Just add the following lines at the beginning of your CLI script
382 import sys, atexit, pickle
383 http_cache_filename = sys.argv[0] + ".http_cache"
384 try:
385 with open(http_cache_filename, "rb") as f:
386 persisted_http_cache = pickle.load(f) # Take a snapshot
387 except (
388 FileNotFoundError, # Or IOError in Python 2
389 pickle.UnpicklingError, # A corrupted http cache file
390 ):
391 persisted_http_cache = {} # Recover by starting afresh
392 atexit.register(lambda: pickle.dump(
393 # When exit, flush it back to the file.
394 # It may occasionally overwrite another process's concurrent write,
395 # but that is fine. Subsequent runs will reach eventual consistency.
396 persisted_http_cache, open(http_cache_file, "wb")))
398 # And then you can implement your app as you normally would
399 app = msal.PublicClientApplication(
400 "your_client_id",
401 ...,
402 http_cache=persisted_http_cache, # Utilize persisted_http_cache
403 ...,
404 #token_cache=..., # You may combine the old token_cache trick
405 # Please refer to token_cache recipe at
406 # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
407 )
408 app.acquire_token_interactive(["your", "scope"], ...)
410 Content inside ``http_cache`` are cheap to obtain.
411 There is no need to share them among different apps.
413 Content inside ``http_cache`` will contain no tokens nor
414 Personally Identifiable Information (PII). Encryption is unnecessary.
416 New in version 1.16.0.
418 :param boolean instance_discovery:
419 Historically, MSAL would connect to a central endpoint located at
420 ``https://login.microsoftonline.com`` to acquire some metadata,
421 especially when using an unfamiliar authority.
422 This behavior is known as Instance Discovery.
424 This parameter defaults to None, which enables the Instance Discovery.
426 If you know some authorities which you allow MSAL to operate with as-is,
427 without involving any Instance Discovery, the recommended pattern is::
429 known_authorities = frozenset([ # Treat your known authorities as const
430 "https://contoso.com/adfs", "https://login.azs/foo"])
431 ...
432 authority = "https://contoso.com/adfs" # Assuming your app will use this
433 app1 = PublicClientApplication(
434 "client_id",
435 authority=authority,
436 # Conditionally disable Instance Discovery for known authorities
437 instance_discovery=authority not in known_authorities,
438 )
440 If you do not know some authorities beforehand,
441 yet still want MSAL to accept any authority that you will provide,
442 you can use a ``False`` to unconditionally disable Instance Discovery.
444 New in version 1.19.0.
446 :param boolean allow_broker:
447 This parameter is NOT applicable to :class:`ConfidentialClientApplication`.
449 A broker is a component installed on your device.
450 Broker implicitly gives your device an identity. By using a broker,
451 your device becomes a factor that can satisfy MFA (Multi-factor authentication).
452 This factor would become mandatory
453 if a tenant's admin enables a corresponding Conditional Access (CA) policy.
454 The broker's presence allows Microsoft identity platform
455 to have higher confidence that the tokens are being issued to your device,
456 and that is more secure.
458 An additional benefit of broker is,
459 it runs as a long-lived process with your device's OS,
460 and maintains its own cache,
461 so that your broker-enabled apps (even a CLI)
462 could automatically SSO from a previously established signed-in session.
464 This parameter defaults to None, which means MSAL will not utilize a broker.
465 If this parameter is set to True,
466 MSAL will use the broker whenever possible,
467 and automatically fall back to non-broker behavior.
468 That also means your app does not need to enable broker conditionally,
469 you can always set allow_broker to True,
470 as long as your app meets the following prerequisite:
472 * Installed optional dependency, e.g. ``pip install msal[broker]>=1.20,<2``.
473 (Note that broker is currently only available on Windows 10+)
475 * Register a new redirect_uri for your desktop app as:
476 ``ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id``
478 * Tested your app in following scenarios:
480 * Windows 10+
482 * PublicClientApplication's following methods::
483 acquire_token_interactive(), acquire_token_by_username_password(),
484 acquire_token_silent() (or acquire_token_silent_with_error()).
486 * AAD and MSA accounts (i.e. Non-ADFS, non-B2C)
488 New in version 1.20.0.
489 """
490 self.client_id = client_id
491 self.client_credential = client_credential
492 self.client_claims = client_claims
493 self._client_capabilities = client_capabilities
494 self._instance_discovery = instance_discovery
496 if exclude_scopes and not isinstance(exclude_scopes, list):
497 raise ValueError(
498 "Invalid exclude_scopes={}. It need to be a list of strings.".format(
499 repr(exclude_scopes)))
500 self._exclude_scopes = frozenset(exclude_scopes or [])
501 if "openid" in self._exclude_scopes:
502 raise ValueError(
503 'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format(
504 repr(exclude_scopes)))
506 if http_client:
507 self.http_client = http_client
508 else:
509 import requests # Lazy load
511 self.http_client = requests.Session()
512 self.http_client.verify = verify
513 self.http_client.proxies = proxies
514 # Requests, does not support session - wide timeout
515 # But you can patch that (https://github.com/psf/requests/issues/3341):
516 self.http_client.request = functools.partial(
517 self.http_client.request, timeout=timeout)
519 # Enable a minimal retry. Better than nothing.
520 # https://github.com/psf/requests/blob/v2.25.1/requests/adapters.py#L94-L108
521 a = requests.adapters.HTTPAdapter(max_retries=1)
522 self.http_client.mount("http://", a)
523 self.http_client.mount("https://", a)
524 self.http_client = ThrottledHttpClient(
525 self.http_client,
526 {} if http_cache is None else http_cache, # Default to an in-memory dict
527 )
529 self.app_name = app_name
530 self.app_version = app_version
532 # Here the self.authority will not be the same type as authority in input
533 try:
534 authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE)
535 self.authority = Authority(
536 authority_to_use,
537 self.http_client,
538 validate_authority=validate_authority,
539 instance_discovery=self._instance_discovery,
540 )
541 except ValueError: # Those are explicit authority validation errors
542 raise
543 except Exception: # The rest are typically connection errors
544 if validate_authority and azure_region:
545 # Since caller opts in to use region, here we tolerate connection
546 # errors happened during authority validation at non-region endpoint
547 self.authority = Authority(
548 authority_to_use,
549 self.http_client,
550 instance_discovery=False,
551 )
552 else:
553 raise
554 is_confidential_app = bool(
555 isinstance(self, ConfidentialClientApplication) or self.client_credential)
556 if is_confidential_app and allow_broker:
557 raise ValueError("allow_broker=True is only supported in PublicClientApplication")
558 self._enable_broker = False
559 if (allow_broker and not is_confidential_app
560 and sys.platform == "win32"
561 and not self.authority.is_adfs and not self.authority._is_b2c):
562 try:
563 from . import broker # Trigger Broker's initialization
564 self._enable_broker = True
565 except RuntimeError:
566 logger.exception(
567 "Broker is unavailable on this platform. "
568 "We will fallback to non-broker.")
569 logger.debug("Broker enabled? %s", self._enable_broker)
571 self.token_cache = token_cache or TokenCache()
572 self._region_configured = azure_region
573 self._region_detected = None
574 self.client, self._regional_client = self._build_client(
575 client_credential, self.authority)
576 self.authority_groups = None
577 self._telemetry_buffer = {}
578 self._telemetry_lock = Lock()
580 def _decorate_scope(
581 self, scopes,
582 reserved_scope=frozenset(['openid', 'profile', 'offline_access'])):
583 if not isinstance(scopes, (list, set, tuple)):
584 raise ValueError("The input scopes should be a list, tuple, or set")
585 scope_set = set(scopes) # Input scopes is typically a list. Copy it to a set.
586 if scope_set & reserved_scope:
587 # These scopes are reserved for the API to provide good experience.
588 # We could make the developer pass these and then if they do they will
589 # come back asking why they don't see refresh token or user information.
590 raise ValueError(
591 "API does not accept {} value as user-provided scopes".format(
592 reserved_scope))
594 # client_id can also be used as a scope in B2C
595 decorated = scope_set | reserved_scope
596 decorated -= self._exclude_scopes
597 return list(decorated)
599 def _build_telemetry_context(
600 self, api_id, correlation_id=None, refresh_reason=None):
601 return msal.telemetry._TelemetryContext(
602 self._telemetry_buffer, self._telemetry_lock, api_id,
603 correlation_id=correlation_id, refresh_reason=refresh_reason)
605 def _get_regional_authority(self, central_authority):
606 self._region_detected = self._region_detected or _detect_region(
607 self.http_client if self._region_configured is not None else None)
608 if (self._region_configured != self.ATTEMPT_REGION_DISCOVERY
609 and self._region_configured != self._region_detected):
610 logger.warning('Region configured ({}) != region detected ({})'.format(
611 repr(self._region_configured), repr(self._region_detected)))
612 region_to_use = (
613 self._region_detected
614 if self._region_configured == self.ATTEMPT_REGION_DISCOVERY
615 else self._region_configured) # It will retain the None i.e. opted out
616 logger.debug('Region to be used: {}'.format(repr(region_to_use)))
617 if region_to_use:
618 regional_host = ("{}.login.microsoft.com".format(region_to_use)
619 if central_authority.instance in (
620 # The list came from point 3 of the algorithm section in this internal doc
621 # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PinAuthToRegion/AAD%20SDK%20Proposal%20to%20Pin%20Auth%20to%20region.md&anchor=algorithm&_a=preview
622 "login.microsoftonline.com",
623 "login.microsoft.com",
624 "login.windows.net",
625 "sts.windows.net",
626 )
627 else "{}.{}".format(region_to_use, central_authority.instance))
628 return Authority( # The central_authority has already been validated
629 "https://{}/{}".format(regional_host, central_authority.tenant),
630 self.http_client,
631 instance_discovery=False,
632 )
633 return None
635 def _build_client(self, client_credential, authority, skip_regional_client=False):
636 client_assertion = None
637 client_assertion_type = None
638 default_headers = {
639 "x-client-sku": "MSAL.Python", "x-client-ver": __version__,
640 "x-client-os": sys.platform,
641 "x-client-cpu": "x64" if sys.maxsize > 2 ** 32 else "x86",
642 "x-ms-lib-capability": "retry-after, h429",
643 }
644 if self.app_name:
645 default_headers['x-app-name'] = self.app_name
646 if self.app_version:
647 default_headers['x-app-ver'] = self.app_version
648 default_body = {"client_info": 1}
649 if isinstance(client_credential, dict):
650 assert (("private_key" in client_credential
651 and "thumbprint" in client_credential) or
652 "client_assertion" in client_credential)
653 client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT
654 if 'client_assertion' in client_credential:
655 client_assertion = client_credential['client_assertion']
656 else:
657 headers = {}
658 if 'public_certificate' in client_credential:
659 headers["x5c"] = extract_certs(client_credential['public_certificate'])
660 if not client_credential.get("passphrase"):
661 unencrypted_private_key = client_credential['private_key']
662 else:
663 from cryptography.hazmat.primitives import serialization
664 from cryptography.hazmat.backends import default_backend
665 unencrypted_private_key = serialization.load_pem_private_key(
666 _str2bytes(client_credential["private_key"]),
667 _str2bytes(client_credential["passphrase"]),
668 backend=default_backend(), # It was a required param until 2020
669 )
670 assertion = JwtAssertionCreator(
671 unencrypted_private_key, algorithm="RS256",
672 sha1_thumbprint=client_credential.get("thumbprint"), headers=headers)
673 client_assertion = assertion.create_regenerative_assertion(
674 audience=authority.token_endpoint, issuer=self.client_id,
675 additional_claims=self.client_claims or {})
676 else:
677 default_body['client_secret'] = client_credential
678 central_configuration = {
679 "authorization_endpoint": authority.authorization_endpoint,
680 "token_endpoint": authority.token_endpoint,
681 "device_authorization_endpoint":
682 authority.device_authorization_endpoint or
683 urljoin(authority.token_endpoint, "devicecode"),
684 }
685 central_client = _ClientWithCcsRoutingInfo(
686 central_configuration,
687 self.client_id,
688 http_client=self.http_client,
689 default_headers=default_headers,
690 default_body=default_body,
691 client_assertion=client_assertion,
692 client_assertion_type=client_assertion_type,
693 on_obtaining_tokens=lambda event: self.token_cache.add(dict(
694 event, environment=authority.instance)),
695 on_removing_rt=self.token_cache.remove_rt,
696 on_updating_rt=self.token_cache.update_rt)
698 regional_client = None
699 if (client_credential # Currently regional endpoint only serves some CCA flows
700 and not skip_regional_client):
701 regional_authority = self._get_regional_authority(authority)
702 if regional_authority:
703 regional_configuration = {
704 "authorization_endpoint": regional_authority.authorization_endpoint,
705 "token_endpoint": regional_authority.token_endpoint,
706 "device_authorization_endpoint":
707 regional_authority.device_authorization_endpoint or
708 urljoin(regional_authority.token_endpoint, "devicecode"),
709 }
710 regional_client = _ClientWithCcsRoutingInfo(
711 regional_configuration,
712 self.client_id,
713 http_client=self.http_client,
714 default_headers=default_headers,
715 default_body=default_body,
716 client_assertion=client_assertion,
717 client_assertion_type=client_assertion_type,
718 on_obtaining_tokens=lambda event: self.token_cache.add(dict(
719 event, environment=authority.instance)),
720 on_removing_rt=self.token_cache.remove_rt,
721 on_updating_rt=self.token_cache.update_rt)
722 return central_client, regional_client
724 def initiate_auth_code_flow(
725 self,
726 scopes, # type: list[str]
727 redirect_uri=None,
728 state=None, # Recommended by OAuth2 for CSRF protection
729 prompt=None,
730 login_hint=None, # type: Optional[str]
731 domain_hint=None, # type: Optional[str]
732 claims_challenge=None,
733 max_age=None,
734 response_mode=None, # type: Optional[str]
735 ):
736 """Initiate an auth code flow.
738 Later when the response reaches your redirect_uri,
739 you can use :func:`~acquire_token_by_auth_code_flow()`
740 to complete the authentication/authorization.
742 :param list scopes:
743 It is a list of case-sensitive strings.
744 :param str redirect_uri:
745 Optional. If not specified, server will use the pre-registered one.
746 :param str state:
747 An opaque value used by the client to
748 maintain state between the request and callback.
749 If absent, this library will automatically generate one internally.
750 :param str prompt:
751 By default, no prompt value will be sent, not even "none".
752 You will have to specify a value explicitly.
753 Its valid values are defined in Open ID Connect specs
754 https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
755 :param str login_hint:
756 Optional. Identifier of the user. Generally a User Principal Name (UPN).
757 :param domain_hint:
758 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
759 If included, it will skip the email-based discovery process that user goes
760 through on the sign-in page, leading to a slightly more streamlined user experience.
761 More information on possible values
762 `here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
763 `here <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
765 :param int max_age:
766 OPTIONAL. Maximum Authentication Age.
767 Specifies the allowable elapsed time in seconds
768 since the last time the End-User was actively authenticated.
769 If the elapsed time is greater than this value,
770 Microsoft identity platform will actively re-authenticate the End-User.
772 MSAL Python will also automatically validate the auth_time in ID token.
774 New in version 1.15.
776 :param str response_mode:
777 OPTIONAL. Specifies the method with which response parameters should be returned.
778 The default value is equivalent to ``query``, which is still secure enough in MSAL Python
779 (because MSAL Python does not transfer tokens via query parameter in the first place).
780 For even better security, we recommend using the value ``form_post``.
781 In "form_post" mode, response parameters
782 will be encoded as HTML form values that are transmitted via the HTTP POST method and
783 encoded in the body using the application/x-www-form-urlencoded format.
784 Valid values can be either "form_post" for HTTP POST to callback URI or
785 "query" (the default) for HTTP GET with parameters encoded in query string.
786 More information on possible values
787 `here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>`
788 and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>`
790 :return:
791 The auth code flow. It is a dict in this form::
793 {
794 "auth_uri": "https://...", // Guide user to visit this
795 "state": "...", // You may choose to verify it by yourself,
796 // or just let acquire_token_by_auth_code_flow()
797 // do that for you.
798 "...": "...", // Everything else are reserved and internal
799 }
801 The caller is expected to::
803 1. somehow store this content, typically inside the current session,
804 2. guide the end user (i.e. resource owner) to visit that auth_uri,
805 3. and then relay this dict and subsequent auth response to
806 :func:`~acquire_token_by_auth_code_flow()`.
807 """
808 client = _ClientWithCcsRoutingInfo(
809 {"authorization_endpoint": self.authority.authorization_endpoint},
810 self.client_id,
811 http_client=self.http_client)
812 flow = client.initiate_auth_code_flow(
813 redirect_uri=redirect_uri, state=state, login_hint=login_hint,
814 prompt=prompt,
815 scope=self._decorate_scope(scopes),
816 domain_hint=domain_hint,
817 claims=_merge_claims_challenge_and_capabilities(
818 self._client_capabilities, claims_challenge),
819 max_age=max_age,
820 response_mode=response_mode,
821 )
822 flow["claims_challenge"] = claims_challenge
823 return flow
825 def get_authorization_request_url(
826 self,
827 scopes, # type: list[str]
828 login_hint=None, # type: Optional[str]
829 state=None, # Recommended by OAuth2 for CSRF protection
830 redirect_uri=None,
831 response_type="code", # Could be "token" if you use Implicit Grant
832 prompt=None,
833 nonce=None,
834 domain_hint=None, # type: Optional[str]
835 claims_challenge=None,
836 **kwargs):
837 """Constructs a URL for you to start a Authorization Code Grant.
839 :param list[str] scopes: (Required)
840 Scopes requested to access a protected API (a resource).
841 :param str state: Recommended by OAuth2 for CSRF protection.
842 :param str login_hint:
843 Identifier of the user. Generally a User Principal Name (UPN).
844 :param str redirect_uri:
845 Address to return to upon receiving a response from the authority.
846 :param str response_type:
847 Default value is "code" for an OAuth2 Authorization Code grant.
849 You could use other content such as "id_token" or "token",
850 which would trigger an Implicit Grant, but that is
851 `not recommended <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-implicit-grant-flow#is-the-implicit-grant-suitable-for-my-app>`_.
853 :param str prompt:
854 By default, no prompt value will be sent, not even "none".
855 You will have to specify a value explicitly.
856 Its valid values are defined in Open ID Connect specs
857 https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
858 :param nonce:
859 A cryptographically random value used to mitigate replay attacks. See also
860 `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
861 :param domain_hint:
862 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
863 If included, it will skip the email-based discovery process that user goes
864 through on the sign-in page, leading to a slightly more streamlined user experience.
865 More information on possible values
866 `here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
867 `here <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
868 :param claims_challenge:
869 The claims_challenge parameter requests specific claims requested by the resource provider
870 in the form of a claims_challenge directive in the www-authenticate header to be
871 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
872 It is a string of a JSON object which contains lists of claims being requested from these locations.
874 :return: The authorization url as a string.
875 """
876 authority = kwargs.pop("authority", None) # Historically we support this
877 if authority:
878 warnings.warn(
879 "We haven't decided if this method will accept authority parameter")
880 # The previous implementation is, it will use self.authority by default.
881 # Multi-tenant app can use new authority on demand
882 the_authority = Authority(
883 authority,
884 self.http_client,
885 instance_discovery=self._instance_discovery,
886 ) if authority else self.authority
888 client = _ClientWithCcsRoutingInfo(
889 {"authorization_endpoint": the_authority.authorization_endpoint},
890 self.client_id,
891 http_client=self.http_client)
892 warnings.warn(
893 "Change your get_authorization_request_url() "
894 "to initiate_auth_code_flow()", DeprecationWarning)
895 with warnings.catch_warnings(record=True):
896 return client.build_auth_request_uri(
897 response_type=response_type,
898 redirect_uri=redirect_uri, state=state, login_hint=login_hint,
899 prompt=prompt,
900 scope=self._decorate_scope(scopes),
901 nonce=nonce,
902 domain_hint=domain_hint,
903 claims=_merge_claims_challenge_and_capabilities(
904 self._client_capabilities, claims_challenge),
905 )
907 def acquire_token_by_auth_code_flow(
908 self, auth_code_flow, auth_response, scopes=None, **kwargs):
909 """Validate the auth response being redirected back, and obtain tokens.
911 It automatically provides nonce protection.
913 :param dict auth_code_flow:
914 The same dict returned by :func:`~initiate_auth_code_flow()`.
915 :param dict auth_response:
916 A dict of the query string received from auth server.
917 :param list[str] scopes:
918 Scopes requested to access a protected API (a resource).
920 Most of the time, you can leave it empty.
922 If you requested user consent for multiple resources, here you will
923 need to provide a subset of what you required in
924 :func:`~initiate_auth_code_flow()`.
926 OAuth2 was designed mostly for singleton services,
927 where tokens are always meant for the same resource and the only
928 changes are in the scopes.
929 In AAD, tokens can be issued for multiple 3rd party resources.
930 You can ask authorization code for multiple resources,
931 but when you redeem it, the token is for only one intended
932 recipient, called audience.
933 So the developer need to specify a scope so that we can restrict the
934 token to be issued for the corresponding audience.
936 :return:
937 * A dict containing "access_token" and/or "id_token", among others,
938 depends on what scope was used.
939 (See https://tools.ietf.org/html/rfc6749#section-5.1)
940 * A dict containing "error", optionally "error_description", "error_uri".
941 (It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_
942 or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_)
943 * Most client-side data error would result in ValueError exception.
944 So the usage pattern could be without any protocol details::
946 def authorize(): # A controller in a web app
947 try:
948 result = msal_app.acquire_token_by_auth_code_flow(
949 session.get("flow", {}), request.args)
950 if "error" in result:
951 return render_template("error.html", result)
952 use(result) # Token(s) are available in result and cache
953 except ValueError: # Usually caused by CSRF
954 pass # Simply ignore them
955 return redirect(url_for("index"))
956 """
957 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
958 telemetry_context = self._build_telemetry_context(
959 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
960 response =_clean_up(self.client.obtain_token_by_auth_code_flow(
961 auth_code_flow,
962 auth_response,
963 scope=self._decorate_scope(scopes) if scopes else None,
964 headers=telemetry_context.generate_headers(),
965 data=dict(
966 kwargs.pop("data", {}),
967 claims=_merge_claims_challenge_and_capabilities(
968 self._client_capabilities,
969 auth_code_flow.pop("claims_challenge", None))),
970 **kwargs))
971 telemetry_context.update_telemetry(response)
972 return response
974 def acquire_token_by_authorization_code(
975 self,
976 code,
977 scopes, # Syntactically required. STS accepts empty value though.
978 redirect_uri=None,
979 # REQUIRED, if the "redirect_uri" parameter was included in the
980 # authorization request as described in Section 4.1.1, and their
981 # values MUST be identical.
982 nonce=None,
983 claims_challenge=None,
984 **kwargs):
985 """The second half of the Authorization Code Grant.
987 :param code: The authorization code returned from Authorization Server.
988 :param list[str] scopes: (Required)
989 Scopes requested to access a protected API (a resource).
991 If you requested user consent for multiple resources, here you will
992 typically want to provide a subset of what you required in AuthCode.
994 OAuth2 was designed mostly for singleton services,
995 where tokens are always meant for the same resource and the only
996 changes are in the scopes.
997 In AAD, tokens can be issued for multiple 3rd party resources.
998 You can ask authorization code for multiple resources,
999 but when you redeem it, the token is for only one intended
1000 recipient, called audience.
1001 So the developer need to specify a scope so that we can restrict the
1002 token to be issued for the corresponding audience.
1004 :param nonce:
1005 If you provided a nonce when calling :func:`get_authorization_request_url`,
1006 same nonce should also be provided here, so that we'll validate it.
1007 An exception will be raised if the nonce in id token mismatches.
1009 :param claims_challenge:
1010 The claims_challenge parameter requests specific claims requested by the resource provider
1011 in the form of a claims_challenge directive in the www-authenticate header to be
1012 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1013 It is a string of a JSON object which contains lists of claims being requested from these locations.
1015 :return: A dict representing the json response from AAD:
1017 - A successful response would contain "access_token" key,
1018 - an error response would contain "error" and usually "error_description".
1019 """
1020 # If scope is absent on the wire, STS will give you a token associated
1021 # to the FIRST scope sent during the authorization request.
1022 # So in theory, you can omit scope here when you were working with only
1023 # one scope. But, MSAL decorates your scope anyway, so they are never
1024 # really empty.
1025 assert isinstance(scopes, list), "Invalid parameter type"
1026 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1027 warnings.warn(
1028 "Change your acquire_token_by_authorization_code() "
1029 "to acquire_token_by_auth_code_flow()", DeprecationWarning)
1030 with warnings.catch_warnings(record=True):
1031 telemetry_context = self._build_telemetry_context(
1032 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
1033 response = _clean_up(self.client.obtain_token_by_authorization_code(
1034 code, redirect_uri=redirect_uri,
1035 scope=self._decorate_scope(scopes),
1036 headers=telemetry_context.generate_headers(),
1037 data=dict(
1038 kwargs.pop("data", {}),
1039 claims=_merge_claims_challenge_and_capabilities(
1040 self._client_capabilities, claims_challenge)),
1041 nonce=nonce,
1042 **kwargs))
1043 telemetry_context.update_telemetry(response)
1044 return response
1046 def get_accounts(self, username=None):
1047 """Get a list of accounts which previously signed in, i.e. exists in cache.
1049 An account can later be used in :func:`~acquire_token_silent`
1050 to find its tokens.
1052 :param username:
1053 Filter accounts with this username only. Case insensitive.
1054 :return: A list of account objects.
1055 Each account is a dict. For now, we only document its "username" field.
1056 Your app can choose to display those information to end user,
1057 and allow user to choose one of his/her accounts to proceed.
1058 """
1059 accounts = self._find_msal_accounts(environment=self.authority.instance)
1060 if not accounts: # Now try other aliases of this authority instance
1061 for alias in self._get_authority_aliases(self.authority.instance):
1062 accounts = self._find_msal_accounts(environment=alias)
1063 if accounts:
1064 break
1065 if username:
1066 # Federated account["username"] from AAD could contain mixed case
1067 lowercase_username = username.lower()
1068 accounts = [a for a in accounts
1069 if a["username"].lower() == lowercase_username]
1070 if not accounts:
1071 logger.debug(( # This would also happen when the cache is empty
1072 "get_accounts(username='{}') finds no account. "
1073 "If tokens were acquired without 'profile' scope, "
1074 "they would contain no username for filtering. "
1075 "Consider calling get_accounts(username=None) instead."
1076 ).format(username))
1077 # Does not further filter by existing RTs here. It probably won't matter.
1078 # Because in most cases Accounts and RTs co-exist.
1079 # Even in the rare case when an RT is revoked and then removed,
1080 # acquire_token_silent() would then yield no result,
1081 # apps would fall back to other acquire methods. This is the standard pattern.
1082 return accounts
1084 def _find_msal_accounts(self, environment):
1085 interested_authority_types = [
1086 TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS]
1087 if _is_running_in_cloud_shell():
1088 interested_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL)
1089 grouped_accounts = {
1090 a.get("home_account_id"): # Grouped by home tenant's id
1091 { # These are minimal amount of non-tenant-specific account info
1092 "home_account_id": a.get("home_account_id"),
1093 "environment": a.get("environment"),
1094 "username": a.get("username"),
1096 # The following fields for backward compatibility, for now
1097 "authority_type": a.get("authority_type"),
1098 "local_account_id": a.get("local_account_id"), # Tenant-specific
1099 "realm": a.get("realm"), # Tenant-specific
1100 }
1101 for a in self.token_cache.find(
1102 TokenCache.CredentialType.ACCOUNT,
1103 query={"environment": environment})
1104 if a["authority_type"] in interested_authority_types
1105 }
1106 return list(grouped_accounts.values())
1108 def _get_instance_metadata(self): # This exists so it can be mocked in unit test
1109 resp = self.http_client.get(
1110 "https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize", # TBD: We may extend this to use self._instance_discovery endpoint
1111 headers={'Accept': 'application/json'})
1112 resp.raise_for_status()
1113 return json.loads(resp.text)['metadata']
1115 def _get_authority_aliases(self, instance):
1116 if self._instance_discovery is False:
1117 return []
1118 if self.authority._is_known_to_developer:
1119 # Then it is an ADFS/B2C/known_authority_hosts situation
1120 # which may not reach the central endpoint, so we skip it.
1121 return []
1122 if not self.authority_groups:
1123 self.authority_groups = [
1124 set(group['aliases']) for group in self._get_instance_metadata()]
1125 for group in self.authority_groups:
1126 if instance in group:
1127 return [alias for alias in group if alias != instance]
1128 return []
1130 def remove_account(self, account):
1131 """Sign me out and forget me from token cache"""
1132 self._forget_me(account)
1133 if self._enable_broker:
1134 from .broker import _signout_silently
1135 error = _signout_silently(self.client_id, account["local_account_id"])
1136 if error:
1137 logger.debug("_signout_silently() returns error: %s", error)
1139 def _sign_out(self, home_account):
1140 # Remove all relevant RTs and ATs from token cache
1141 owned_by_home_account = {
1142 "environment": home_account["environment"],
1143 "home_account_id": home_account["home_account_id"],} # realm-independent
1144 app_metadata = self._get_app_metadata(home_account["environment"])
1145 # Remove RTs/FRTs, and they are realm-independent
1146 for rt in [rt for rt in self.token_cache.find(
1147 TokenCache.CredentialType.REFRESH_TOKEN, query=owned_by_home_account)
1148 # Do RT's app ownership check as a precaution, in case family apps
1149 # and 3rd-party apps share same token cache, although they should not.
1150 if rt["client_id"] == self.client_id or (
1151 app_metadata.get("family_id") # Now let's settle family business
1152 and rt.get("family_id") == app_metadata["family_id"])
1153 ]:
1154 self.token_cache.remove_rt(rt)
1155 for at in self.token_cache.find( # Remove ATs
1156 # Regardless of realm, b/c we've removed realm-independent RTs anyway
1157 TokenCache.CredentialType.ACCESS_TOKEN, query=owned_by_home_account):
1158 # To avoid the complexity of locating sibling family app's AT,
1159 # we skip AT's app ownership check.
1160 # It means ATs for other apps will also be removed, it is OK because:
1161 # * non-family apps are not supposed to share token cache to begin with;
1162 # * Even if it happens, we keep other app's RT already, so SSO still works
1163 self.token_cache.remove_at(at)
1165 def _forget_me(self, home_account):
1166 # It implies signout, and then also remove all relevant accounts and IDTs
1167 self._sign_out(home_account)
1168 owned_by_home_account = {
1169 "environment": home_account["environment"],
1170 "home_account_id": home_account["home_account_id"],} # realm-independent
1171 for idt in self.token_cache.find( # Remove IDTs, regardless of realm
1172 TokenCache.CredentialType.ID_TOKEN, query=owned_by_home_account):
1173 self.token_cache.remove_idt(idt)
1174 for a in self.token_cache.find( # Remove Accounts, regardless of realm
1175 TokenCache.CredentialType.ACCOUNT, query=owned_by_home_account):
1176 self.token_cache.remove_account(a)
1178 def _acquire_token_by_cloud_shell(self, scopes, data=None):
1179 from .cloudshell import _obtain_token
1180 response = _obtain_token(
1181 self.http_client, scopes, client_id=self.client_id, data=data)
1182 if "error" not in response:
1183 self.token_cache.add(dict(
1184 client_id=self.client_id,
1185 scope=response["scope"].split() if "scope" in response else scopes,
1186 token_endpoint=self.authority.token_endpoint,
1187 response=response,
1188 data=data or {},
1189 authority_type=_AUTHORITY_TYPE_CLOUDSHELL,
1190 ))
1191 return response
1193 def acquire_token_silent(
1194 self,
1195 scopes, # type: List[str]
1196 account, # type: Optional[Account]
1197 authority=None, # See get_authorization_request_url()
1198 force_refresh=False, # type: Optional[boolean]
1199 claims_challenge=None,
1200 **kwargs):
1201 """Acquire an access token for given account, without user interaction.
1203 It is done either by finding a valid access token from cache,
1204 or by finding a valid refresh token from cache and then automatically
1205 use it to redeem a new access token.
1207 This method will combine the cache empty and refresh error
1208 into one return value, `None`.
1209 If your app does not care about the exact token refresh error during
1210 token cache look-up, then this method is easier and recommended.
1212 Internally, this method calls :func:`~acquire_token_silent_with_error`.
1214 :param claims_challenge:
1215 The claims_challenge parameter requests specific claims requested by the resource provider
1216 in the form of a claims_challenge directive in the www-authenticate header to be
1217 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1218 It is a string of a JSON object which contains lists of claims being requested from these locations.
1220 :return:
1221 - A dict containing no "error" key,
1222 and typically contains an "access_token" key,
1223 if cache lookup succeeded.
1224 - None when cache lookup does not yield a token.
1225 """
1226 result = self.acquire_token_silent_with_error(
1227 scopes, account, authority=authority, force_refresh=force_refresh,
1228 claims_challenge=claims_challenge, **kwargs)
1229 return result if result and "error" not in result else None
1231 def acquire_token_silent_with_error(
1232 self,
1233 scopes, # type: List[str]
1234 account, # type: Optional[Account]
1235 authority=None, # See get_authorization_request_url()
1236 force_refresh=False, # type: Optional[boolean]
1237 claims_challenge=None,
1238 **kwargs):
1239 """Acquire an access token for given account, without user interaction.
1241 It is done either by finding a valid access token from cache,
1242 or by finding a valid refresh token from cache and then automatically
1243 use it to redeem a new access token.
1245 This method will differentiate cache empty from token refresh error.
1246 If your app cares the exact token refresh error during
1247 token cache look-up, then this method is suitable.
1248 Otherwise, the other method :func:`~acquire_token_silent` is recommended.
1250 :param list[str] scopes: (Required)
1251 Scopes requested to access a protected API (a resource).
1252 :param account:
1253 one of the account object returned by :func:`~get_accounts`,
1254 or use None when you want to find an access token for this client.
1255 :param force_refresh:
1256 If True, it will skip Access Token look-up,
1257 and try to find a Refresh Token to obtain a new Access Token.
1258 :param claims_challenge:
1259 The claims_challenge parameter requests specific claims requested by the resource provider
1260 in the form of a claims_challenge directive in the www-authenticate header to be
1261 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1262 It is a string of a JSON object which contains lists of claims being requested from these locations.
1263 :return:
1264 - A dict containing no "error" key,
1265 and typically contains an "access_token" key,
1266 if cache lookup succeeded.
1267 - None when there is simply no token in the cache.
1268 - A dict containing an "error" key, when token refresh failed.
1269 """
1270 assert isinstance(scopes, list), "Invalid parameter type"
1271 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1272 correlation_id = msal.telemetry._get_new_correlation_id()
1273 if authority:
1274 warnings.warn("We haven't decided how/if this method will accept authority parameter")
1275 # the_authority = Authority(
1276 # authority,
1277 # self.http_client,
1278 # instance_discovery=self._instance_discovery,
1279 # ) if authority else self.authority
1280 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it(
1281 scopes, account, self.authority, force_refresh=force_refresh,
1282 claims_challenge=claims_challenge,
1283 correlation_id=correlation_id,
1284 **kwargs)
1285 if result and "error" not in result:
1286 return result
1287 final_result = result
1288 for alias in self._get_authority_aliases(self.authority.instance):
1289 if not self.token_cache.find(
1290 self.token_cache.CredentialType.REFRESH_TOKEN,
1291 # target=scopes, # MUST NOT filter by scopes, because:
1292 # 1. AAD RTs are scope-independent;
1293 # 2. therefore target is optional per schema;
1294 query={"environment": alias}):
1295 # Skip heavy weight logic when RT for this alias doesn't exist
1296 continue
1297 the_authority = Authority(
1298 "https://" + alias + "/" + self.authority.tenant,
1299 self.http_client,
1300 instance_discovery=False,
1301 )
1302 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it(
1303 scopes, account, the_authority, force_refresh=force_refresh,
1304 claims_challenge=claims_challenge,
1305 correlation_id=correlation_id,
1306 **kwargs)
1307 if result:
1308 if "error" not in result:
1309 return result
1310 final_result = result
1311 if final_result and final_result.get("suberror"):
1312 final_result["classification"] = { # Suppress these suberrors, per #57
1313 "bad_token": "",
1314 "token_expired": "",
1315 "protection_policy_required": "",
1316 "client_mismatch": "",
1317 "device_authentication_failed": "",
1318 }.get(final_result["suberror"], final_result["suberror"])
1319 return final_result
1321 def _acquire_token_silent_from_cache_and_possibly_refresh_it(
1322 self,
1323 scopes, # type: List[str]
1324 account, # type: Optional[Account]
1325 authority, # This can be different than self.authority
1326 force_refresh=False, # type: Optional[boolean]
1327 claims_challenge=None,
1328 correlation_id=None,
1329 **kwargs):
1330 access_token_from_cache = None
1331 if not (force_refresh or claims_challenge): # Bypass AT when desired or using claims
1332 query={
1333 "client_id": self.client_id,
1334 "environment": authority.instance,
1335 "realm": authority.tenant,
1336 "home_account_id": (account or {}).get("home_account_id"),
1337 }
1338 key_id = kwargs.get("data", {}).get("key_id")
1339 if key_id: # Some token types (SSH-certs, POP) are bound to a key
1340 query["key_id"] = key_id
1341 matches = self.token_cache.find(
1342 self.token_cache.CredentialType.ACCESS_TOKEN,
1343 target=scopes,
1344 query=query)
1345 now = time.time()
1346 refresh_reason = msal.telemetry.AT_ABSENT
1347 for entry in matches:
1348 expires_in = int(entry["expires_on"]) - now
1349 if expires_in < 5*60: # Then consider it expired
1350 refresh_reason = msal.telemetry.AT_EXPIRED
1351 continue # Removal is not necessary, it will be overwritten
1352 logger.debug("Cache hit an AT")
1353 access_token_from_cache = { # Mimic a real response
1354 "access_token": entry["secret"],
1355 "token_type": entry.get("token_type", "Bearer"),
1356 "expires_in": int(expires_in), # OAuth2 specs defines it as int
1357 }
1358 if "refresh_on" in entry and int(entry["refresh_on"]) < now: # aging
1359 refresh_reason = msal.telemetry.AT_AGING
1360 break # With a fallback in hand, we break here to go refresh
1361 self._build_telemetry_context(-1).hit_an_access_token()
1362 return access_token_from_cache # It is still good as new
1363 else:
1364 refresh_reason = msal.telemetry.FORCE_REFRESH # TODO: It could also mean claims_challenge
1365 assert refresh_reason, "It should have been established at this point"
1366 try:
1367 data = kwargs.get("data", {})
1368 if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL:
1369 return self._acquire_token_by_cloud_shell(scopes, data=data)
1371 if self._enable_broker and account is not None:
1372 from .broker import _acquire_token_silently
1373 response = _acquire_token_silently(
1374 "https://{}/{}".format(self.authority.instance, self.authority.tenant),
1375 self.client_id,
1376 account["local_account_id"],
1377 scopes,
1378 claims=_merge_claims_challenge_and_capabilities(
1379 self._client_capabilities, claims_challenge),
1380 correlation_id=correlation_id,
1381 **data)
1382 if response: # The broker provided a decisive outcome, so we use it
1383 return self._process_broker_response(response, scopes, data)
1385 result = _clean_up(self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
1386 authority, self._decorate_scope(scopes), account,
1387 refresh_reason=refresh_reason, claims_challenge=claims_challenge,
1388 correlation_id=correlation_id,
1389 **kwargs))
1390 if (result and "error" not in result) or (not access_token_from_cache):
1391 return result
1392 except: # The exact HTTP exception is transportation-layer dependent
1393 # Typically network error. Potential AAD outage?
1394 if not access_token_from_cache: # It means there is no fall back option
1395 raise # We choose to bubble up the exception
1396 return access_token_from_cache
1398 def _process_broker_response(self, response, scopes, data):
1399 if "error" not in response:
1400 self.token_cache.add(dict(
1401 client_id=self.client_id,
1402 scope=response["scope"].split() if "scope" in response else scopes,
1403 token_endpoint=self.authority.token_endpoint,
1404 response=response,
1405 data=data,
1406 _account_id=response["_account_id"],
1407 ))
1408 return _clean_up(response)
1410 def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
1411 self, authority, scopes, account, **kwargs):
1412 query = {
1413 "environment": authority.instance,
1414 "home_account_id": (account or {}).get("home_account_id"),
1415 # "realm": authority.tenant, # AAD RTs are tenant-independent
1416 }
1417 app_metadata = self._get_app_metadata(authority.instance)
1418 if not app_metadata: # Meaning this app is now used for the first time.
1419 # When/if we have a way to directly detect current app's family,
1420 # we'll rewrite this block, to support multiple families.
1421 # For now, we try existing RTs (*). If it works, we are in that family.
1422 # (*) RTs of a different app/family are not supposed to be
1423 # shared with or accessible by us in the first place.
1424 at = self._acquire_token_silent_by_finding_specific_refresh_token(
1425 authority, scopes,
1426 dict(query, family_id="1"), # A hack, we have only 1 family for now
1427 rt_remover=lambda rt_item: None, # NO-OP b/c RTs are likely not mine
1428 break_condition=lambda response: # Break loop when app not in family
1429 # Based on an AAD-only behavior mentioned in internal doc here
1430 # https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595
1431 "client_mismatch" in response.get("error_additional_info", []),
1432 **kwargs)
1433 if at and "error" not in at:
1434 return at
1435 last_resp = None
1436 if app_metadata.get("family_id"): # Meaning this app belongs to this family
1437 last_resp = at = self._acquire_token_silent_by_finding_specific_refresh_token(
1438 authority, scopes, dict(query, family_id=app_metadata["family_id"]),
1439 **kwargs)
1440 if at and "error" not in at:
1441 return at
1442 # Either this app is an orphan, so we will naturally use its own RT;
1443 # or all attempts above have failed, so we fall back to non-foci behavior.
1444 return self._acquire_token_silent_by_finding_specific_refresh_token(
1445 authority, scopes, dict(query, client_id=self.client_id),
1446 **kwargs) or last_resp
1448 def _get_app_metadata(self, environment):
1449 apps = self.token_cache.find( # Use find(), rather than token_cache.get(...)
1450 TokenCache.CredentialType.APP_METADATA, query={
1451 "environment": environment, "client_id": self.client_id})
1452 return apps[0] if apps else {}
1454 def _acquire_token_silent_by_finding_specific_refresh_token(
1455 self, authority, scopes, query,
1456 rt_remover=None, break_condition=lambda response: False,
1457 refresh_reason=None, correlation_id=None, claims_challenge=None,
1458 **kwargs):
1459 matches = self.token_cache.find(
1460 self.token_cache.CredentialType.REFRESH_TOKEN,
1461 # target=scopes, # AAD RTs are scope-independent
1462 query=query)
1463 logger.debug("Found %d RTs matching %s", len(matches), query)
1465 response = None # A distinguishable value to mean cache is empty
1466 if not matches: # Then exit early to avoid expensive operations
1467 return response
1468 client, _ = self._build_client(
1469 # Potentially expensive if building regional client
1470 self.client_credential, authority, skip_regional_client=True)
1471 telemetry_context = self._build_telemetry_context(
1472 self.ACQUIRE_TOKEN_SILENT_ID,
1473 correlation_id=correlation_id, refresh_reason=refresh_reason)
1474 for entry in sorted( # Since unfit RTs would not be aggressively removed,
1475 # we start from newer RTs which are more likely fit.
1476 matches,
1477 key=lambda e: int(e.get("last_modification_time", "0")),
1478 reverse=True):
1479 logger.debug("Cache attempts an RT")
1480 headers = telemetry_context.generate_headers()
1481 if query.get("home_account_id"): # Then use it as CCS Routing info
1482 headers["X-AnchorMailbox"] = "Oid:{}".format( # case-insensitive value
1483 query["home_account_id"].replace(".", "@"))
1484 response = client.obtain_token_by_refresh_token(
1485 entry, rt_getter=lambda token_item: token_item["secret"],
1486 on_removing_rt=lambda rt_item: None, # Disable RT removal,
1487 # because an invalid_grant could be caused by new MFA policy,
1488 # the RT could still be useful for other MFA-less scope or tenant
1489 on_obtaining_tokens=lambda event: self.token_cache.add(dict(
1490 event,
1491 environment=authority.instance,
1492 skip_account_creation=True, # To honor a concurrent remove_account()
1493 )),
1494 scope=scopes,
1495 headers=headers,
1496 data=dict(
1497 kwargs.pop("data", {}),
1498 claims=_merge_claims_challenge_and_capabilities(
1499 self._client_capabilities, claims_challenge)),
1500 **kwargs)
1501 telemetry_context.update_telemetry(response)
1502 if "error" not in response:
1503 return response
1504 logger.debug("Refresh failed. {error}: {error_description}".format(
1505 error=response.get("error"),
1506 error_description=response.get("error_description"),
1507 ))
1508 if break_condition(response):
1509 break
1510 return response # Returns the latest error (if any), or just None
1512 def _validate_ssh_cert_input_data(self, data):
1513 if data.get("token_type") == "ssh-cert":
1514 if not data.get("req_cnf"):
1515 raise ValueError(
1516 "When requesting an SSH certificate, "
1517 "you must include a string parameter named 'req_cnf' "
1518 "containing the public key in JWK format "
1519 "(https://tools.ietf.org/html/rfc7517).")
1520 if not data.get("key_id"):
1521 raise ValueError(
1522 "When requesting an SSH certificate, "
1523 "you must include a string parameter named 'key_id' "
1524 "which identifies the key in the 'req_cnf' argument.")
1526 def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
1527 """Acquire token(s) based on a refresh token (RT) obtained from elsewhere.
1529 You use this method only when you have old RTs from elsewhere,
1530 and now you want to migrate them into MSAL.
1531 Calling this method results in new tokens automatically storing into MSAL.
1533 You do NOT need to use this method if you are already using MSAL.
1534 MSAL maintains RT automatically inside its token cache,
1535 and an access token can be retrieved
1536 when you call :func:`~acquire_token_silent`.
1538 :param str refresh_token: The old refresh token, as a string.
1540 :param list scopes:
1541 The scopes associate with this old RT.
1542 Each scope needs to be in the Microsoft identity platform (v2) format.
1543 See `Scopes not resources <https://docs.microsoft.com/en-us/azure/active-directory/develop/migrate-python-adal-msal#scopes-not-resources>`_.
1545 :return:
1546 * A dict contains "error" and some other keys, when error happened.
1547 * A dict contains no "error" key means migration was successful.
1548 """
1549 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
1550 telemetry_context = self._build_telemetry_context(
1551 self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN,
1552 refresh_reason=msal.telemetry.FORCE_REFRESH)
1553 response = _clean_up(self.client.obtain_token_by_refresh_token(
1554 refresh_token,
1555 scope=self._decorate_scope(scopes),
1556 headers=telemetry_context.generate_headers(),
1557 rt_getter=lambda rt: rt,
1558 on_updating_rt=False,
1559 on_removing_rt=lambda rt_item: None, # No OP
1560 **kwargs))
1561 telemetry_context.update_telemetry(response)
1562 return response
1564 def acquire_token_by_username_password(
1565 self, username, password, scopes, claims_challenge=None, **kwargs):
1566 """Gets a token for a given resource via user credentials.
1568 See this page for constraints of Username Password Flow.
1569 https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication
1571 :param str username: Typically a UPN in the form of an email address.
1572 :param str password: The password.
1573 :param list[str] scopes:
1574 Scopes requested to access a protected API (a resource).
1575 :param claims_challenge:
1576 The claims_challenge parameter requests specific claims requested by the resource provider
1577 in the form of a claims_challenge directive in the www-authenticate header to be
1578 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1579 It is a string of a JSON object which contains lists of claims being requested from these locations.
1581 :return: A dict representing the json response from AAD:
1583 - A successful response would contain "access_token" key,
1584 - an error response would contain "error" and usually "error_description".
1585 """
1586 claims = _merge_claims_challenge_and_capabilities(
1587 self._client_capabilities, claims_challenge)
1588 if self._enable_broker:
1589 from .broker import _signin_silently
1590 response = _signin_silently(
1591 "https://{}/{}".format(self.authority.instance, self.authority.tenant),
1592 self.client_id,
1593 scopes, # Decorated scopes won't work due to offline_access
1594 MSALRuntime_Username=username,
1595 MSALRuntime_Password=password,
1596 validateAuthority="no" if (
1597 self.authority._is_known_to_developer
1598 or self._instance_discovery is False) else None,
1599 claims=claims,
1600 )
1601 return self._process_broker_response(response, scopes, kwargs.get("data", {}))
1603 scopes = self._decorate_scope(scopes)
1604 telemetry_context = self._build_telemetry_context(
1605 self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
1606 headers = telemetry_context.generate_headers()
1607 data = dict(kwargs.pop("data", {}), claims=claims)
1608 if not self.authority.is_adfs:
1609 user_realm_result = self.authority.user_realm_discovery(
1610 username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
1611 if user_realm_result.get("account_type") == "Federated":
1612 response = _clean_up(self._acquire_token_by_username_password_federated(
1613 user_realm_result, username, password, scopes=scopes,
1614 data=data,
1615 headers=headers, **kwargs))
1616 telemetry_context.update_telemetry(response)
1617 return response
1618 response = _clean_up(self.client.obtain_token_by_username_password(
1619 username, password, scope=scopes,
1620 headers=headers,
1621 data=data,
1622 **kwargs))
1623 telemetry_context.update_telemetry(response)
1624 return response
1626 def _acquire_token_by_username_password_federated(
1627 self, user_realm_result, username, password, scopes=None, **kwargs):
1628 wstrust_endpoint = {}
1629 if user_realm_result.get("federation_metadata_url"):
1630 wstrust_endpoint = mex_send_request(
1631 user_realm_result["federation_metadata_url"],
1632 self.http_client)
1633 if wstrust_endpoint is None:
1634 raise ValueError("Unable to find wstrust endpoint from MEX. "
1635 "This typically happens when attempting MSA accounts. "
1636 "More details available here. "
1637 "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication")
1638 logger.debug("wstrust_endpoint = %s", wstrust_endpoint)
1639 wstrust_result = wst_send_request(
1640 username, password,
1641 user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"),
1642 wstrust_endpoint.get("address",
1643 # Fallback to an AAD supplied endpoint
1644 user_realm_result.get("federation_active_auth_url")),
1645 wstrust_endpoint.get("action"), self.http_client)
1646 if not ("token" in wstrust_result and "type" in wstrust_result):
1647 raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result)
1648 GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
1649 grant_type = {
1650 SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1,
1651 SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2,
1652 WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1,
1653 WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2
1654 }.get(wstrust_result.get("type"))
1655 if not grant_type:
1656 raise RuntimeError(
1657 "RSTR returned unknown token type: %s", wstrust_result.get("type"))
1658 self.client.grant_assertion_encoders.setdefault( # Register a non-standard type
1659 grant_type, self.client.encode_saml_assertion)
1660 return self.client.obtain_token_by_assertion(
1661 wstrust_result["token"], grant_type, scope=scopes,
1662 on_obtaining_tokens=lambda event: self.token_cache.add(dict(
1663 event,
1664 environment=self.authority.instance,
1665 username=username, # Useful in case IDT contains no such info
1666 )),
1667 **kwargs)
1670class PublicClientApplication(ClientApplication): # browser app or mobile app
1672 DEVICE_FLOW_CORRELATION_ID = "_correlation_id"
1673 CONSOLE_WINDOW_HANDLE = object()
1675 def __init__(self, client_id, client_credential=None, **kwargs):
1676 if client_credential is not None:
1677 raise ValueError("Public Client should not possess credentials")
1678 super(PublicClientApplication, self).__init__(
1679 client_id, client_credential=None, **kwargs)
1681 def acquire_token_interactive(
1682 self,
1683 scopes, # type: list[str]
1684 prompt=None,
1685 login_hint=None, # type: Optional[str]
1686 domain_hint=None, # type: Optional[str]
1687 claims_challenge=None,
1688 timeout=None,
1689 port=None,
1690 extra_scopes_to_consent=None,
1691 max_age=None,
1692 parent_window_handle=None,
1693 on_before_launching_ui=None,
1694 **kwargs):
1695 """Acquire token interactively i.e. via a local browser.
1697 Prerequisite: In Azure Portal, configure the Redirect URI of your
1698 "Mobile and Desktop application" as ``http://localhost``.
1699 If you opts in to use broker during ``PublicClientApplication`` creation,
1700 your app also need this Redirect URI:
1701 ``ms-appx-web://Microsoft.AAD.BrokerPlugin/YOUR_CLIENT_ID``
1703 :param list scopes:
1704 It is a list of case-sensitive strings.
1705 :param str prompt:
1706 By default, no prompt value will be sent, not even "none".
1707 You will have to specify a value explicitly.
1708 Its valid values are defined in Open ID Connect specs
1709 https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
1710 :param str login_hint:
1711 Optional. Identifier of the user. Generally a User Principal Name (UPN).
1712 :param domain_hint:
1713 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
1714 If included, it will skip the email-based discovery process that user goes
1715 through on the sign-in page, leading to a slightly more streamlined user experience.
1716 More information on possible values
1717 `here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
1718 `here <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
1720 :param claims_challenge:
1721 The claims_challenge parameter requests specific claims requested by the resource provider
1722 in the form of a claims_challenge directive in the www-authenticate header to be
1723 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1724 It is a string of a JSON object which contains lists of claims being requested from these locations.
1726 :param int timeout:
1727 This method will block the current thread.
1728 This parameter specifies the timeout value in seconds.
1729 Default value ``None`` means wait indefinitely.
1731 :param int port:
1732 The port to be used to listen to an incoming auth response.
1733 By default we will use a system-allocated port.
1734 (The rest of the redirect_uri is hard coded as ``http://localhost``.)
1736 :param list extra_scopes_to_consent:
1737 "Extra scopes to consent" is a concept only available in AAD.
1738 It refers to other resources you might want to prompt to consent for,
1739 in the same interaction, but for which you won't get back a
1740 token for in this particular operation.
1742 :param int max_age:
1743 OPTIONAL. Maximum Authentication Age.
1744 Specifies the allowable elapsed time in seconds
1745 since the last time the End-User was actively authenticated.
1746 If the elapsed time is greater than this value,
1747 Microsoft identity platform will actively re-authenticate the End-User.
1749 MSAL Python will also automatically validate the auth_time in ID token.
1751 New in version 1.15.
1753 :param int parent_window_handle:
1754 OPTIONAL. If your app is a GUI app running on modern Windows system,
1755 and your app opts in to use broker,
1756 you are recommended to also provide its window handle,
1757 so that the sign in UI window will properly pop up on top of your window.
1759 New in version 1.20.0.
1761 :param function on_before_launching_ui:
1762 A callback with the form of
1763 ``lambda ui="xyz", **kwargs: print("A {} will be launched".format(ui))``,
1764 where ``ui`` will be either "browser" or "broker".
1765 You can use it to inform your end user to expect a pop-up window.
1767 New in version 1.20.0.
1769 :return:
1770 - A dict containing no "error" key,
1771 and typically contains an "access_token" key.
1772 - A dict containing an "error" key, when token refresh failed.
1773 """
1774 data = kwargs.pop("data", {})
1775 enable_msa_passthrough = kwargs.pop( # MUST remove it from kwargs
1776 "enable_msa_passthrough", # Keep it as a hidden param, for now.
1777 # OPTIONAL. MSA-Passthrough is a legacy configuration,
1778 # needed by a small amount of Microsoft first-party apps,
1779 # which would login MSA accounts via ".../organizations" authority.
1780 # If you app belongs to this category, AND you are enabling broker,
1781 # you would want to enable this flag. Default value is False.
1782 # More background of MSA-PT is available from this internal docs:
1783 # https://microsoft.sharepoint.com/:w:/t/Identity-DevEx/EatIUauX3c9Ctw1l7AQ6iM8B5CeBZxc58eoQCE0IuZ0VFw?e=tgc3jP&CID=39c853be-76ea-79d7-ee73-f1b2706ede05
1784 False
1785 ) and data.get("token_type") != "ssh-cert" # Work around a known issue as of PyMsalRuntime 0.8
1786 self._validate_ssh_cert_input_data(data)
1787 if not on_before_launching_ui:
1788 on_before_launching_ui = lambda **kwargs: None
1789 if _is_running_in_cloud_shell() and prompt == "none":
1790 # Note: _acquire_token_by_cloud_shell() is always silent,
1791 # so we would not fire on_before_launching_ui()
1792 return self._acquire_token_by_cloud_shell(scopes, data=data)
1793 claims = _merge_claims_challenge_and_capabilities(
1794 self._client_capabilities, claims_challenge)
1795 if self._enable_broker:
1796 if parent_window_handle is None:
1797 raise ValueError(
1798 "parent_window_handle is required when you opted into using broker. "
1799 "You need to provide the window handle of your GUI application, "
1800 "or use msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE "
1801 "when and only when your application is a console app.")
1802 if extra_scopes_to_consent:
1803 logger.warning(
1804 "Ignoring parameter extra_scopes_to_consent, "
1805 "which is not supported by broker")
1806 return self._acquire_token_interactive_via_broker(
1807 scopes,
1808 parent_window_handle,
1809 enable_msa_passthrough,
1810 claims,
1811 data,
1812 on_before_launching_ui,
1813 prompt=prompt,
1814 login_hint=login_hint,
1815 max_age=max_age,
1816 )
1818 on_before_launching_ui(ui="browser")
1819 telemetry_context = self._build_telemetry_context(
1820 self.ACQUIRE_TOKEN_INTERACTIVE)
1821 response = _clean_up(self.client.obtain_token_by_browser(
1822 scope=self._decorate_scope(scopes) if scopes else None,
1823 extra_scope_to_consent=extra_scopes_to_consent,
1824 redirect_uri="http://localhost:{port}".format(
1825 # Hardcode the host, for now. AAD portal rejects 127.0.0.1 anyway
1826 port=port or 0),
1827 prompt=prompt,
1828 login_hint=login_hint,
1829 max_age=max_age,
1830 timeout=timeout,
1831 auth_params={
1832 "claims": claims,
1833 "domain_hint": domain_hint,
1834 },
1835 data=dict(data, claims=claims),
1836 headers=telemetry_context.generate_headers(),
1837 browser_name=_preferred_browser(),
1838 **kwargs))
1839 telemetry_context.update_telemetry(response)
1840 return response
1842 def _acquire_token_interactive_via_broker(
1843 self,
1844 scopes, # type: list[str]
1845 parent_window_handle, # type: int
1846 enable_msa_passthrough, # type: boolean
1847 claims, # type: str
1848 data, # type: dict
1849 on_before_launching_ui, # type: callable
1850 prompt=None,
1851 login_hint=None, # type: Optional[str]
1852 max_age=None,
1853 **kwargs):
1854 from .broker import _signin_interactively, _signin_silently, _acquire_token_silently
1855 if "welcome_template" in kwargs:
1856 logger.debug(kwargs["welcome_template"]) # Experimental
1857 authority = "https://{}/{}".format(
1858 self.authority.instance, self.authority.tenant)
1859 validate_authority = "no" if (
1860 self.authority._is_known_to_developer
1861 or self._instance_discovery is False) else None
1862 # Calls different broker methods to mimic the OIDC behaviors
1863 if login_hint and prompt != "select_account": # OIDC prompts when the user did not sign in
1864 accounts = self.get_accounts(username=login_hint)
1865 if len(accounts) == 1: # Unambiguously proceed with this account
1866 logger.debug("Calling broker._acquire_token_silently()")
1867 response = _acquire_token_silently( # When it works, it bypasses prompt
1868 authority,
1869 self.client_id,
1870 accounts[0]["local_account_id"],
1871 scopes,
1872 claims=claims,
1873 **data)
1874 if response and "error" not in response:
1875 return self._process_broker_response(response, scopes, data)
1876 # login_hint undecisive or not exists
1877 if prompt == "none" or not prompt: # Must/Can attempt _signin_silently()
1878 logger.debug("Calling broker._signin_silently()")
1879 response = _signin_silently( # Unlike OIDC, it doesn't honor login_hint
1880 authority, self.client_id, scopes,
1881 validateAuthority=validate_authority,
1882 claims=claims,
1883 max_age=max_age,
1884 enable_msa_pt=enable_msa_passthrough,
1885 **data)
1886 is_wrong_account = bool(
1887 # _signin_silently() only gets tokens for default account,
1888 # but this seems to have been fixed in PyMsalRuntime 0.11.2
1889 "access_token" in response and login_hint
1890 and response.get("id_token_claims", {}) != login_hint)
1891 wrong_account_error_message = (
1892 'prompt="none" will not work for login_hint="non-default-user"')
1893 if is_wrong_account:
1894 logger.debug(wrong_account_error_message)
1895 if prompt == "none":
1896 return self._process_broker_response( # It is either token or error
1897 response, scopes, data
1898 ) if not is_wrong_account else {
1899 "error": "broker_error",
1900 "error_description": wrong_account_error_message,
1901 }
1902 else:
1903 assert bool(prompt) is False
1904 from pymsalruntime import Response_Status
1905 recoverable_errors = frozenset([
1906 Response_Status.Status_AccountUnusable,
1907 Response_Status.Status_InteractionRequired,
1908 ])
1909 if is_wrong_account or "error" in response and response.get(
1910 "_broker_status") in recoverable_errors:
1911 pass # It will fall back to the _signin_interactively()
1912 else:
1913 return self._process_broker_response(response, scopes, data)
1915 logger.debug("Falls back to broker._signin_interactively()")
1916 on_before_launching_ui(ui="broker")
1917 response = _signin_interactively(
1918 authority, self.client_id, scopes,
1919 None if parent_window_handle is self.CONSOLE_WINDOW_HANDLE
1920 else parent_window_handle,
1921 validateAuthority=validate_authority,
1922 login_hint=login_hint,
1923 prompt=prompt,
1924 claims=claims,
1925 max_age=max_age,
1926 enable_msa_pt=enable_msa_passthrough,
1927 **data)
1928 return self._process_broker_response(response, scopes, data)
1930 def initiate_device_flow(self, scopes=None, **kwargs):
1931 """Initiate a Device Flow instance,
1932 which will be used in :func:`~acquire_token_by_device_flow`.
1934 :param list[str] scopes:
1935 Scopes requested to access a protected API (a resource).
1936 :return: A dict representing a newly created Device Flow object.
1938 - A successful response would contain "user_code" key, among others
1939 - an error response would contain some other readable key/value pairs.
1940 """
1941 correlation_id = msal.telemetry._get_new_correlation_id()
1942 flow = self.client.initiate_device_flow(
1943 scope=self._decorate_scope(scopes or []),
1944 headers={msal.telemetry.CLIENT_REQUEST_ID: correlation_id},
1945 **kwargs)
1946 flow[self.DEVICE_FLOW_CORRELATION_ID] = correlation_id
1947 return flow
1949 def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
1950 """Obtain token by a device flow object, with customizable polling effect.
1952 :param dict flow:
1953 A dict previously generated by :func:`~initiate_device_flow`.
1954 By default, this method's polling effect will block current thread.
1955 You can abort the polling loop at any time,
1956 by changing the value of the flow's "expires_at" key to 0.
1957 :param claims_challenge:
1958 The claims_challenge parameter requests specific claims requested by the resource provider
1959 in the form of a claims_challenge directive in the www-authenticate header to be
1960 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1961 It is a string of a JSON object which contains lists of claims being requested from these locations.
1963 :return: A dict representing the json response from AAD:
1965 - A successful response would contain "access_token" key,
1966 - an error response would contain "error" and usually "error_description".
1967 """
1968 telemetry_context = self._build_telemetry_context(
1969 self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID,
1970 correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID))
1971 response = _clean_up(self.client.obtain_token_by_device_flow(
1972 flow,
1973 data=dict(
1974 kwargs.pop("data", {}),
1975 code=flow["device_code"], # 2018-10-4 Hack:
1976 # during transition period,
1977 # service seemingly need both device_code and code parameter.
1978 claims=_merge_claims_challenge_and_capabilities(
1979 self._client_capabilities, claims_challenge),
1980 ),
1981 headers=telemetry_context.generate_headers(),
1982 **kwargs))
1983 telemetry_context.update_telemetry(response)
1984 return response
1987class ConfidentialClientApplication(ClientApplication): # server-side web app
1989 def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
1990 """Acquires token for the current confidential client, not for an end user.
1992 :param list[str] scopes: (Required)
1993 Scopes requested to access a protected API (a resource).
1994 :param claims_challenge:
1995 The claims_challenge parameter requests specific claims requested by the resource provider
1996 in the form of a claims_challenge directive in the www-authenticate header to be
1997 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
1998 It is a string of a JSON object which contains lists of claims being requested from these locations.
2000 :return: A dict representing the json response from AAD:
2002 - A successful response would contain "access_token" key,
2003 - an error response would contain "error" and usually "error_description".
2004 """
2005 # TBD: force_refresh behavior
2006 if self.authority.tenant.lower() in ["common", "organizations"]:
2007 warnings.warn(
2008 "Using /common or /organizations authority "
2009 "in acquire_token_for_client() is unreliable. "
2010 "Please use a specific tenant instead.", DeprecationWarning)
2011 self._validate_ssh_cert_input_data(kwargs.get("data", {}))
2012 telemetry_context = self._build_telemetry_context(
2013 self.ACQUIRE_TOKEN_FOR_CLIENT_ID)
2014 client = self._regional_client or self.client
2015 response = _clean_up(client.obtain_token_for_client(
2016 scope=scopes, # This grant flow requires no scope decoration
2017 headers=telemetry_context.generate_headers(),
2018 data=dict(
2019 kwargs.pop("data", {}),
2020 claims=_merge_claims_challenge_and_capabilities(
2021 self._client_capabilities, claims_challenge)),
2022 **kwargs))
2023 telemetry_context.update_telemetry(response)
2024 return response
2026 def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs):
2027 """Acquires token using on-behalf-of (OBO) flow.
2029 The current app is a middle-tier service which was called with a token
2030 representing an end user.
2031 The current app can use such token (a.k.a. a user assertion) to request
2032 another token to access downstream web API, on behalf of that user.
2033 See `detail docs here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow>`_ .
2035 The current middle-tier app has no user interaction to obtain consent.
2036 See how to gain consent upfront for your middle-tier app from this article.
2037 https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow#gaining-consent-for-the-middle-tier-application
2039 :param str user_assertion: The incoming token already received by this app
2040 :param list[str] scopes: Scopes required by downstream API (a resource).
2041 :param claims_challenge:
2042 The claims_challenge parameter requests specific claims requested by the resource provider
2043 in the form of a claims_challenge directive in the www-authenticate header to be
2044 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
2045 It is a string of a JSON object which contains lists of claims being requested from these locations.
2047 :return: A dict representing the json response from AAD:
2049 - A successful response would contain "access_token" key,
2050 - an error response would contain "error" and usually "error_description".
2051 """
2052 telemetry_context = self._build_telemetry_context(
2053 self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID)
2054 # The implementation is NOT based on Token Exchange
2055 # https://tools.ietf.org/html/draft-ietf-oauth-token-exchange-16
2056 response = _clean_up(self.client.obtain_token_by_assertion( # bases on assertion RFC 7521
2057 user_assertion,
2058 self.client.GRANT_TYPE_JWT, # IDTs and AAD ATs are all JWTs
2059 scope=self._decorate_scope(scopes), # Decoration is used for:
2060 # 1. Explicitly requesting an RT, without relying on AAD default
2061 # behavior, even though it currently still issues an RT.
2062 # 2. Requesting an IDT (which would otherwise be unavailable)
2063 # so that the calling app could use id_token_claims to implement
2064 # their own cache mapping, which is likely needed in web apps.
2065 data=dict(
2066 kwargs.pop("data", {}),
2067 requested_token_use="on_behalf_of",
2068 claims=_merge_claims_challenge_and_capabilities(
2069 self._client_capabilities, claims_challenge)),
2070 headers=telemetry_context.generate_headers(),
2071 # TBD: Expose a login_hint (or ccs_routing_hint) param for web app
2072 **kwargs))
2073 telemetry_context.update_telemetry(response)
2074 return response