Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/application.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

702 statements  

1import functools 

2import json 

3import time 

4import logging 

5import sys 

6import warnings 

7from threading import Lock 

8from typing import Optional # Needed in Python 3.7 & 3.8 

9from urllib.parse import urlparse 

10import os 

11 

12from .oauth2cli import Client, JwtAssertionCreator 

13from .oauth2cli.oidc import decode_part 

14from .authority import ( 

15 Authority, 

16 WORLD_WIDE, 

17 _get_instance_discovery_endpoint, 

18 _get_instance_discovery_host, 

19) 

20from .mex import send_request as mex_send_request 

21from .wstrust_request import send_request as wst_send_request 

22from .wstrust_response import * 

23from .token_cache import TokenCache, _get_username, _GRANT_TYPE_BROKER, _compute_ext_cache_key 

24import msal.telemetry 

25from .region import _detect_region 

26from .throttled_http_client import ThrottledHttpClient 

27from .cloudshell import _is_running_in_cloud_shell 

28from .sku import SKU, __version__ 

29from .oauth2cli.authcode import is_wsl 

30 

31 

32logger = logging.getLogger(__name__) 

33_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL" 

34 

35def _init_broker(enable_pii_log): # Make it a function to allow mocking 

36 from . import broker # Trigger Broker's initialization, lazily 

37 if enable_pii_log: 

38 broker._enable_pii_log() 

39 

40def extract_certs(public_cert_content): 

41 # Parses raw public certificate file contents and returns a list of strings 

42 # Usage: headers = {"x5c": extract_certs(open("my_cert.pem").read())} 

43 public_certificates = re.findall( 

44 r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----', 

45 public_cert_content, re.I) 

46 if public_certificates: 

47 return [cert.strip() for cert in public_certificates] 

48 # The public cert tags are not found in the input, 

49 # let's make best effort to exclude a private key pem file. 

50 if "PRIVATE KEY" in public_cert_content: 

51 raise ValueError( 

52 "We expect your public key but detect a private key instead") 

53 return [public_cert_content.strip()] 

54 

55 

56def _merge_claims_challenge_and_capabilities(capabilities, claims_challenge): 

57 # Represent capabilities as {"access_token": {"xms_cc": {"values": capabilities}}} 

58 # and then merge/add it into incoming claims 

59 if not capabilities: 

60 return claims_challenge 

61 claims_dict = json.loads(claims_challenge) if claims_challenge else {} 

62 for key in ["access_token"]: # We could add "id_token" if we'd decide to 

63 claims_dict.setdefault(key, {}).update(xms_cc={"values": capabilities}) 

64 return json.dumps(claims_dict) 

65 

66 

67def _str2bytes(raw): 

68 # A conversion based on duck-typing rather than six.text_type 

69 try: 

70 return raw.encode(encoding="utf-8") 

71 except: 

72 return raw 

73 

74def _extract_cert_and_thumbprints(cert): 

75 # Cert concepts https://security.stackexchange.com/a/226758/125264 

76 from cryptography.hazmat.primitives import hashes, serialization 

77 cert_pem = cert.public_bytes( # Requires cryptography 1.0+ 

78 encoding=serialization.Encoding.PEM).decode() 

79 x5c = [ 

80 '\n'.join( 

81 cert_pem.splitlines() 

82 [1:-1] # Strip the "--- header ---" and "--- footer ---" 

83 ) 

84 ] 

85 # https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object - Requires cryptography 0.7+ 

86 sha256_thumbprint = cert.fingerprint(hashes.SHA256()).hex() 

87 sha1_thumbprint = cert.fingerprint(hashes.SHA1()).hex() # CodeQL [SM02167] for legacy support such as ADFS 

88 return sha256_thumbprint, sha1_thumbprint, x5c 

89 

90def _parse_pfx(pfx_path, passphrase_bytes): 

91 # Cert concepts https://security.stackexchange.com/a/226758/125264 

92 from cryptography.hazmat.primitives.serialization import pkcs12 

93 with open(pfx_path, 'rb') as f: 

94 private_key, cert, _ = pkcs12.load_key_and_certificates( # cryptography 2.5+ 

95 # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates 

96 f.read(), passphrase_bytes) 

97 if not (private_key and cert): 

98 raise ValueError("Your PFX file shall contain both private key and cert") 

99 sha256_thumbprint, sha1_thumbprint, x5c = _extract_cert_and_thumbprints(cert) 

100 return private_key, sha256_thumbprint, sha1_thumbprint, x5c 

101 

102 

103def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes): 

104 from cryptography.hazmat.primitives import serialization 

105 from cryptography.hazmat.backends import default_backend 

106 return serialization.load_pem_private_key( # cryptography 0.6+ 

107 _str2bytes(private_key_pem_str), 

108 passphrase_bytes, 

109 backend=default_backend(), # It was a required param until 2020 

110 ) 

111 

112 

113def _pii_less_home_account_id(home_account_id): 

114 parts = home_account_id.split(".") # It could contain one or two parts 

115 parts[0] = "********" 

116 return ".".join(parts) 

117 

118 

119def _clean_up(result): 

120 if isinstance(result, dict): 

121 if "_msalruntime_telemetry" in result or "_msal_python_telemetry" in result: 

122 result["msal_telemetry"] = json.dumps({ # Telemetry as an opaque string 

123 "msalruntime_telemetry": result.get("_msalruntime_telemetry"), 

124 "msal_python_telemetry": result.get("_msal_python_telemetry"), 

125 }, separators=(",", ":")) 

126 return_value = { 

127 k: result[k] for k in result 

128 if k != "refresh_in" # MSAL handled refresh_in, customers need not 

129 and not k.startswith('_') # Skim internal properties 

130 } 

131 if "refresh_in" in result: # To encourage proactive refresh 

132 return_value["refresh_on"] = int(time.time() + result["refresh_in"]) 

133 return return_value 

134 return result # It could be None 

135 

136 

137def _preferred_browser(): 

138 """Register Edge and return a name suitable for subsequent webbrowser.get(...) 

139 when appropriate. Otherwise return None. 

140 """ 

141 # On Linux, only Edge will provide device-based Conditional Access support 

142 if sys.platform != "linux": # On other platforms, we have no browser preference 

143 return None 

144 browser_path = "/usr/bin/microsoft-edge" # Use a full path owned by sys admin 

145 # Note: /usr/bin/microsoft-edge, /usr/bin/microsoft-edge-stable, etc. 

146 # are symlinks that point to the actual binaries which are found under 

147 # /opt/microsoft/msedge/msedge or /opt/microsoft/msedge-beta/msedge. 

148 # Either method can be used to detect an Edge installation. 

149 user_has_no_preference = "BROWSER" not in os.environ 

150 user_wont_mind_edge = "microsoft-edge" in os.environ.get("BROWSER", "") # Note: 

151 # BROWSER could contain "microsoft-edge" or "/path/to/microsoft-edge". 

152 # Python documentation (https://docs.python.org/3/library/webbrowser.html) 

153 # does not document the name being implicitly register, 

154 # so there is no public API to know whether the ENV VAR browser would work. 

155 # Therefore, we would not bother examine the env var browser's type. 

156 # We would just register our own Edge instance. 

157 if (user_has_no_preference or user_wont_mind_edge) and os.path.exists(browser_path): 

158 try: 

159 import webbrowser # Lazy import. Some distro may not have this. 

160 browser_name = "msal-edge" # Avoid popular name "microsoft-edge" 

161 # otherwise `BROWSER="microsoft-edge"; webbrowser.get("microsoft-edge")` 

162 # would return a GenericBrowser instance which won't work. 

163 try: 

164 registration_available = isinstance( 

165 webbrowser.get(browser_name), webbrowser.BackgroundBrowser) 

166 except webbrowser.Error: 

167 registration_available = False 

168 if not registration_available: 

169 logger.debug("Register %s with %s", browser_name, browser_path) 

170 # By registering our own browser instance with our own name, 

171 # rather than populating a process-wide BROWSER enn var, 

172 # this approach does not have side effect on non-MSAL code path. 

173 webbrowser.register( # Even double-register happens to work fine 

174 browser_name, None, webbrowser.BackgroundBrowser(browser_path)) 

175 return browser_name 

176 except ImportError: 

177 pass # We may still proceed 

178 return None 

179 

180def _is_ssh_cert_or_pop_request(token_type, auth_scheme) -> bool: 

181 return token_type == "ssh-cert" or token_type == "pop" or isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme) 

182 

183class _ClientWithCcsRoutingInfo(Client): 

184 

185 def initiate_auth_code_flow(self, **kwargs): 

186 if kwargs.get("login_hint"): # eSTS could have utilized this as-is, but nope 

187 kwargs["X-AnchorMailbox"] = "UPN:%s" % kwargs["login_hint"] 

188 return super(_ClientWithCcsRoutingInfo, self).initiate_auth_code_flow( 

189 client_info=1, # To be used as CSS Routing info 

190 **kwargs) 

191 

192 def obtain_token_by_auth_code_flow( 

193 self, auth_code_flow, auth_response, **kwargs): 

194 # Note: the obtain_token_by_browser() is also covered by this 

195 assert isinstance(auth_code_flow, dict) and isinstance(auth_response, dict) 

196 headers = kwargs.pop("headers", {}) 

197 client_info = json.loads( 

198 decode_part(auth_response["client_info"]) 

199 ) if auth_response.get("client_info") else {} 

200 if "uid" in client_info and "utid" in client_info: 

201 # Note: The value of X-AnchorMailbox is also case-insensitive 

202 headers["X-AnchorMailbox"] = "Oid:{uid}@{utid}".format(**client_info) 

203 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_auth_code_flow( 

204 auth_code_flow, auth_response, headers=headers, **kwargs) 

205 

206 def obtain_token_by_username_password(self, username, password, **kwargs): 

207 headers = kwargs.pop("headers", {}) 

208 headers["X-AnchorMailbox"] = "upn:{}".format(username) 

209 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_username_password( 

210 username, password, headers=headers, **kwargs) 

211 

212 

213def _msal_extension_check(): 

214 # Can't run this in module or class level otherwise you'll get circular import error 

215 try: 

216 from msal_extensions import __version__ as v 

217 major, minor, _ = v.split(".", maxsplit=3) 

218 if not (int(major) >= 1 and int(minor) >= 2): 

219 warnings.warn( 

220 "Please upgrade msal-extensions. " 

221 "Only msal-extensions 1.2+ can work with msal 1.30+") 

222 except ImportError: 

223 pass # The optional msal_extensions is not installed. Business as usual. 

224 except ValueError: 

225 logger.exception(f"msal_extensions version {v} not in major.minor.patch format") 

226 except: 

227 logger.exception( 

228 "Unable to import msal_extensions during an optional check. " 

229 "This exception can be safely ignored." 

230 ) 

231 

232 

233class ClientApplication(object): 

234 """You do not usually directly use this class. Use its subclasses instead: 

235 :class:`PublicClientApplication` and :class:`ConfidentialClientApplication`. 

236 """ 

237 ACQUIRE_TOKEN_SILENT_ID = "84" 

238 ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85" 

239 ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301" 

240 ACQUIRE_TOKEN_ON_BEHALF_OF_ID = "523" 

241 ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID = "622" 

242 ACQUIRE_TOKEN_FOR_CLIENT_ID = "730" 

243 ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832" 

244 ACQUIRE_TOKEN_INTERACTIVE = "169" 

245 ACQUIRE_TOKEN_BY_USER_FIC_ID = "950" 

246 GET_ACCOUNTS_ID = "902" 

247 REMOVE_ACCOUNT_ID = "903" 

248 

249 ATTEMPT_REGION_DISCOVERY = True # "TryAutoDetect" 

250 DISABLE_MSAL_FORCE_REGION = False # Used in azure_region to disable MSAL_FORCE_REGION behavior 

251 _TOKEN_SOURCE = "token_source" 

252 _TOKEN_SOURCE_IDP = "identity_provider" 

253 _TOKEN_SOURCE_CACHE = "cache" 

254 _TOKEN_SOURCE_BROKER = "broker" 

255 

256 _enable_broker = False 

257 _AUTH_SCHEME_UNSUPPORTED = ( 

258 "auth_scheme is currently only available from broker. " 

259 "You can enable broker by following these instructions. " 

260 "https://msal-python.readthedocs.io/en/latest/#publicclientapplication") 

261 

262 def __init__( 

263 self, client_id, 

264 client_credential=None, authority=None, validate_authority=True, 

265 token_cache=None, 

266 http_client=None, 

267 verify=True, proxies=None, timeout=None, 

268 client_claims=None, app_name=None, app_version=None, 

269 client_capabilities=None, 

270 azure_region=None, # Note: We choose to add this param in this base class, 

271 # despite it is currently only needed by ConfidentialClientApplication. 

272 # This way, it holds the same positional param place for PCA, 

273 # when we would eventually want to add this feature to PCA in future. 

274 exclude_scopes=None, 

275 http_cache=None, 

276 instance_discovery=None, 

277 allow_broker=None, 

278 enable_pii_log=None, 

279 oidc_authority=None, 

280 ): 

281 """Create an instance of application. 

282 

283 :param str client_id: Your app has a client_id after you register it on Microsoft Entra admin center. 

284 

285 :param client_credential: 

286 For :class:`PublicClientApplication`, you use `None` here. 

287 

288 For :class:`ConfidentialClientApplication`, 

289 it supports many different input formats for different scenarios. 

290 

291 .. admonition:: Support using a client secret. 

292 

293 Just feed in a string, such as ``"your client secret"``. 

294 

295 .. admonition:: Support using a certificate in X.509 (.pem) format 

296 

297 Deprecated because it uses SHA-1 thumbprint, 

298 unless you are still using ADFS which supports SHA-1 thumbprint only. 

299 Please use the .pfx option documented later in this page. 

300 

301 Feed in a dict in this form:: 

302 

303 { 

304 "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format", 

305 "thumbprint": "An SHA-1 thumbprint such as A1B2C3D4E5F6..." 

306 "Changed in version 1.35.0, if thumbprint is absent" 

307 "and a public_certificate is present, MSAL will" 

308 "automatically calculate an SHA-256 thumbprint instead.", 

309 "passphrase": "Needed if the private_key is encrypted (Added in version 1.6.0)", 

310 "public_certificate": "...-----BEGIN CERTIFICATE-----...", # Needed if you use Subject Name/Issuer auth. Added in version 0.5.0. 

311 } 

312 

313 MSAL Python requires a "private_key" in PEM format. 

314 If your cert is in PKCS12 (.pfx) format, 

315 you can convert it to X.509 (.pem) format, 

316 by ``openssl pkcs12 -in file.pfx -out file.pem -nodes``. 

317 

318 The thumbprint is available in your app's registration in Azure Portal. 

319 Alternatively, you can `calculate the thumbprint <https://github.com/Azure/azure-sdk-for-python/blob/07d10639d7e47f4852eaeb74aef5d569db499d6e/sdk/identity/azure-identity/azure/identity/_credentials/certificate.py#L94-L97>`_. 

320 

321 ``public_certificate`` (optional) is public key certificate 

322 which will be sent through 'x5c' JWT header. 

323 This is useful when you use `Subject Name/Issuer Authentication 

324 <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_ 

325 which is an approach to allow easier certificate rotation. 

326 Per `specs <https://tools.ietf.org/html/rfc7515#section-4.1.6>`_, 

327 "the certificate containing 

328 the public key corresponding to the key used to digitally sign the 

329 JWS MUST be the first certificate. This MAY be followed by 

330 additional certificates, with each subsequent certificate being the 

331 one used to certify the previous one." 

332 However, your certificate's issuer may use a different order. 

333 So, if your attempt ends up with an error AADSTS700027 - 

334 "The provided signature value did not match the expected signature value", 

335 you may try use only the leaf cert (in PEM/str format) instead. 

336 

337 .. admonition:: Supporting raw assertion obtained from elsewhere 

338 

339 *Added in version 1.13.0*: 

340 It can also be a completely pre-signed assertion that you've assembled yourself. 

341 Simply pass a container containing only the key "client_assertion", like this:: 

342 

343 { 

344 "client_assertion": "...a JWT with claims aud, exp, iss, jti, nbf, and sub..." 

345 } 

346 

347 .. note:: 

348 

349 A pre-signed JWT string has a fixed expiration. Long-running 

350 confidential client applications (for example, workloads using 

351 AKS workload identity federation, or any other dynamic 

352 credential source) should instead pass a **callable** which 

353 MSAL will invoke on demand to obtain a fresh assertion:: 

354 

355 def get_client_assertion(): 

356 # e.g. read the projected service-account token from disk 

357 with open("/var/run/secrets/azure/tokens/azure-identity-token") as f: 

358 return f.read() 

359 

360 app = ConfidentialClientApplication( 

361 "client_id", 

362 client_credential={"client_assertion": get_client_assertion}, 

363 ..., 

364 ) 

365 

366 The callable is only invoked when MSAL needs to send a token 

367 request on the wire (the in-memory token cache transparently 

368 avoids unnecessary calls). 

369 

370 If your callback is itself expensive (for example it calls 

371 out to a key vault), wrap it in :class:`msal.AutoRefresher` 

372 to memoize the assertion for its lifetime:: 

373 

374 from msal import AutoRefresher 

375 smart_callback = AutoRefresher(get_client_assertion, expires_in=3600) 

376 app = ConfidentialClientApplication( 

377 "client_id", 

378 client_credential={"client_assertion": smart_callback}, 

379 ..., 

380 ) 

381 

382 Passing a plain ``str`` / ``bytes`` ``client_assertion`` is 

383 still supported for backward compatibility but is discouraged 

384 because the assertion will eventually expire. 

385 

386 .. admonition:: Supporting reading client certificates from PFX files 

387 

388 This usage will automatically use SHA-256 thumbprint of the certificate. 

389 

390 *Added in version 1.29.0*: 

391 Feed in a dictionary containing the path to a PFX file:: 

392 

393 { 

394 "private_key_pfx_path": "/path/to/your.pfx", # Added in version 1.29.0 

395 "public_certificate": True, # Only needed if you use Subject Name/Issuer auth. Added in version 1.30.0 

396 "passphrase": "Passphrase if the private_key is encrypted (Optional)", 

397 } 

398 

399 The following command will generate a .pfx file from your .key and .pem file:: 

400 

401 openssl pkcs12 -export -out certificate.pfx -inkey privateKey.key -in certificate.pem 

402 

403 `Subject Name/Issuer Auth 

404 <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_ 

405 is an approach to allow easier certificate rotation. 

406 If your .pfx file contains both the private key and public cert, 

407 you can opt in for Subject Name/Issuer Auth by setting "public_certificate" to ``True``. 

408 

409 :type client_credential: Union[dict, str, None] 

410 

411 :param dict client_claims: 

412 *Added in version 0.5.0*: 

413 It is a dictionary of extra claims that would be signed by 

414 by this :class:`ConfidentialClientApplication` 's private key. 

415 For example, you can use {"client_ip": "x.x.x.x"}. 

416 You may also override any of the following default claims:: 

417 

418 { 

419 "aud": the_token_endpoint, 

420 "iss": self.client_id, 

421 "sub": same_as_issuer, 

422 "exp": now + 10_min, 

423 "iat": now, 

424 "jti": a_random_uuid 

425 } 

426 

427 :param str authority: 

428 A URL that identifies a token authority. It should be of the format 

429 ``https://login.microsoftonline.com/your_tenant`` 

430 By default, we will use ``https://login.microsoftonline.com/common`` 

431 

432 *Changed in version 1.17*: you can also use predefined constant 

433 and a builder like this:: 

434 

435 from msal.authority import ( 

436 AuthorityBuilder, 

437 AZURE_US_GOVERNMENT, AZURE_CHINA, AZURE_PUBLIC) 

438 my_authority = AuthorityBuilder(AZURE_PUBLIC, "contoso.onmicrosoft.com") 

439 # Now you get an equivalent of 

440 # "https://login.microsoftonline.com/contoso.onmicrosoft.com" 

441 

442 # You can feed such an authority to msal's ClientApplication 

443 from msal import PublicClientApplication 

444 app = PublicClientApplication("my_client_id", authority=my_authority, ...) 

445 

446 :param bool validate_authority: (optional) Turns authority validation 

447 on or off. This parameter default to true. 

448 :param TokenCache token_cache: 

449 Sets the token cache used by this ClientApplication instance. 

450 By default, an in-memory cache will be created and used. 

451 :param http_client: (optional) 

452 Your implementation of abstract class HttpClient <msal.oauth2cli.http.http_client> 

453 Defaults to a requests session instance. 

454 Since MSAL 1.11.0, the default session would be configured 

455 to attempt one retry on connection error. 

456 If you are providing your own http_client, 

457 it will be your http_client's duty to decide whether to perform retry. 

458 

459 :param verify: (optional) 

460 It will be passed to the 

461 `verify parameter in the underlying requests library 

462 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#ssl-cert-verification>`_ 

463 This does not apply if you have chosen to pass your own Http client 

464 :param proxies: (optional) 

465 It will be passed to the 

466 `proxies parameter in the underlying requests library 

467 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#proxies>`_ 

468 This does not apply if you have chosen to pass your own Http client 

469 :param timeout: (optional) 

470 It will be passed to the 

471 `timeout parameter in the underlying requests library 

472 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#timeouts>`_ 

473 This does not apply if you have chosen to pass your own Http client 

474 :param app_name: (optional) 

475 You can provide your application name for Microsoft telemetry purposes. 

476 Default value is None, means it will not be passed to Microsoft. 

477 :param app_version: (optional) 

478 You can provide your application version for Microsoft telemetry purposes. 

479 Default value is None, means it will not be passed to Microsoft. 

480 :param list[str] client_capabilities: (optional) 

481 Allows configuration of one or more client capabilities, e.g. ["CP1"]. 

482 

483 Client capability is meant to inform the Microsoft identity platform 

484 (STS) what this client is capable for, 

485 so STS can decide to turn on certain features. 

486 For example, if client is capable to handle *claims challenge*, 

487 STS may issue 

488 `Continuous Access Evaluation (CAE) <https://learn.microsoft.com/entra/identity/conditional-access/concept-continuous-access-evaluation>`_ 

489 access tokens to resources, 

490 knowing that when the resource emits a *claims challenge* 

491 the client will be able to handle those challenges. 

492 

493 Implementation details: 

494 Client capability is implemented using "claims" parameter on the wire, 

495 for now. 

496 MSAL will combine them into 

497 `claims parameter <https://openid.net/specs/openid-connect-core-1_0-final.html#ClaimsParameter>`_ 

498 which you will later provide via one of the acquire-token request. 

499 

500 :param str azure_region: (optional) 

501 Instructs MSAL to use the Entra regional token service. This legacy feature is only available to 

502 first-party applications. Only ``acquire_token_for_client()`` is supported. 

503 

504 Supports 4 values: 

505 

506 1. ``azure_region=None`` - This default value means no region is configured. 

507 MSAL will use the region defined in env var ``MSAL_FORCE_REGION``. 

508 2. ``azure_region="some_region"`` - meaning the specified region is used. 

509 3. ``azure_region=True`` - meaning 

510 MSAL will try to auto-detect the region. This is not recommended. 

511 4. ``azure_region=False`` - meaning MSAL will use no region. 

512 

513 .. note:: 

514 Region auto-discovery has been tested on VMs and on Azure Functions. It is unreliable. 

515 Applications using this option should configure a short timeout. 

516 

517 For more details and for the values of the region string 

518 see https://learn.microsoft.com/entra/msal/dotnet/resources/region-discovery-troubleshooting 

519 

520 New in version 1.12.0. 

521 

522 :param list[str] exclude_scopes: (optional) 

523 Historically MSAL hardcodes `offline_access` scope, 

524 which would allow your app to have prolonged access to user's data. 

525 If that is unnecessary or undesirable for your app, 

526 now you can use this parameter to supply an exclusion list of scopes, 

527 such as ``exclude_scopes = ["offline_access"]``. 

528 

529 :param dict http_cache: 

530 MSAL has long been caching tokens in the ``token_cache``. 

531 Recently, MSAL also introduced a concept of ``http_cache``, 

532 by automatically caching some finite amount of non-token http responses, 

533 so that *long-lived* 

534 ``PublicClientApplication`` and ``ConfidentialClientApplication`` 

535 would be more performant and responsive in some situations. 

536 

537 This ``http_cache`` parameter accepts any dict-like object. 

538 If not provided, MSAL will use an in-memory dict. 

539 

540 If your app is a command-line app (CLI), 

541 you would want to persist your http_cache across different CLI runs. 

542 The persisted file's format may change due to, but not limited to, 

543 `unstable protocol <https://docs.python.org/3/library/pickle.html#data-stream-format>`_, 

544 so your implementation shall tolerate unexpected loading errors. 

545 The following recipe shows a way to do so:: 

546 

547 # Just add the following lines at the beginning of your CLI script 

548 import sys, atexit, pickle, logging 

549 http_cache_filename = sys.argv[0] + ".http_cache" 

550 try: 

551 with open(http_cache_filename, "rb") as f: 

552 persisted_http_cache = pickle.load(f) # Take a snapshot 

553 except ( 

554 FileNotFoundError, # Or IOError in Python 2 

555 pickle.UnpicklingError, # A corrupted http cache file 

556 AttributeError, # Cache created by a different version of MSAL 

557 ): 

558 persisted_http_cache = {} # Recover by starting afresh 

559 except: # Unexpected exceptions 

560 logging.exception("You may want to debug this") 

561 persisted_http_cache = {} # Recover by starting afresh 

562 atexit.register(lambda: pickle.dump( 

563 # When exit, flush it back to the file. 

564 # It may occasionally overwrite another process's concurrent write, 

565 # but that is fine. Subsequent runs will reach eventual consistency. 

566 persisted_http_cache, open(http_cache_file, "wb"))) 

567 

568 # And then you can implement your app as you normally would 

569 app = msal.PublicClientApplication( 

570 "your_client_id", 

571 ..., 

572 http_cache=persisted_http_cache, # Utilize persisted_http_cache 

573 ..., 

574 #token_cache=..., # You may combine the old token_cache trick 

575 # Please refer to token_cache recipe at 

576 # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache 

577 ) 

578 app.acquire_token_interactive(["your", "scope"], ...) 

579 

580 Content inside ``http_cache`` are cheap to obtain. 

581 There is no need to share them among different apps. 

582 

583 Content inside ``http_cache`` will contain no tokens nor 

584 Personally Identifiable Information (PII). Encryption is unnecessary. 

585 

586 New in version 1.16.0. 

587 

588 :param boolean instance_discovery: 

589 Historically, MSAL would connect to a central endpoint located at 

590 ``https://login.microsoftonline.com`` to acquire some metadata, 

591 especially when using an unfamiliar authority. 

592 This behavior is known as Instance Discovery. 

593 

594 This parameter defaults to None, which enables the Instance Discovery. 

595 

596 If you know some authorities which you allow MSAL to operate with as-is, 

597 without involving any Instance Discovery, the recommended pattern is:: 

598 

599 known_authorities = frozenset([ # Treat your known authorities as const 

600 "https://contoso.com/adfs", "https://login.azs/foo"]) 

601 ... 

602 authority = "https://contoso.com/adfs" # Assuming your app will use this 

603 app1 = PublicClientApplication( 

604 "client_id", 

605 authority=authority, 

606 # Conditionally disable Instance Discovery for known authorities 

607 instance_discovery=authority not in known_authorities, 

608 ) 

609 

610 If you do not know some authorities beforehand, 

611 yet still want MSAL to accept any authority that you will provide, 

612 you can use a ``False`` to unconditionally disable Instance Discovery. 

613 

614 New in version 1.19.0. 

615 

616 :param boolean allow_broker: 

617 Deprecated. Please use ``enable_broker_on_windows`` instead. 

618 

619 :param boolean enable_pii_log: 

620 When enabled, logs may include PII (Personal Identifiable Information). 

621 This can be useful in troubleshooting broker behaviors. 

622 The default behavior is False. 

623 

624 New in version 1.24.0. 

625 

626 :param str oidc_authority: 

627 *Added in version 1.28.0*: 

628 It is a URL that identifies an OpenID Connect (OIDC) authority of 

629 the format ``https://contoso.com/tenant``. 

630 MSAL will append ".well-known/openid-configuration" to the authority 

631 and retrieve the OIDC metadata from there, to figure out the endpoints. 

632 

633 Note: Broker will NOT be used for OIDC authority. 

634 """ 

635 self.client_id = client_id 

636 self.client_credential = client_credential 

637 self.client_claims = client_claims 

638 self._client_capabilities = client_capabilities 

639 self._instance_discovery = instance_discovery 

640 

641 if exclude_scopes and not isinstance(exclude_scopes, list): 

642 raise ValueError( 

643 "Invalid exclude_scopes={}. It need to be a list of strings.".format( 

644 repr(exclude_scopes))) 

645 self._exclude_scopes = frozenset(exclude_scopes or []) 

646 if "openid" in self._exclude_scopes: 

647 raise ValueError( 

648 'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format( 

649 repr(exclude_scopes))) 

650 

651 if http_client: 

652 self.http_client = http_client 

653 else: 

654 import requests # Lazy load 

655 

656 self.http_client = requests.Session() 

657 self.http_client.verify = verify 

658 self.http_client.proxies = proxies 

659 # Requests, does not support session - wide timeout 

660 # But you can patch that (https://github.com/psf/requests/issues/3341): 

661 self.http_client.request = functools.partial( 

662 self.http_client.request, timeout=timeout) 

663 

664 # Enable a minimal retry. Better than nothing. 

665 # https://github.com/psf/requests/blob/v2.25.1/requests/adapters.py#L94-L108 

666 a = requests.adapters.HTTPAdapter(max_retries=1) 

667 self.http_client.mount("http://", a) 

668 self.http_client.mount("https://", a) 

669 self.http_client = ThrottledHttpClient( 

670 self.http_client, 

671 http_cache=http_cache, 

672 default_throttle_time=60 

673 # The default value 60 was recommended mainly for PCA at the end of 

674 # https://identitydivision.visualstudio.com/devex/_git/AuthLibrariesApiReview?version=GBdev&path=%2FService%20protection%2FIntial%20set%20of%20protection%20measures.md&_a=preview 

675 if isinstance(self, PublicClientApplication) else 5, 

676 ) 

677 

678 self.app_name = app_name 

679 self.app_version = app_version 

680 

681 # Here the self.authority will not be the same type as authority in input 

682 if oidc_authority and authority: 

683 raise ValueError("You can not provide both authority and oidc_authority") 

684 if isinstance(authority, str) and urlparse(authority).path.startswith( 

685 "/dstsv2"): # dSTS authority's path always starts with "/dstsv2" 

686 oidc_authority = authority # So we treat it as if an oidc_authority 

687 try: 

688 authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE) 

689 self.authority = Authority( 

690 authority_to_use, 

691 self.http_client, 

692 validate_authority=validate_authority, 

693 instance_discovery=self._instance_discovery, 

694 oidc_authority_url=oidc_authority, 

695 ) 

696 except ValueError: # Those are explicit authority validation errors 

697 raise 

698 except Exception: # The rest are typically connection errors 

699 if validate_authority and not oidc_authority and ( 

700 azure_region # Opted in to use region 

701 or (azure_region is None and os.getenv("MSAL_FORCE_REGION")) # Will use region 

702 ): 

703 # Since caller opts in to use region, here we tolerate connection 

704 # errors happened during authority validation at non-region endpoint 

705 self.authority = Authority( 

706 authority_to_use, 

707 self.http_client, 

708 instance_discovery=False, 

709 ) 

710 else: 

711 raise 

712 

713 self._decide_broker(allow_broker, enable_pii_log) 

714 self.token_cache = token_cache or TokenCache() 

715 self._region_configured = azure_region 

716 self._region_detected = None 

717 self.client, self._regional_client = self._build_client( 

718 client_credential, self.authority) 

719 # Warn if using a static string/bytes client_assertion (discouraged for long-running apps) 

720 if isinstance(client_credential, dict) and isinstance( 

721 client_credential.get("client_assertion"), (str, bytes)): 

722 warnings.warn( 

723 "Passing a static string/bytes 'client_assertion' is " 

724 "discouraged because the JWT will eventually expire. " 

725 "Pass a no-arg callable instead (optionally wrapped in " 

726 "msal.AutoRefresher) so MSAL can obtain a fresh " 

727 "assertion on demand. " 

728 "See https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/746", 

729 DeprecationWarning, stacklevel=2) 

730 

731 self.authority_groups = {} 

732 self._telemetry_buffer = {} 

733 self._telemetry_lock = Lock() 

734 _msal_extension_check() 

735 

736 

737 def _decide_broker(self, allow_broker, enable_pii_log): 

738 is_confidential_app = self.client_credential or isinstance( 

739 self, ConfidentialClientApplication) 

740 if is_confidential_app and allow_broker: 

741 raise ValueError("allow_broker=True is only supported in PublicClientApplication") 

742 # Historically, we chose to support ClientApplication("client_id", allow_broker=True) 

743 if allow_broker: 

744 warnings.warn( 

745 "allow_broker is deprecated. " 

746 "Please use PublicClientApplication(..., " 

747 "enable_broker_on_windows=True, " 

748 # No need to mention non-Windows platforms, because allow_broker is only for Windows 

749 "...)", 

750 DeprecationWarning) 

751 opted_in_for_broker = ( 

752 self._enable_broker # True means Opted-in from PCA 

753 or ( 

754 # When we started the broker project on Windows platform, 

755 # the allow_broker was meant to be cross-platform. Now we realize 

756 # that other platforms have different redirect_uri requirements, 

757 # so the old allow_broker is deprecated and will only for Windows. 

758 allow_broker and sys.platform == "win32") 

759 ) 

760 self._enable_broker = ( # This same variable will also store the state 

761 opted_in_for_broker 

762 and not is_confidential_app 

763 and not self.authority.is_adfs 

764 and not self.authority._is_b2c 

765 ) 

766 if self._enable_broker: 

767 try: 

768 _init_broker(enable_pii_log) 

769 except RuntimeError: 

770 self._enable_broker = False 

771 logger.warning( # It is common on Mac and Linux where broker is not built-in 

772 "Broker is unavailable on this platform. " 

773 "We will fallback to non-broker.") 

774 logger.debug("Broker enabled? %s", self._enable_broker) 

775 

776 def is_pop_supported(self): 

777 """Returns True if this client supports Proof-of-Possession Access Token.""" 

778 return self._enable_broker and sys.platform in ("win32", "darwin") 

779 

780 def _decorate_scope( 

781 self, scopes, 

782 reserved_scope=frozenset(['openid', 'profile', 'offline_access'])): 

783 if not isinstance(scopes, (list, set, tuple)): 

784 raise ValueError("The input scopes should be a list, tuple, or set") 

785 scope_set = set(scopes) # Input scopes is typically a list. Copy it to a set. 

786 if scope_set & reserved_scope: 

787 # These scopes are reserved for the API to provide good experience. 

788 # We could make the developer pass these and then if they do they will 

789 # come back asking why they don't see refresh token or user information. 

790 raise ValueError( 

791 """You cannot use any scope value that is reserved. 

792Your input: {} 

793The reserved list: {}""".format(list(scope_set), list(reserved_scope))) 

794 raise ValueError( 

795 "You cannot use any scope value that is in this reserved list: {}".format( 

796 list(reserved_scope))) 

797 

798 # client_id can also be used as a scope in B2C 

799 decorated = scope_set | reserved_scope 

800 decorated -= self._exclude_scopes 

801 return list(decorated) 

802 

803 def _build_telemetry_context( 

804 self, api_id, correlation_id=None, refresh_reason=None): 

805 return msal.telemetry._TelemetryContext( 

806 self._telemetry_buffer, self._telemetry_lock, api_id, 

807 correlation_id=correlation_id, refresh_reason=refresh_reason) 

808 

809 def _get_regional_authority(self, central_authority) -> Optional[Authority]: 

810 if self._region_configured is False: # User opts out of ESTS-R 

811 return None # Short circuit to completely bypass region detection 

812 if self._region_configured is None: # User did not make an ESTS-R choice 

813 self._region_configured = os.getenv("MSAL_FORCE_REGION") or None 

814 self._region_detected = self._region_detected or _detect_region( 

815 self.http_client if self._region_configured is not None else None) 

816 if (self._region_configured != self.ATTEMPT_REGION_DISCOVERY 

817 and self._region_configured != self._region_detected): 

818 logger.warning('Region configured ({}) != region detected ({})'.format( 

819 repr(self._region_configured), repr(self._region_detected))) 

820 region_to_use = ( 

821 self._region_detected 

822 if self._region_configured == self.ATTEMPT_REGION_DISCOVERY 

823 else self._region_configured) # It will retain the None i.e. opted out 

824 logger.debug('Region to be used: {}'.format(repr(region_to_use))) 

825 if region_to_use: 

826 regional_host = ("{}.login.microsoft.com".format(region_to_use) 

827 if central_authority.instance in ( 

828 # The list came from point 3 of the algorithm section in this internal doc 

829 # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PinAuthToRegion/AAD%20SDK%20Proposal%20to%20Pin%20Auth%20to%20region.md&anchor=algorithm&_a=preview 

830 "login.microsoftonline.com", 

831 "login.microsoft.com", 

832 "login.windows.net", 

833 "sts.windows.net", 

834 ) 

835 else "{}.{}".format(region_to_use, central_authority.instance)) 

836 return Authority( # The central_authority has already been validated 

837 "https://{}/{}".format(regional_host, central_authority.tenant), 

838 self.http_client, 

839 instance_discovery=False, 

840 ) 

841 return None 

842 

843 def _build_client(self, client_credential, authority, skip_regional_client=False): 

844 client_assertion = None 

845 client_assertion_type = None 

846 default_headers = { 

847 "x-client-sku": SKU, "x-client-ver": __version__, 

848 "x-client-os": sys.platform, 

849 "x-ms-lib-capability": "retry-after, h429", 

850 } 

851 if self.app_name: 

852 default_headers['x-app-name'] = self.app_name 

853 if self.app_version: 

854 default_headers['x-app-ver'] = self.app_version 

855 default_body = {"client_info": 1} 

856 if isinstance(client_credential, dict): 

857 client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT 

858 # Use client_credential.get("...") rather than "..." in client_credential 

859 # so that we can ignore an empty string came from an empty ENV VAR. 

860 if client_credential.get("client_assertion"): 

861 client_assertion = client_credential['client_assertion'] 

862 else: 

863 headers = {} 

864 sha1_thumbprint = sha256_thumbprint = None 

865 passphrase_bytes = _str2bytes( 

866 client_credential["passphrase"] 

867 ) if client_credential.get("passphrase") else None 

868 if client_credential.get("private_key_pfx_path"): 

869 private_key, sha256_thumbprint, sha1_thumbprint, x5c = _parse_pfx( 

870 client_credential["private_key_pfx_path"], 

871 passphrase_bytes) 

872 if client_credential.get("public_certificate") is True and x5c: 

873 headers["x5c"] = x5c 

874 elif client_credential.get("private_key"): # PEM blob 

875 private_key = ( # handles both encrypted and unencrypted 

876 _load_private_key_from_pem_str( 

877 client_credential['private_key'], passphrase_bytes) 

878 if passphrase_bytes 

879 else client_credential['private_key'] 

880 ) 

881 

882 # Determine thumbprints based on what's provided 

883 if client_credential.get("thumbprint"): 

884 # User provided a thumbprint - use it as SHA-1 (legacy/manual approach) 

885 sha1_thumbprint = client_credential["thumbprint"] 

886 sha256_thumbprint = None 

887 elif isinstance(client_credential.get('public_certificate'), str): 

888 # No thumbprint provided, but we have a certificate to calculate thumbprints 

889 from cryptography import x509 

890 cert = x509.load_pem_x509_certificate( 

891 _str2bytes(client_credential['public_certificate'])) 

892 sha256_thumbprint, sha1_thumbprint, headers["x5c"] = ( 

893 _extract_cert_and_thumbprints(cert)) 

894 else: 

895 raise ValueError( 

896 "You must provide either 'thumbprint' or 'public_certificate' " 

897 "from which the thumbprint can be calculated.") 

898 else: 

899 raise ValueError( 

900 "client_credential needs to follow this format " 

901 "https://msal-python.readthedocs.io/en/latest/#msal.ClientApplication.params.client_credential") 

902 if ("x5c" not in headers # So the .pfx file contains no certificate 

903 and isinstance(client_credential.get('public_certificate'), str) 

904 ): # Then we treat the public_certificate value as PEM content 

905 headers["x5c"] = extract_certs(client_credential['public_certificate']) 

906 if sha256_thumbprint and not authority.is_adfs: 

907 assertion_params = { 

908 "algorithm": "PS256", "sha256_thumbprint": sha256_thumbprint, 

909 } 

910 else: # Fall back 

911 if not sha1_thumbprint: 

912 raise ValueError("You shall provide a thumbprint in SHA1.") 

913 assertion_params = { 

914 "algorithm": "RS256", "sha1_thumbprint": sha1_thumbprint, 

915 } 

916 assertion = JwtAssertionCreator( 

917 private_key, headers=headers, **assertion_params) 

918 client_assertion = assertion.create_regenerative_assertion( 

919 audience=authority.token_endpoint, issuer=self.client_id, 

920 additional_claims=self.client_claims or {}) 

921 else: 

922 default_body['client_secret'] = client_credential 

923 central_configuration = { 

924 "authorization_endpoint": authority.authorization_endpoint, 

925 "token_endpoint": authority.token_endpoint, 

926 "device_authorization_endpoint": authority.device_authorization_endpoint, 

927 } 

928 central_client = _ClientWithCcsRoutingInfo( 

929 central_configuration, 

930 self.client_id, 

931 http_client=self.http_client, 

932 default_headers=default_headers, 

933 default_body=default_body, 

934 client_assertion=client_assertion, 

935 client_assertion_type=client_assertion_type, 

936 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

937 event, environment=authority.instance)), 

938 on_removing_rt=self.token_cache.remove_rt, 

939 on_updating_rt=self.token_cache.update_rt) 

940 

941 regional_client = None 

942 if (client_credential # Currently regional endpoint only serves some CCA flows 

943 and not skip_regional_client): 

944 regional_authority = self._get_regional_authority(authority) 

945 if regional_authority: 

946 regional_configuration = { 

947 "authorization_endpoint": regional_authority.authorization_endpoint, 

948 "token_endpoint": regional_authority.token_endpoint, 

949 "device_authorization_endpoint": 

950 regional_authority.device_authorization_endpoint, 

951 } 

952 regional_client = _ClientWithCcsRoutingInfo( 

953 regional_configuration, 

954 self.client_id, 

955 http_client=self.http_client, 

956 default_headers=default_headers, 

957 default_body=default_body, 

958 client_assertion=client_assertion, 

959 client_assertion_type=client_assertion_type, 

960 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

961 event, environment=authority.instance)), 

962 on_removing_rt=self.token_cache.remove_rt, 

963 on_updating_rt=self.token_cache.update_rt) 

964 return central_client, regional_client 

965 

966 def initiate_auth_code_flow( 

967 self, 

968 scopes, # type: list[str] 

969 redirect_uri=None, 

970 state=None, # Recommended by OAuth2 for CSRF protection 

971 prompt=None, 

972 login_hint=None, # type: Optional[str] 

973 domain_hint=None, # type: Optional[str] 

974 claims_challenge=None, 

975 max_age=None, 

976 response_mode=None, # type: Optional[str] 

977 ): 

978 """Initiate an auth code flow. 

979 

980 Later when the response reaches your redirect_uri, 

981 you can use :func:`~acquire_token_by_auth_code_flow()` 

982 to complete the authentication/authorization. 

983 

984 :param list scopes: 

985 It is a list of case-sensitive strings. 

986 :param str redirect_uri: 

987 Optional. If not specified, server will use the pre-registered one. 

988 :param str state: 

989 An opaque value used by the client to 

990 maintain state between the request and callback. 

991 If absent, this library will automatically generate one internally. 

992 :param str prompt: 

993 By default, no prompt value will be sent, not even string ``"none"``. 

994 You will have to specify a value explicitly. 

995 Its valid values are the constants defined in 

996 :class:`Prompt <msal.Prompt>`. 

997 

998 :param str login_hint: 

999 Optional. Identifier of the user. Generally a User Principal Name (UPN). 

1000 :param domain_hint: 

1001 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com". 

1002 If included, it will skip the email-based discovery process that user goes 

1003 through on the sign-in page, leading to a slightly more streamlined user experience. 

1004 More information on possible values available in 

1005 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and 

1006 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_. 

1007 

1008 :param int max_age: 

1009 OPTIONAL. Maximum Authentication Age. 

1010 Specifies the allowable elapsed time in seconds 

1011 since the last time the End-User was actively authenticated. 

1012 If the elapsed time is greater than this value, 

1013 Microsoft identity platform will actively re-authenticate the End-User. 

1014 

1015 MSAL Python will also automatically validate the auth_time in ID token. 

1016 

1017 New in version 1.15. 

1018 

1019 :param str response_mode: 

1020 OPTIONAL. Specifies the method with which response parameters should be returned. 

1021 The default value is equivalent to ``query``, which was still secure enough in MSAL Python 

1022 (because MSAL Python does not transfer tokens via query parameter in the first place). 

1023 For even better security, we recommend using the value ``form_post``. 

1024 In "form_post" mode, response parameters 

1025 will be encoded as HTML form values that are transmitted via the HTTP POST method and 

1026 encoded in the body using the application/x-www-form-urlencoded format. 

1027 Valid values can be either "form_post" for HTTP POST to callback URI or 

1028 "query" (the default) for HTTP GET with parameters encoded in query string. 

1029 More information on possible values 

1030 `here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>` 

1031 and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>` 

1032 

1033 .. note:: 

1034 You should configure your web framework to accept form_post responses instead of query responses. 

1035 While this parameter still works, it will be removed in a future version. 

1036 Using query-based response modes is less secure and should be avoided. 

1037 

1038 :return: 

1039 The auth code flow. It is a dict in this form:: 

1040 

1041 { 

1042 "auth_uri": "https://...", // Guide user to visit this 

1043 "state": "...", // You may choose to verify it by yourself, 

1044 // or just let acquire_token_by_auth_code_flow() 

1045 // do that for you. 

1046 "...": "...", // Everything else are reserved and internal 

1047 } 

1048 

1049 The caller is expected to: 

1050 

1051 1. somehow store this content, typically inside the current session, 

1052 2. guide the end user (i.e. resource owner) to visit that auth_uri, 

1053 3. and then relay this dict and subsequent auth response to 

1054 :func:`~acquire_token_by_auth_code_flow()`. 

1055 """ 

1056 # Note to maintainers: Do not emit warning for the use of response_mode here, 

1057 # because response_mode=form_post is still the recommended usage for MSAL Python 1.x. 

1058 # App developers making the right call shall not be disturbed by unactionable warnings. 

1059 client = _ClientWithCcsRoutingInfo( 

1060 {"authorization_endpoint": self.authority.authorization_endpoint}, 

1061 self.client_id, 

1062 http_client=self.http_client) 

1063 flow = client.initiate_auth_code_flow( 

1064 redirect_uri=redirect_uri, state=state, login_hint=login_hint, 

1065 prompt=prompt, 

1066 scope=self._decorate_scope(scopes), 

1067 domain_hint=domain_hint, 

1068 claims=_merge_claims_challenge_and_capabilities( 

1069 self._client_capabilities, claims_challenge), 

1070 max_age=max_age, 

1071 response_mode=response_mode, 

1072 ) 

1073 flow["claims_challenge"] = claims_challenge 

1074 return flow 

1075 

1076 def get_authorization_request_url( 

1077 self, 

1078 scopes, # type: list[str] 

1079 login_hint=None, # type: Optional[str] 

1080 state=None, # Recommended by OAuth2 for CSRF protection 

1081 redirect_uri=None, 

1082 response_type="code", # Could be "token" if you use Implicit Grant 

1083 prompt=None, 

1084 nonce=None, 

1085 domain_hint=None, # type: Optional[str] 

1086 claims_challenge=None, 

1087 **kwargs): 

1088 """Constructs a URL for you to start a Authorization Code Grant. 

1089 

1090 :param list[str] scopes: (Required) 

1091 Scopes requested to access a protected API (a resource). 

1092 :param str state: Recommended by OAuth2 for CSRF protection. 

1093 :param str login_hint: 

1094 Identifier of the user. Generally a User Principal Name (UPN). 

1095 :param str redirect_uri: 

1096 Address to return to upon receiving a response from the authority. 

1097 :param str response_type: 

1098 Default value is "code" for an OAuth2 Authorization Code grant. 

1099 

1100 You could use other content such as "id_token" or "token", 

1101 which would trigger an Implicit Grant, but that is 

1102 `not recommended <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-implicit-grant-flow#is-the-implicit-grant-suitable-for-my-app>`_. 

1103 

1104 :param str prompt: 

1105 By default, no prompt value will be sent, not even string ``"none"``. 

1106 You will have to specify a value explicitly. 

1107 Its valid values are the constants defined in 

1108 :class:`Prompt <msal.Prompt>`. 

1109 :param nonce: 

1110 A cryptographically random value used to mitigate replay attacks. See also 

1111 `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_. 

1112 :param domain_hint: 

1113 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com". 

1114 If included, it will skip the email-based discovery process that user goes 

1115 through on the sign-in page, leading to a slightly more streamlined user experience. 

1116 More information on possible values available in 

1117 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and 

1118 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_. 

1119 :param claims_challenge: 

1120 The claims_challenge parameter requests specific claims requested by the resource provider 

1121 in the form of a claims_challenge directive in the www-authenticate header to be 

1122 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1123 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1124 

1125 :return: The authorization url as a string. 

1126 """ 

1127 authority = kwargs.pop("authority", None) # Historically we support this 

1128 if authority: 

1129 warnings.warn( 

1130 "We haven't decided if this method will accept authority parameter") 

1131 # The previous implementation is, it will use self.authority by default. 

1132 # Multi-tenant app can use new authority on demand 

1133 the_authority = Authority( 

1134 authority, 

1135 self.http_client, 

1136 instance_discovery=self._instance_discovery, 

1137 ) if authority else self.authority 

1138 

1139 client = _ClientWithCcsRoutingInfo( 

1140 {"authorization_endpoint": the_authority.authorization_endpoint}, 

1141 self.client_id, 

1142 http_client=self.http_client) 

1143 warnings.warn( 

1144 "Change your get_authorization_request_url() " 

1145 "to initiate_auth_code_flow()", DeprecationWarning) 

1146 with warnings.catch_warnings(record=True): 

1147 return client.build_auth_request_uri( 

1148 response_type=response_type, 

1149 redirect_uri=redirect_uri, state=state, login_hint=login_hint, 

1150 prompt=prompt, 

1151 scope=self._decorate_scope(scopes), 

1152 nonce=nonce, 

1153 domain_hint=domain_hint, 

1154 claims=_merge_claims_challenge_and_capabilities( 

1155 self._client_capabilities, claims_challenge), 

1156 ) 

1157 

1158 def acquire_token_by_auth_code_flow( 

1159 self, auth_code_flow, auth_response, scopes=None, **kwargs): 

1160 """Validate the auth response being redirected back, and obtain tokens. 

1161 

1162 It automatically provides nonce protection. 

1163 

1164 :param dict auth_code_flow: 

1165 The same dict returned by :func:`~initiate_auth_code_flow()`. 

1166 :param dict auth_response: 

1167 A dict of the query string received from auth server. 

1168 :param list[str] scopes: 

1169 Scopes requested to access a protected API (a resource). 

1170 

1171 Most of the time, you can leave it empty. 

1172 

1173 If you requested user consent for multiple resources, here you will 

1174 need to provide a subset of what you required in 

1175 :func:`~initiate_auth_code_flow()`. 

1176 

1177 OAuth2 was designed mostly for singleton services, 

1178 where tokens are always meant for the same resource and the only 

1179 changes are in the scopes. 

1180 In Microsoft Entra, tokens can be issued for multiple 3rd party resources. 

1181 You can ask authorization code for multiple resources, 

1182 but when you redeem it, the token is for only one intended 

1183 recipient, called audience. 

1184 So the developer need to specify a scope so that we can restrict the 

1185 token to be issued for the corresponding audience. 

1186 

1187 :return: 

1188 * A dict containing "access_token" and/or "id_token", among others, 

1189 depends on what scope was used. 

1190 (See https://tools.ietf.org/html/rfc6749#section-5.1) 

1191 * A dict containing "error", optionally "error_description", "error_uri". 

1192 (It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_ 

1193 or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_) 

1194 * Most client-side data error would result in ValueError exception. 

1195 So the usage pattern could be without any protocol details:: 

1196 

1197 def authorize(): # A controller in a web app 

1198 try: 

1199 result = msal_app.acquire_token_by_auth_code_flow( 

1200 session.get("flow", {}), request.args) 

1201 if "error" in result: 

1202 return render_template("error.html", result) 

1203 use(result) # Token(s) are available in result and cache 

1204 except ValueError: # Usually caused by CSRF 

1205 pass # Simply ignore them 

1206 return redirect(url_for("index")) 

1207 """ 

1208 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1209 telemetry_context = self._build_telemetry_context( 

1210 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID) 

1211 response = _clean_up(self.client.obtain_token_by_auth_code_flow( 

1212 auth_code_flow, 

1213 auth_response, 

1214 scope=self._decorate_scope(scopes) if scopes else None, 

1215 headers=telemetry_context.generate_headers(), 

1216 data=dict( 

1217 kwargs.pop("data", {}), 

1218 claims=_merge_claims_challenge_and_capabilities( 

1219 self._client_capabilities, 

1220 auth_code_flow.pop("claims_challenge", None))), 

1221 **kwargs)) 

1222 if "access_token" in response: 

1223 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1224 telemetry_context.update_telemetry(response) 

1225 return response 

1226 

1227 def acquire_token_by_authorization_code( 

1228 self, 

1229 code, 

1230 scopes, # Syntactically required. STS accepts empty value though. 

1231 redirect_uri=None, 

1232 # REQUIRED, if the "redirect_uri" parameter was included in the 

1233 # authorization request as described in Section 4.1.1, and their 

1234 # values MUST be identical. 

1235 nonce=None, 

1236 claims_challenge=None, 

1237 **kwargs): 

1238 """The second half of the Authorization Code Grant. 

1239 

1240 :param code: The authorization code returned from Authorization Server. 

1241 :param list[str] scopes: (Required) 

1242 Scopes requested to access a protected API (a resource). 

1243 

1244 If you requested user consent for multiple resources, here you will 

1245 typically want to provide a subset of what you required in AuthCode. 

1246 

1247 OAuth2 was designed mostly for singleton services, 

1248 where tokens are always meant for the same resource and the only 

1249 changes are in the scopes. 

1250 In Microsoft Entra, tokens can be issued for multiple 3rd party resources. 

1251 You can ask authorization code for multiple resources, 

1252 but when you redeem it, the token is for only one intended 

1253 recipient, called audience. 

1254 So the developer need to specify a scope so that we can restrict the 

1255 token to be issued for the corresponding audience. 

1256 

1257 :param nonce: 

1258 If you provided a nonce when calling :func:`get_authorization_request_url`, 

1259 same nonce should also be provided here, so that we'll validate it. 

1260 An exception will be raised if the nonce in id token mismatches. 

1261 

1262 :param claims_challenge: 

1263 The claims_challenge parameter requests specific claims requested by the resource provider 

1264 in the form of a claims_challenge directive in the www-authenticate header to be 

1265 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1266 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1267 

1268 :return: A dict representing the json response from Microsoft Entra: 

1269 

1270 - A successful response would contain "access_token" key, 

1271 - an error response would contain "error" and usually "error_description". 

1272 """ 

1273 # If scope is absent on the wire, STS will give you a token associated 

1274 # to the FIRST scope sent during the authorization request. 

1275 # So in theory, you can omit scope here when you were working with only 

1276 # one scope. But, MSAL decorates your scope anyway, so they are never 

1277 # really empty. 

1278 assert isinstance(scopes, list), "Invalid parameter type" 

1279 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1280 warnings.warn( 

1281 "Change your acquire_token_by_authorization_code() " 

1282 "to acquire_token_by_auth_code_flow()", DeprecationWarning) 

1283 with warnings.catch_warnings(record=True): 

1284 telemetry_context = self._build_telemetry_context( 

1285 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID) 

1286 response = _clean_up(self.client.obtain_token_by_authorization_code( 

1287 code, redirect_uri=redirect_uri, 

1288 scope=self._decorate_scope(scopes), 

1289 headers=telemetry_context.generate_headers(), 

1290 data=dict( 

1291 kwargs.pop("data", {}), 

1292 claims=_merge_claims_challenge_and_capabilities( 

1293 self._client_capabilities, claims_challenge)), 

1294 nonce=nonce, 

1295 **kwargs)) 

1296 if "access_token" in response: 

1297 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1298 telemetry_context.update_telemetry(response) 

1299 return response 

1300 

1301 def get_accounts(self, username=None): 

1302 """Get a list of accounts which previously signed in, i.e. exists in cache. 

1303 

1304 An account can later be used in :func:`~acquire_token_silent` 

1305 to find its tokens. 

1306 

1307 :param username: 

1308 Filter accounts with this username only. Case insensitive. 

1309 :return: A list of account objects. 

1310 Each account is a dict. For now, we only document its "username" field. 

1311 Your app can choose to display those information to end user, 

1312 and allow user to choose one of his/her accounts to proceed. 

1313 """ 

1314 accounts = self._find_msal_accounts(environment=self.authority.instance) 

1315 if not accounts: # Now try other aliases of this authority instance 

1316 for alias in self._get_authority_aliases(self.authority.instance): 

1317 accounts = self._find_msal_accounts(environment=alias) 

1318 if accounts: 

1319 break 

1320 if username: 

1321 # Federated account["username"] from AAD could contain mixed case 

1322 lowercase_username = username.lower() 

1323 accounts = [a for a in accounts 

1324 if a["username"].lower() == lowercase_username] 

1325 if not accounts: 

1326 logger.debug(( # This would also happen when the cache is empty 

1327 "get_accounts(username='{}') finds no account. " 

1328 "If tokens were acquired without 'profile' scope, " 

1329 "they would contain no username for filtering. " 

1330 "Consider calling get_accounts(username=None) instead." 

1331 ).format(username)) 

1332 # Does not further filter by existing RTs here. It probably won't matter. 

1333 # Because in most cases Accounts and RTs co-exist. 

1334 # Even in the rare case when an RT is revoked and then removed, 

1335 # acquire_token_silent() would then yield no result, 

1336 # apps would fall back to other acquire methods. This is the standard pattern. 

1337 return accounts 

1338 

1339 def _find_msal_accounts(self, environment): 

1340 interested_authority_types = [ 

1341 TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS] 

1342 if _is_running_in_cloud_shell(): 

1343 interested_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL) 

1344 grouped_accounts = { 

1345 a.get("home_account_id"): # Grouped by home tenant's id 

1346 { # These are minimal amount of non-tenant-specific account info 

1347 "home_account_id": a.get("home_account_id"), 

1348 "environment": a.get("environment"), 

1349 "username": a.get("username"), 

1350 "account_source": a.get("account_source"), 

1351 

1352 # The following fields for backward compatibility, for now 

1353 "authority_type": a.get("authority_type"), 

1354 "local_account_id": a.get("local_account_id"), # Tenant-specific 

1355 "realm": a.get("realm"), # Tenant-specific 

1356 } 

1357 for a in self.token_cache.search( 

1358 TokenCache.CredentialType.ACCOUNT, 

1359 query={"environment": environment}) 

1360 if a["authority_type"] in interested_authority_types 

1361 } 

1362 return list(grouped_accounts.values()) 

1363 

1364 def _get_instance_metadata(self, instance): # This exists so it can be mocked in unit test 

1365 instance_discovery_host = _get_instance_discovery_host(instance) 

1366 resp = self.http_client.get( 

1367 _get_instance_discovery_endpoint(instance), 

1368 params={ 

1369 'api-version': '1.1', 

1370 'authorization_endpoint': ( 

1371 "https://{}/common/oauth2/authorize".format(instance_discovery_host) 

1372 ), 

1373 }, 

1374 headers={'Accept': 'application/json'}) 

1375 resp.raise_for_status() 

1376 return json.loads(resp.text)['metadata'] 

1377 

1378 def _get_authority_aliases(self, instance): 

1379 if self._instance_discovery is False: 

1380 return [] 

1381 if self.authority._is_known_to_developer: 

1382 # Then it is an ADFS/B2C/known_authority_hosts situation 

1383 # which may not reach the central endpoint, so we skip it. 

1384 return [] 

1385 if instance not in self.authority_groups: 

1386 self.authority_groups[instance] = [ 

1387 set(group['aliases']) for group in self._get_instance_metadata(instance)] 

1388 for group in self.authority_groups[instance]: 

1389 if instance in group: 

1390 return [alias for alias in group if alias != instance] 

1391 return [] 

1392 

1393 def remove_account(self, account): 

1394 """Sign me out and forget me from token cache""" 

1395 if self._enable_broker: 

1396 from .broker import _signout_silently 

1397 error = _signout_silently(self.client_id, account["local_account_id"]) 

1398 if error: 

1399 logger.debug("_signout_silently() returns error: %s", error) 

1400 # Broker sign-out has been attempted, even if the _forget_me() below throws. 

1401 self._forget_me(account) 

1402 

1403 def _sign_out(self, home_account): 

1404 # Remove all relevant RTs and ATs from token cache 

1405 owned_by_home_account = { 

1406 "environment": home_account["environment"], 

1407 "home_account_id": home_account["home_account_id"],} # realm-independent 

1408 app_metadata = self._get_app_metadata(home_account["environment"]) 

1409 # Remove RTs/FRTs, and they are realm-independent 

1410 for rt in [ # Remove RTs from a static list (rather than from a dynamic generator), 

1411 # to avoid changing self.token_cache while it is being iterated 

1412 rt for rt in self.token_cache.search( 

1413 TokenCache.CredentialType.REFRESH_TOKEN, query=owned_by_home_account) 

1414 # Do RT's app ownership check as a precaution, in case family apps 

1415 # and 3rd-party apps share same token cache, although they should not. 

1416 if rt["client_id"] == self.client_id or ( 

1417 app_metadata.get("family_id") # Now let's settle family business 

1418 and rt.get("family_id") == app_metadata["family_id"]) 

1419 ]: 

1420 self.token_cache.remove_rt(rt) 

1421 for at in list(self.token_cache.search( # Remove ATs from a static list, 

1422 # to avoid changing self.token_cache while it is being iterated 

1423 TokenCache.CredentialType.ACCESS_TOKEN, query=owned_by_home_account, 

1424 # Regardless of realm, b/c we've removed realm-independent RTs anyway 

1425 )): 

1426 # To avoid the complexity of locating sibling family app's AT, 

1427 # we skip AT's app ownership check. 

1428 # It means ATs for other apps will also be removed, it is OK because: 

1429 # * non-family apps are not supposed to share token cache to begin with; 

1430 # * Even if it happens, we keep other app's RT already, so SSO still works 

1431 self.token_cache.remove_at(at) 

1432 

1433 def _forget_me(self, home_account): 

1434 # It implies signout, and then also remove all relevant accounts and IDTs 

1435 self._sign_out(home_account) 

1436 owned_by_home_account = { 

1437 "environment": home_account["environment"], 

1438 "home_account_id": home_account["home_account_id"],} # realm-independent 

1439 for idt in list(self.token_cache.search( # Remove IDTs from a static list, 

1440 # to avoid changing self.token_cache while it is being iterated 

1441 TokenCache.CredentialType.ID_TOKEN, query=owned_by_home_account, # regardless of realm 

1442 )): 

1443 self.token_cache.remove_idt(idt) 

1444 for a in list(self.token_cache.search( # Remove Accounts from a static list, 

1445 # to avoid changing self.token_cache while it is being iterated 

1446 TokenCache.CredentialType.ACCOUNT, query=owned_by_home_account, # regardless of realm 

1447 )): 

1448 self.token_cache.remove_account(a) 

1449 

1450 def _acquire_token_by_cloud_shell(self, scopes, data=None): 

1451 from .cloudshell import _obtain_token 

1452 response = _obtain_token( 

1453 self.http_client, scopes, client_id=self.client_id, data=data) 

1454 if "error" not in response: 

1455 self.token_cache.add(dict( 

1456 client_id=self.client_id, 

1457 scope=response["scope"].split() if "scope" in response else scopes, 

1458 token_endpoint=self.authority.token_endpoint, 

1459 response=response, 

1460 data=data or {}, 

1461 authority_type=_AUTHORITY_TYPE_CLOUDSHELL, 

1462 )) 

1463 if "access_token" in response: 

1464 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER 

1465 return response 

1466 

1467 def acquire_token_silent( 

1468 self, 

1469 scopes, # type: List[str] 

1470 account, # type: Optional[Account] 

1471 authority=None, # See get_authorization_request_url() 

1472 force_refresh=False, # type: Optional[boolean] 

1473 claims_challenge=None, 

1474 auth_scheme=None, 

1475 **kwargs): 

1476 """Acquire an access token for given account, without user interaction. 

1477 

1478 It has same parameters as the :func:`~acquire_token_silent_with_error`. 

1479 The difference is the behavior of the return value. 

1480 This method will combine the cache empty and refresh error 

1481 into one return value, `None`. 

1482 If your app does not care about the exact token refresh error during 

1483 token cache look-up, then this method is easier and recommended. 

1484 

1485 :return: 

1486 - A dict containing no "error" key, 

1487 and typically contains an "access_token" key, 

1488 if cache lookup succeeded. 

1489 - None when cache lookup does not yield a token. 

1490 """ 

1491 if not account: 

1492 return None # A backward-compatible NO-OP to drop the account=None usage 

1493 result = _clean_up(self._acquire_token_silent_with_error( 

1494 scopes, account, authority=authority, force_refresh=force_refresh, 

1495 claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs)) 

1496 return result if result and "error" not in result else None 

1497 

1498 def acquire_token_silent_with_error( 

1499 self, 

1500 scopes, # type: List[str] 

1501 account, # type: Optional[Account] 

1502 authority=None, # See get_authorization_request_url() 

1503 force_refresh=False, # type: Optional[boolean] 

1504 claims_challenge=None, 

1505 auth_scheme=None, 

1506 **kwargs): 

1507 """Acquire an access token for given account, without user interaction. 

1508 

1509 It is done either by finding a valid access token from cache, 

1510 or by finding a valid refresh token from cache and then automatically 

1511 use it to redeem a new access token. 

1512 

1513 This method will differentiate cache empty from token refresh error. 

1514 If your app cares the exact token refresh error during 

1515 token cache look-up, then this method is suitable. 

1516 Otherwise, the other method :func:`~acquire_token_silent` is recommended. 

1517 

1518 :param list[str] scopes: (Required) 

1519 Scopes requested to access a protected API (a resource). 

1520 :param account: (Required) 

1521 One of the account object returned by :func:`~get_accounts`. 

1522 Starting from MSAL Python 1.23, 

1523 a ``None`` input will become a NO-OP and always return ``None``. 

1524 :param force_refresh: 

1525 If True, it will skip Access Token look-up, 

1526 and try to find a Refresh Token to obtain a new Access Token. 

1527 :param claims_challenge: 

1528 The claims_challenge parameter requests specific claims requested by the resource provider 

1529 in the form of a claims_challenge directive in the www-authenticate header to be 

1530 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1531 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1532 :param object auth_scheme: 

1533 You can provide an ``msal.auth_scheme.PopAuthScheme`` object 

1534 so that MSAL will get a Proof-of-Possession (POP) token for you. 

1535 

1536 New in version 1.26.0. 

1537 

1538 :return: 

1539 - A dict containing no "error" key, 

1540 and typically contains an "access_token" key, 

1541 if cache lookup succeeded. 

1542 - None when there is simply no token in the cache. 

1543 - A dict containing an "error" key, when token refresh failed. 

1544 """ 

1545 if not account: 

1546 return None # A backward-compatible NO-OP to drop the account=None usage 

1547 return _clean_up(self._acquire_token_silent_with_error( 

1548 scopes, account, authority=authority, force_refresh=force_refresh, 

1549 claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs)) 

1550 

1551 def _acquire_token_silent_with_error( 

1552 self, 

1553 scopes, # type: List[str] 

1554 account, # type: Optional[Account] 

1555 authority=None, # See get_authorization_request_url() 

1556 force_refresh=False, # type: Optional[boolean] 

1557 claims_challenge=None, 

1558 auth_scheme=None, 

1559 **kwargs): 

1560 assert isinstance(scopes, list), "Invalid parameter type" 

1561 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1562 correlation_id = msal.telemetry._get_new_correlation_id() 

1563 if authority: 

1564 warnings.warn("We haven't decided how/if this method will accept authority parameter") 

1565 # the_authority = Authority( 

1566 # authority, 

1567 # self.http_client, 

1568 # instance_discovery=self._instance_discovery, 

1569 # ) if authority else self.authority 

1570 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( 

1571 scopes, account, self.authority, force_refresh=force_refresh, 

1572 claims_challenge=claims_challenge, 

1573 correlation_id=correlation_id, 

1574 auth_scheme=auth_scheme, 

1575 **kwargs) 

1576 if result and "error" not in result: 

1577 return result 

1578 final_result = result 

1579 for alias in self._get_authority_aliases(self.authority.instance): 

1580 if not list(self.token_cache.search( # Need a list to test emptiness 

1581 self.token_cache.CredentialType.REFRESH_TOKEN, 

1582 # target=scopes, # MUST NOT filter by scopes, because: 

1583 # 1. AAD RTs are scope-independent; 

1584 # 2. therefore target is optional per schema; 

1585 query={"environment": alias})): 

1586 # Skip heavy weight logic when RT for this alias doesn't exist 

1587 continue 

1588 the_authority = Authority( 

1589 "https://" + alias + "/" + self.authority.tenant, 

1590 self.http_client, 

1591 instance_discovery=False, 

1592 ) 

1593 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( 

1594 scopes, account, the_authority, force_refresh=force_refresh, 

1595 claims_challenge=claims_challenge, 

1596 correlation_id=correlation_id, 

1597 auth_scheme=auth_scheme, 

1598 **kwargs) 

1599 if result: 

1600 if "error" not in result: 

1601 return result 

1602 final_result = result 

1603 if final_result and final_result.get("suberror"): 

1604 final_result["classification"] = { # Suppress these suberrors, per #57 

1605 "bad_token": "", 

1606 "token_expired": "", 

1607 "protection_policy_required": "", 

1608 "client_mismatch": "", 

1609 "device_authentication_failed": "", 

1610 }.get(final_result["suberror"], final_result["suberror"]) 

1611 return final_result 

1612 

1613 def _acquire_token_silent_from_cache_and_possibly_refresh_it( 

1614 self, 

1615 scopes, # type: List[str] 

1616 account, # type: Optional[Account] 

1617 authority, # This can be different than self.authority 

1618 force_refresh=False, # type: Optional[boolean] 

1619 claims_challenge=None, 

1620 correlation_id=None, 

1621 http_exceptions=None, 

1622 auth_scheme=None, 

1623 **kwargs): 

1624 # This internal method has two calling patterns: 

1625 # it accepts a non-empty account to find token for a user, 

1626 # and accepts account=None to find a token for the current app. 

1627 access_token_from_cache = None 

1628 if not (force_refresh or claims_challenge or auth_scheme): # Then attempt AT cache 

1629 query={ 

1630 "client_id": self.client_id, 

1631 "environment": authority.instance, 

1632 "realm": authority.tenant, 

1633 "home_account_id": (account or {}).get("home_account_id"), 

1634 } 

1635 key_id = kwargs.get("data", {}).get("key_id") 

1636 if key_id: # Some token types (SSH-certs, POP) are bound to a key 

1637 query["key_id"] = key_id 

1638 ext_cache_key = _compute_ext_cache_key(kwargs.get("data", {})) 

1639 if ext_cache_key: # FMI tokens need cache isolation by path 

1640 query["ext_cache_key"] = ext_cache_key 

1641 now = time.time() 

1642 refresh_reason = msal.telemetry.AT_ABSENT 

1643 for entry in self.token_cache.search( # A generator allows us to 

1644 # break early in cache-hit without finding a full list 

1645 self.token_cache.CredentialType.ACCESS_TOKEN, 

1646 target=scopes, 

1647 query=query, 

1648 ): # This loop is about token search, not about token deletion. 

1649 # Note that search() holds a lock during this loop; 

1650 # that is fine because this loop is fast 

1651 expires_in = int(entry["expires_on"]) - now 

1652 if expires_in < 5*60: # Then consider it expired 

1653 refresh_reason = msal.telemetry.AT_EXPIRED 

1654 continue # Removal is not necessary, it will be overwritten 

1655 logger.debug("Cache hit an AT") 

1656 access_token_from_cache = { # Mimic a real response 

1657 "access_token": entry["secret"], 

1658 "token_type": entry.get("token_type", "Bearer"), 

1659 "expires_in": int(expires_in), # OAuth2 specs defines it as int 

1660 self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE, 

1661 } 

1662 if "refresh_on" in entry: 

1663 access_token_from_cache["refresh_on"] = int(entry["refresh_on"]) 

1664 if int(entry["refresh_on"]) < now: # aging 

1665 refresh_reason = msal.telemetry.AT_AGING 

1666 break # With a fallback in hand, we break here to go refresh 

1667 self._build_telemetry_context(-1).hit_an_access_token() 

1668 return access_token_from_cache # It is still good as new 

1669 else: 

1670 refresh_reason = msal.telemetry.FORCE_REFRESH # TODO: It could also mean claims_challenge 

1671 assert refresh_reason, "It should have been established at this point" 

1672 if not http_exceptions: # It can be a tuple of exceptions 

1673 # The exact HTTP exceptions are transportation-layer dependent 

1674 from requests.exceptions import RequestException # Lazy load 

1675 http_exceptions = (RequestException,) 

1676 try: 

1677 data = kwargs.get("data", {}) 

1678 if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL: 

1679 if auth_scheme: 

1680 raise ValueError("auth_scheme is not supported in Cloud Shell") 

1681 return self._acquire_token_by_cloud_shell(scopes, data=data) 

1682 

1683 is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(data.get("token_type"), auth_scheme) 

1684 

1685 if self._enable_broker and account and account.get("account_source") in ( 

1686 _GRANT_TYPE_BROKER, # Broker successfully established this account previously. 

1687 None, # Unknown data from older MSAL. Broker might still work. 

1688 ) and (sys.platform in ("win32", "darwin") or not is_ssh_cert_or_pop_request): 

1689 from .broker import _acquire_token_silently 

1690 response = _acquire_token_silently( 

1691 "https://{}/{}".format(self.authority.instance, self.authority.tenant), 

1692 self.client_id, 

1693 account["local_account_id"], 

1694 scopes, 

1695 claims=_merge_claims_challenge_and_capabilities( 

1696 self._client_capabilities, claims_challenge), 

1697 correlation_id=correlation_id, 

1698 auth_scheme=auth_scheme, 

1699 **data) 

1700 if response: # Broker provides a decisive outcome 

1701 account_was_established_by_broker = account.get( 

1702 "account_source") == _GRANT_TYPE_BROKER 

1703 broker_attempt_succeeded_just_now = "error" not in response 

1704 if account_was_established_by_broker or broker_attempt_succeeded_just_now: 

1705 return self._process_broker_response(response, scopes, data) 

1706 

1707 if auth_scheme: 

1708 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED) 

1709 if account: 

1710 result = self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family( 

1711 authority, self._decorate_scope(scopes), account, 

1712 refresh_reason=refresh_reason, claims_challenge=claims_challenge, 

1713 correlation_id=correlation_id, 

1714 **kwargs) 

1715 else: # The caller is acquire_token_for_client() 

1716 result = self._acquire_token_for_client( 

1717 scopes, refresh_reason, claims_challenge=claims_challenge, 

1718 **kwargs) 

1719 if result and "access_token" in result: 

1720 result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1721 if (result and "error" not in result) or (not access_token_from_cache): 

1722 return result 

1723 except http_exceptions: 

1724 # Typically network error. Potential AAD outage? 

1725 if not access_token_from_cache: # It means there is no fall back option 

1726 raise # We choose to bubble up the exception 

1727 return access_token_from_cache 

1728 

1729 def _process_broker_response(self, response, scopes, data): 

1730 if "error" not in response: 

1731 self.token_cache.add(dict( 

1732 client_id=self.client_id, 

1733 scope=response["scope"].split() if "scope" in response else scopes, 

1734 token_endpoint=self.authority.token_endpoint, 

1735 response=response, 

1736 data=data, 

1737 _account_id=response["_account_id"], 

1738 environment=self.authority.instance, # Be consistent with non-broker flows 

1739 grant_type=_GRANT_TYPE_BROKER, # A pseudo grant type for TokenCache to mark account_source as broker 

1740 )) 

1741 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER 

1742 return _clean_up(response) 

1743 

1744 def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family( 

1745 self, authority, scopes, account, **kwargs): 

1746 query = { 

1747 "environment": authority.instance, 

1748 "home_account_id": (account or {}).get("home_account_id"), 

1749 # "realm": authority.tenant, # AAD RTs are tenant-independent 

1750 } 

1751 app_metadata = self._get_app_metadata(authority.instance) 

1752 if not app_metadata: # Meaning this app is now used for the first time. 

1753 # When/if we have a way to directly detect current app's family, 

1754 # we'll rewrite this block, to support multiple families. 

1755 # For now, we try existing RTs (*). If it works, we are in that family. 

1756 # (*) RTs of a different app/family are not supposed to be 

1757 # shared with or accessible by us in the first place. 

1758 at = self._acquire_token_silent_by_finding_specific_refresh_token( 

1759 authority, scopes, 

1760 dict(query, family_id="1"), # A hack, we have only 1 family for now 

1761 rt_remover=lambda rt_item: None, # NO-OP b/c RTs are likely not mine 

1762 break_condition=lambda response: # Break loop when app not in family 

1763 # Based on an AAD-only behavior mentioned in internal doc here 

1764 # https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595 

1765 "client_mismatch" in response.get("error_additional_info", []), 

1766 **kwargs) 

1767 if at and "error" not in at: 

1768 return at 

1769 last_resp = None 

1770 if app_metadata.get("family_id"): # Meaning this app belongs to this family 

1771 last_resp = at = self._acquire_token_silent_by_finding_specific_refresh_token( 

1772 authority, scopes, dict(query, family_id=app_metadata["family_id"]), 

1773 **kwargs) 

1774 if at and "error" not in at: 

1775 return at 

1776 # Either this app is an orphan, so we will naturally use its own RT; 

1777 # or all attempts above have failed, so we fall back to non-foci behavior. 

1778 return self._acquire_token_silent_by_finding_specific_refresh_token( 

1779 authority, scopes, dict(query, client_id=self.client_id), 

1780 **kwargs) or last_resp 

1781 

1782 def _get_app_metadata(self, environment): 

1783 return self.token_cache._get_app_metadata( 

1784 environment=environment, client_id=self.client_id, default={}) 

1785 

1786 def _acquire_token_silent_by_finding_specific_refresh_token( 

1787 self, authority, scopes, query, 

1788 rt_remover=None, break_condition=lambda response: False, 

1789 refresh_reason=None, correlation_id=None, claims_challenge=None, 

1790 **kwargs): 

1791 matches = list(self.token_cache.search( # We want a list to test emptiness 

1792 self.token_cache.CredentialType.REFRESH_TOKEN, 

1793 # target=scopes, # AAD RTs are scope-independent 

1794 query=query)) 

1795 logger.debug("Found %d RTs matching %s", len(matches), { 

1796 k: _pii_less_home_account_id(v) if k == "home_account_id" and v else v 

1797 for k, v in query.items() 

1798 }) 

1799 

1800 response = None # A distinguishable value to mean cache is empty 

1801 if not matches: # Then exit early to avoid expensive operations 

1802 return response 

1803 client, _ = self._build_client( 

1804 # Potentially expensive if building regional client 

1805 self.client_credential, authority, skip_regional_client=True) 

1806 telemetry_context = self._build_telemetry_context( 

1807 self.ACQUIRE_TOKEN_SILENT_ID, 

1808 correlation_id=correlation_id, refresh_reason=refresh_reason) 

1809 for entry in sorted( # Since unfit RTs would not be aggressively removed, 

1810 # we start from newer RTs which are more likely fit. 

1811 matches, 

1812 key=lambda e: int(e.get("last_modification_time", "0")), 

1813 reverse=True): 

1814 logger.debug("Cache attempts an RT") 

1815 headers = telemetry_context.generate_headers() 

1816 if query.get("home_account_id"): # Then use it as CCS Routing info 

1817 headers["X-AnchorMailbox"] = "Oid:{}".format( # case-insensitive value 

1818 query["home_account_id"].replace(".", "@")) 

1819 response = client.obtain_token_by_refresh_token( 

1820 entry, rt_getter=lambda token_item: token_item["secret"], 

1821 on_removing_rt=lambda rt_item: None, # Disable RT removal, 

1822 # because an invalid_grant could be caused by new MFA policy, 

1823 # the RT could still be useful for other MFA-less scope or tenant 

1824 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

1825 event, 

1826 environment=authority.instance, 

1827 skip_account_creation=True, # To honor a concurrent remove_account() 

1828 )), 

1829 scope=scopes, 

1830 headers=headers, 

1831 data=dict( 

1832 kwargs.pop("data", {}), 

1833 claims=_merge_claims_challenge_and_capabilities( 

1834 self._client_capabilities, claims_challenge)), 

1835 **kwargs) 

1836 telemetry_context.update_telemetry(response) 

1837 if "error" not in response: 

1838 return response 

1839 logger.debug("Refresh failed. {error}: {error_description}".format( 

1840 error=response.get("error"), 

1841 error_description=response.get("error_description"), 

1842 )) 

1843 if break_condition(response): 

1844 break 

1845 return response # Returns the latest error (if any), or just None 

1846 

1847 def _validate_ssh_cert_input_data(self, data): 

1848 if data.get("token_type") == "ssh-cert": 

1849 if not data.get("req_cnf"): 

1850 raise ValueError( 

1851 "When requesting an SSH certificate, " 

1852 "you must include a string parameter named 'req_cnf' " 

1853 "containing the public key in JWK format " 

1854 "(https://tools.ietf.org/html/rfc7517).") 

1855 if not data.get("key_id"): 

1856 raise ValueError( 

1857 "When requesting an SSH certificate, " 

1858 "you must include a string parameter named 'key_id' " 

1859 "which identifies the key in the 'req_cnf' argument.") 

1860 

1861 def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs): 

1862 """Acquire token(s) based on a refresh token (RT) obtained from elsewhere. 

1863 

1864 You use this method only when you have old RTs from elsewhere, 

1865 and now you want to migrate them into MSAL. 

1866 Calling this method results in new tokens automatically storing into MSAL. 

1867 

1868 You do NOT need to use this method if you are already using MSAL. 

1869 MSAL maintains RT automatically inside its token cache, 

1870 and an access token can be retrieved 

1871 when you call :func:`~acquire_token_silent`. 

1872 

1873 :param str refresh_token: The old refresh token, as a string. 

1874 

1875 :param list scopes: 

1876 The scopes associate with this old RT. 

1877 Each scope needs to be in the Microsoft identity platform (v2) format. 

1878 See `Scopes not resources <https://docs.microsoft.com/en-us/azure/active-directory/develop/migrate-python-adal-msal#scopes-not-resources>`_. 

1879 

1880 :return: 

1881 * A dict contains "error" and some other keys, when error happened. 

1882 * A dict contains no "error" key means migration was successful. 

1883 """ 

1884 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1885 telemetry_context = self._build_telemetry_context( 

1886 self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN, 

1887 refresh_reason=msal.telemetry.FORCE_REFRESH) 

1888 response = _clean_up(self.client.obtain_token_by_refresh_token( 

1889 refresh_token, 

1890 scope=self._decorate_scope(scopes), 

1891 headers=telemetry_context.generate_headers(), 

1892 rt_getter=lambda rt: rt, 

1893 on_updating_rt=False, 

1894 on_removing_rt=lambda rt_item: None, # No OP 

1895 **kwargs)) 

1896 if "access_token" in response: 

1897 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1898 telemetry_context.update_telemetry(response) 

1899 return response 

1900 

1901 def acquire_token_by_username_password( 

1902 self, username, password, scopes, claims_challenge=None, 

1903 # Note: We shouldn't need to surface enable_msa_passthrough, 

1904 # because this ROPC won't work with MSA account anyway. 

1905 auth_scheme=None, 

1906 **kwargs): 

1907 """Gets a token for a given resource via user credentials. 

1908 

1909 See this page for constraints of Username Password Flow. 

1910 https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication 

1911 

1912 :param str username: Typically a UPN in the form of an email address. 

1913 :param str password: The password. 

1914 :param list[str] scopes: 

1915 Scopes requested to access a protected API (a resource). 

1916 :param claims_challenge: 

1917 The claims_challenge parameter requests specific claims requested by the resource provider 

1918 in the form of a claims_challenge directive in the www-authenticate header to be 

1919 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1920 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1921 

1922 :param object auth_scheme: 

1923 You can provide an ``msal.auth_scheme.PopAuthScheme`` object 

1924 so that MSAL will get a Proof-of-Possession (POP) token for you. 

1925 

1926 New in version 1.26.0. 

1927 

1928 :return: A dict representing the json response from Microsoft Entra: 

1929 

1930 - A successful response would contain "access_token" key, 

1931 - an error response would contain "error" and usually "error_description". 

1932 

1933 [Deprecated] This API is deprecated for public client flows and will be 

1934 removed in a future release. Use a more secure flow instead. 

1935 Migration guide: https://aka.ms/msal-ropc-migration 

1936 

1937 """ 

1938 is_confidential_app = self.client_credential or isinstance( 

1939 self, ConfidentialClientApplication) 

1940 if not is_confidential_app: 

1941 warnings.warn("""This API has been deprecated for public client flows, please use a more secure flow. 

1942 See https://aka.ms/msal-ropc-migration for migration guidance""", DeprecationWarning) 

1943 claims = _merge_claims_challenge_and_capabilities( 

1944 self._client_capabilities, claims_challenge) 

1945 if self._enable_broker and sys.platform in ("win32", "darwin"): 

1946 from .broker import _signin_silently 

1947 response = _signin_silently( 

1948 "https://{}/{}".format(self.authority.instance, self.authority.tenant), 

1949 self.client_id, 

1950 scopes, # Decorated scopes won't work due to offline_access 

1951 MSALRuntime_Username=username, 

1952 MSALRuntime_Password=password, 

1953 validateAuthority="no" if ( 

1954 self.authority._is_known_to_developer 

1955 or self._instance_discovery is False) else None, 

1956 claims=claims, 

1957 auth_scheme=auth_scheme, 

1958 ) 

1959 return self._process_broker_response(response, scopes, kwargs.get("data", {})) 

1960 

1961 if auth_scheme: 

1962 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED) 

1963 scopes = self._decorate_scope(scopes) 

1964 telemetry_context = self._build_telemetry_context( 

1965 self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID) 

1966 headers = telemetry_context.generate_headers() 

1967 data = dict(kwargs.pop("data", {}), claims=claims) 

1968 response = None 

1969 if not self.authority.is_adfs: 

1970 user_realm_result = self.authority.user_realm_discovery( 

1971 username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID]) 

1972 if user_realm_result.get("account_type") == "Federated": 

1973 response = _clean_up(self._acquire_token_by_username_password_federated( 

1974 user_realm_result, username, password, scopes=scopes, 

1975 data=data, 

1976 headers=headers, **kwargs)) 

1977 if response is None: # Either ADFS or not federated 

1978 response = _clean_up(self.client.obtain_token_by_username_password( 

1979 username, password, scope=scopes, 

1980 headers=headers, 

1981 data=data, 

1982 **kwargs)) 

1983 if "access_token" in response: 

1984 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1985 telemetry_context.update_telemetry(response) 

1986 return response 

1987 

1988 def _acquire_token_by_username_password_federated( 

1989 self, user_realm_result, username, password, scopes=None, **kwargs): 

1990 wstrust_endpoint = {} 

1991 if user_realm_result.get("federation_metadata_url"): 

1992 wstrust_endpoint = mex_send_request( 

1993 user_realm_result["federation_metadata_url"], 

1994 self.http_client) 

1995 if wstrust_endpoint is None: 

1996 raise ValueError("Unable to find wstrust endpoint from MEX. " 

1997 "This typically happens when attempting MSA accounts. " 

1998 "More details available here. " 

1999 "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication") 

2000 logger.debug("wstrust_endpoint = %s", wstrust_endpoint) 

2001 wstrust_result = wst_send_request( 

2002 username, password, 

2003 user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"), 

2004 wstrust_endpoint.get("address", 

2005 # Fallback to an AAD supplied endpoint 

2006 user_realm_result.get("federation_active_auth_url")), 

2007 wstrust_endpoint.get("action"), self.http_client) 

2008 if not ("token" in wstrust_result and "type" in wstrust_result): 

2009 raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result) 

2010 GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer' 

2011 grant_type = { 

2012 SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1, 

2013 SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2, 

2014 WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1, 

2015 WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2 

2016 }.get(wstrust_result.get("type")) 

2017 if not grant_type: 

2018 raise RuntimeError( 

2019 "RSTR returned unknown token type: %s", wstrust_result.get("type")) 

2020 self.client.grant_assertion_encoders.setdefault( # Register a non-standard type 

2021 grant_type, self.client.encode_saml_assertion) 

2022 return self.client.obtain_token_by_assertion( 

2023 wstrust_result["token"], grant_type, scope=scopes, 

2024 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

2025 event, 

2026 environment=self.authority.instance, 

2027 username=username, # Useful in case IDT contains no such info 

2028 )), 

2029 **kwargs) 

2030 

2031 

2032class PublicClientApplication(ClientApplication): # browser app or mobile app 

2033 

2034 DEVICE_FLOW_CORRELATION_ID = "_correlation_id" 

2035 CONSOLE_WINDOW_HANDLE = object() 

2036 

2037 def __init__( 

2038 self, client_id, client_credential=None, 

2039 *, 

2040 enable_broker_on_windows=None, 

2041 enable_broker_on_mac=None, 

2042 enable_broker_on_linux=None, 

2043 enable_broker_on_wsl=None, 

2044 **kwargs): 

2045 """Same as :func:`ClientApplication.__init__`, 

2046 except that ``client_credential`` parameter shall remain ``None``. 

2047 

2048 .. note:: 

2049 

2050 **What is a broker, and why use it?** 

2051 

2052 A broker is a component installed on your device. 

2053 Broker implicitly gives your device an identity. By using a broker, 

2054 your device becomes a factor that can satisfy MFA (Multi-factor authentication). 

2055 This factor would become mandatory 

2056 if a tenant's admin enables a corresponding Conditional Access (CA) policy. 

2057 The broker's presence allows Microsoft identity platform 

2058 to have higher confidence that the tokens are being issued to your device, 

2059 and that is more secure. 

2060 

2061 An additional benefit of broker is, 

2062 it runs as a long-lived process with your device's OS, 

2063 and maintains its own cache, 

2064 so that your broker-enabled apps (even a CLI) 

2065 could automatically SSO from a previously established signed-in session. 

2066 

2067 **How to opt in to use broker?** 

2068 

2069 1. You can set any combination of the following opt-in parameters to true: 

2070 

2071 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2072 | Opt-in flag | If app will run on | App has registered this as a Desktop platform redirect URI in Azure Portal | 

2073 +==========================+===================================+====================================================================================+ 

2074 | enable_broker_on_windows | Windows 10+ | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id | 

2075 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2076 | enable_broker_on_wsl | WSL | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id | 

2077 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2078 | enable_broker_on_mac | Mac with Company Portal installed | msauth.com.msauth.unsignedapp://auth | 

2079 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2080 | enable_broker_on_linux | Linux with Intune installed | ``https://login.microsoftonline.com/common/oauth2/nativeclient`` (MUST be enabled) | 

2081 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2082 

2083 2. Install broker dependency, 

2084 e.g. ``pip install msal[broker]>=1.33,<2``. 

2085 

2086 3. Test with ``acquire_token_interactive()`` and ``acquire_token_silent()``. 

2087 

2088 **The fallback behaviors of MSAL Python's broker support** 

2089 

2090 MSAL will either error out, or silently fallback to non-broker flows. 

2091 

2092 1. MSAL will ignore the `enable_broker_...` and bypass broker 

2093 on those auth flows that are known to be NOT supported by broker. 

2094 This includes ADFS, B2C, etc.. 

2095 For other "could-use-broker" scenarios, please see below. 

2096 2. MSAL errors out when app developer opted-in to use broker 

2097 but a direct dependency "mid-tier" package is not installed. 

2098 Error message guides app developer to declare the correct dependency 

2099 ``msal[broker]``. 

2100 We error out here because the error is actionable to app developers. 

2101 3. MSAL silently "deactivates" the broker and fallback to non-broker, 

2102 when opted-in, dependency installed yet failed to initialize. 

2103 We anticipate this would happen on a device whose OS is too old 

2104 or the underlying broker component is somehow unavailable. 

2105 There is not much an app developer or the end user can do here. 

2106 Eventually, the conditional access policy shall 

2107 force the user to switch to a different device. 

2108 4. MSAL errors out when broker is opted in, installed, initialized, 

2109 but subsequent token request(s) failed. 

2110 

2111 :param boolean enable_broker_on_windows: 

2112 This setting is only effective if your app is running on Windows 10+. 

2113 This parameter defaults to None, which means MSAL will not utilize a broker. 

2114 

2115 New in MSAL Python 1.25.0. 

2116 

2117 :param boolean enable_broker_on_mac: 

2118 This setting is only effective if your app is running on Mac. 

2119 This parameter defaults to None, which means MSAL will not utilize a broker. 

2120 

2121 New in MSAL Python 1.31.0. 

2122 

2123 :param boolean enable_broker_on_linux: 

2124 This setting is only effective if your app is running on Linux, including WSL. 

2125 This parameter defaults to None, which means MSAL will not utilize a broker. 

2126 

2127 New in MSAL Python 1.33.0. 

2128 

2129 :param boolean enable_broker_on_wsl: 

2130 This setting is only effective if your app is running on WSL. 

2131 This parameter defaults to None, which means MSAL will not utilize a broker. 

2132 

2133 New in MSAL Python 1.33.0. 

2134 """ 

2135 if client_credential is not None: 

2136 raise ValueError("Public Client should not possess credentials") 

2137 

2138 self._enable_broker = bool( 

2139 enable_broker_on_windows and sys.platform == "win32" 

2140 or enable_broker_on_mac and sys.platform == "darwin" 

2141 or enable_broker_on_linux and sys.platform == "linux" 

2142 or enable_broker_on_wsl and is_wsl() 

2143 ) 

2144 

2145 super(PublicClientApplication, self).__init__( 

2146 client_id, client_credential=None, **kwargs) 

2147 

2148 def acquire_token_interactive( 

2149 self, 

2150 scopes, # type: list[str] 

2151 prompt=None, 

2152 login_hint=None, # type: Optional[str] 

2153 domain_hint=None, # type: Optional[str] 

2154 claims_challenge=None, 

2155 timeout=None, 

2156 port=None, 

2157 extra_scopes_to_consent=None, 

2158 max_age=None, 

2159 parent_window_handle=None, 

2160 on_before_launching_ui=None, 

2161 auth_scheme=None, 

2162 **kwargs): 

2163 """Acquire token interactively i.e. via a local browser. 

2164 

2165 Prerequisite: In Azure Portal, configure the Redirect URI of your 

2166 "Mobile and Desktop application" as ``http://localhost``. 

2167 If you opts in to use broker during ``PublicClientApplication`` creation, 

2168 your app also need this Redirect URI: 

2169 ``ms-appx-web://Microsoft.AAD.BrokerPlugin/YOUR_CLIENT_ID`` 

2170 

2171 :param list scopes: 

2172 It is a list of case-sensitive strings. 

2173 :param str prompt: 

2174 By default, no prompt value will be sent, not even string ``"none"``. 

2175 You will have to specify a value explicitly. 

2176 Its valid values are the constants defined in 

2177 :class:`Prompt <msal.Prompt>`. 

2178 :param str login_hint: 

2179 Optional. Identifier of the user. Generally a User Principal Name (UPN). 

2180 :param domain_hint: 

2181 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com". 

2182 If included, it will skip the email-based discovery process that user goes 

2183 through on the sign-in page, leading to a slightly more streamlined user experience. 

2184 More information on possible values available in 

2185 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and 

2186 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_. 

2187 

2188 :param claims_challenge: 

2189 The claims_challenge parameter requests specific claims requested by the resource provider 

2190 in the form of a claims_challenge directive in the www-authenticate header to be 

2191 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2192 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2193 

2194 :param int timeout: 

2195 This method will block the current thread. 

2196 This parameter specifies the timeout value in seconds. 

2197 Default value ``None`` means wait indefinitely. 

2198 

2199 :param int port: 

2200 The port to be used to listen to an incoming auth response. 

2201 By default we will use a system-allocated port. 

2202 (The rest of the redirect_uri is hard coded as ``http://localhost``.) 

2203 

2204 :param list extra_scopes_to_consent: 

2205 "Extra scopes to consent" is a concept only available in Microsoft Entra. 

2206 It refers to other resources you might want to prompt to consent for, 

2207 in the same interaction, but for which you won't get back a 

2208 token for in this particular operation. 

2209 

2210 :param int max_age: 

2211 OPTIONAL. Maximum Authentication Age. 

2212 Specifies the allowable elapsed time in seconds 

2213 since the last time the End-User was actively authenticated. 

2214 If the elapsed time is greater than this value, 

2215 Microsoft identity platform will actively re-authenticate the End-User. 

2216 

2217 MSAL Python will also automatically validate the auth_time in ID token. 

2218 

2219 New in version 1.15. 

2220 

2221 :param int parent_window_handle: 

2222 OPTIONAL. 

2223 

2224 * If your app does not opt in to use broker, 

2225 you do not need to provide a ``parent_window_handle`` here. 

2226 

2227 * If your app opts in to use broker, 

2228 ``parent_window_handle`` is required. 

2229 

2230 - If your app is a GUI app running on Windows or Mac system, 

2231 you are required to also provide its window handle, 

2232 so that the sign-in window will pop up on top of your window. 

2233 - If your app is a console app running on Windows or Mac system, 

2234 you can use a placeholder 

2235 ``PublicClientApplication.CONSOLE_WINDOW_HANDLE``. 

2236 

2237 Most Python scripts are console apps. 

2238 

2239 New in version 1.20.0. 

2240 

2241 :param function on_before_launching_ui: 

2242 A callback with the form of 

2243 ``lambda ui="xyz", **kwargs: print("A {} will be launched".format(ui))``, 

2244 where ``ui`` will be either "browser" or "broker". 

2245 You can use it to inform your end user to expect a pop-up window. 

2246 

2247 New in version 1.20.0. 

2248 

2249 :param object auth_scheme: 

2250 You can provide an ``msal.auth_scheme.PopAuthScheme`` object 

2251 so that MSAL will get a Proof-of-Possession (POP) token for you. 

2252 

2253 New in version 1.26.0. 

2254 

2255 :return: 

2256 - A dict containing no "error" key, 

2257 and typically contains an "access_token" key. 

2258 - A dict containing an "error" key, when token refresh failed. 

2259 """ 

2260 data = kwargs.pop("data", {}) 

2261 enable_msa_passthrough = kwargs.pop( # MUST remove it from kwargs 

2262 "enable_msa_passthrough", # Keep it as a hidden param, for now. 

2263 # OPTIONAL. MSA-Passthrough is a legacy configuration, 

2264 # needed by a small amount of Microsoft first-party apps, 

2265 # which would login MSA accounts via ".../organizations" authority. 

2266 # If you app belongs to this category, AND you are enabling broker, 

2267 # you would want to enable this flag. Default value is False. 

2268 # More background of MSA-PT is available from this internal docs: 

2269 # https://microsoft.sharepoint.com/:w:/t/Identity-DevEx/EatIUauX3c9Ctw1l7AQ6iM8B5CeBZxc58eoQCE0IuZ0VFw?e=tgc3jP&CID=39c853be-76ea-79d7-ee73-f1b2706ede05 

2270 False 

2271 ) and data.get("token_type") != "ssh-cert" # Work around a known issue as of PyMsalRuntime 0.8 

2272 self._validate_ssh_cert_input_data(data) 

2273 is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(data.get("token_type"), auth_scheme) 

2274 

2275 if not on_before_launching_ui: 

2276 on_before_launching_ui = lambda **kwargs: None 

2277 if _is_running_in_cloud_shell() and prompt == "none": 

2278 # Note: _acquire_token_by_cloud_shell() is always silent, 

2279 # so we would not fire on_before_launching_ui() 

2280 return self._acquire_token_by_cloud_shell(scopes, data=data) 

2281 claims = _merge_claims_challenge_and_capabilities( 

2282 self._client_capabilities, claims_challenge) 

2283 if self._enable_broker and (sys.platform in ("win32", "darwin") or not is_ssh_cert_or_pop_request): 

2284 if parent_window_handle is None: 

2285 raise ValueError( 

2286 "parent_window_handle is required when you opted into using broker. " 

2287 "You need to provide the window handle of your GUI application, " 

2288 "or use msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE " 

2289 "when and only when your application is a console app.") 

2290 if extra_scopes_to_consent: 

2291 logger.warning( 

2292 "Ignoring parameter extra_scopes_to_consent, " 

2293 "which is not supported by broker") 

2294 response = self._acquire_token_interactive_via_broker( 

2295 scopes, 

2296 parent_window_handle, 

2297 enable_msa_passthrough, 

2298 claims, 

2299 data, 

2300 on_before_launching_ui, 

2301 auth_scheme, 

2302 prompt=prompt, 

2303 login_hint=login_hint, 

2304 max_age=max_age, 

2305 ) 

2306 return self._process_broker_response(response, scopes, data) 

2307 

2308 if isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme) and sys.platform == "linux": 

2309 raise ValueError("POP is not supported on Linux") 

2310 elif auth_scheme: 

2311 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED) 

2312 on_before_launching_ui(ui="browser") 

2313 telemetry_context = self._build_telemetry_context( 

2314 self.ACQUIRE_TOKEN_INTERACTIVE) 

2315 response = _clean_up(self.client.obtain_token_by_browser( 

2316 scope=self._decorate_scope(scopes) if scopes else None, 

2317 extra_scope_to_consent=extra_scopes_to_consent, 

2318 redirect_uri="http://localhost:{port}".format( 

2319 # Hardcode the host, for now. AAD portal rejects 127.0.0.1 anyway 

2320 port=port or 0), 

2321 prompt=prompt, 

2322 login_hint=login_hint, 

2323 max_age=max_age, 

2324 timeout=timeout, 

2325 auth_params={ 

2326 "claims": claims, 

2327 "domain_hint": domain_hint, 

2328 }, 

2329 data=dict(data, claims=claims), 

2330 headers=telemetry_context.generate_headers(), 

2331 browser_name=_preferred_browser(), 

2332 **kwargs)) 

2333 if "access_token" in response: 

2334 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2335 telemetry_context.update_telemetry(response) 

2336 return response 

2337 

2338 def _acquire_token_interactive_via_broker( 

2339 self, 

2340 scopes, # type: list[str] 

2341 parent_window_handle, # type: int 

2342 enable_msa_passthrough, # type: boolean 

2343 claims, # type: str 

2344 data, # type: dict 

2345 on_before_launching_ui, # type: callable 

2346 auth_scheme, # type: object 

2347 prompt=None, 

2348 login_hint=None, # type: Optional[str] 

2349 max_age=None, 

2350 **kwargs): 

2351 from .broker import _signin_interactively, _signin_silently, _acquire_token_silently 

2352 if "welcome_template" in kwargs: 

2353 logger.debug(kwargs["welcome_template"]) # Experimental 

2354 authority = "https://{}/{}".format( 

2355 self.authority.instance, self.authority.tenant) 

2356 validate_authority = "no" if ( 

2357 self.authority._is_known_to_developer 

2358 or self._instance_discovery is False) else None 

2359 # Calls different broker methods to mimic the OIDC behaviors 

2360 if login_hint and prompt != "select_account": # OIDC prompts when the user did not sign in 

2361 accounts = self.get_accounts(username=login_hint) 

2362 if len(accounts) == 1: # Unambiguously proceed with this account 

2363 logger.debug("Calling broker._acquire_token_silently()") 

2364 response = _acquire_token_silently( # When it works, it bypasses prompt 

2365 authority, 

2366 self.client_id, 

2367 accounts[0]["local_account_id"], 

2368 scopes, 

2369 claims=claims, 

2370 auth_scheme=auth_scheme, 

2371 **data) 

2372 if response and "error" not in response: 

2373 return response 

2374 # login_hint undecisive or not exists 

2375 if prompt == "none" or not prompt: # Must/Can attempt _signin_silently() 

2376 logger.debug("Calling broker._signin_silently()") 

2377 response = _signin_silently( # Unlike OIDC, it doesn't honor login_hint 

2378 authority, self.client_id, scopes, 

2379 validateAuthority=validate_authority, 

2380 claims=claims, 

2381 max_age=max_age, 

2382 enable_msa_pt=enable_msa_passthrough, 

2383 auth_scheme=auth_scheme, 

2384 **data) 

2385 is_wrong_account = bool( 

2386 # _signin_silently() only gets tokens for default account, 

2387 # but this seems to have been fixed in PyMsalRuntime 0.11.2 

2388 "access_token" in response and login_hint 

2389 and login_hint != response.get( 

2390 "id_token_claims", {}).get("preferred_username")) 

2391 wrong_account_error_message = ( 

2392 'prompt="none" will not work for login_hint="non-default-user"') 

2393 if is_wrong_account: 

2394 logger.debug(wrong_account_error_message) 

2395 if prompt == "none": 

2396 return response if not is_wrong_account else { 

2397 "error": "broker_error", 

2398 "error_description": wrong_account_error_message, 

2399 } 

2400 else: 

2401 assert bool(prompt) is False 

2402 from pymsalruntime import Response_Status 

2403 recoverable_errors = frozenset([ 

2404 Response_Status.Status_AccountUnusable, 

2405 Response_Status.Status_InteractionRequired, 

2406 ]) 

2407 if is_wrong_account or "error" in response and response.get( 

2408 "_broker_status") in recoverable_errors: 

2409 pass # It will fall back to the _signin_interactively() 

2410 else: 

2411 return response 

2412 

2413 logger.debug("Falls back to broker._signin_interactively()") 

2414 on_before_launching_ui(ui="broker") 

2415 return _signin_interactively( 

2416 authority, self.client_id, scopes, 

2417 None if parent_window_handle is self.CONSOLE_WINDOW_HANDLE 

2418 else parent_window_handle, 

2419 validateAuthority=validate_authority, 

2420 login_hint=login_hint, 

2421 prompt=prompt, 

2422 claims=claims, 

2423 max_age=max_age, 

2424 enable_msa_pt=enable_msa_passthrough, 

2425 auth_scheme=auth_scheme, 

2426 **data) 

2427 

2428 def initiate_device_flow(self, scopes=None, *, claims_challenge=None, **kwargs): 

2429 """Initiate a Device Flow instance, 

2430 which will be used in :func:`~acquire_token_by_device_flow`. 

2431 

2432 :param list[str] scopes: 

2433 Scopes requested to access a protected API (a resource). 

2434 :return: A dict representing a newly created Device Flow object. 

2435 

2436 - A successful response would contain "user_code" key, among others 

2437 - an error response would contain some other readable key/value pairs. 

2438 """ 

2439 correlation_id = msal.telemetry._get_new_correlation_id() 

2440 flow = self.client.initiate_device_flow( 

2441 scope=self._decorate_scope(scopes or []), 

2442 headers={msal.telemetry.CLIENT_REQUEST_ID: correlation_id}, 

2443 data={"claims": _merge_claims_challenge_and_capabilities( 

2444 self._client_capabilities, claims_challenge)}, 

2445 **kwargs) 

2446 flow[self.DEVICE_FLOW_CORRELATION_ID] = correlation_id 

2447 return flow 

2448 

2449 def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs): 

2450 """Obtain token by a device flow object, with customizable polling effect. 

2451 

2452 :param dict flow: 

2453 A dict previously generated by :func:`~initiate_device_flow`. 

2454 By default, this method's polling effect will block current thread. 

2455 You can abort the polling loop at any time, 

2456 by changing the value of the flow's "expires_at" key to 0. 

2457 :param claims_challenge: 

2458 The claims_challenge parameter requests specific claims requested by the resource provider 

2459 in the form of a claims_challenge directive in the www-authenticate header to be 

2460 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2461 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2462 

2463 :return: A dict representing the json response from Microsoft Entra: 

2464 

2465 - A successful response would contain "access_token" key, 

2466 - an error response would contain "error" and usually "error_description". 

2467 """ 

2468 telemetry_context = self._build_telemetry_context( 

2469 self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID, 

2470 correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID)) 

2471 response = _clean_up(self.client.obtain_token_by_device_flow( 

2472 flow, 

2473 data=dict( 

2474 kwargs.pop("data", {}), 

2475 code=flow["device_code"], # 2018-10-4 Hack: 

2476 # during transition period, 

2477 # service seemingly need both device_code and code parameter. 

2478 claims=_merge_claims_challenge_and_capabilities( 

2479 self._client_capabilities, claims_challenge), 

2480 ), 

2481 headers=telemetry_context.generate_headers(), 

2482 **kwargs)) 

2483 if "access_token" in response: 

2484 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2485 telemetry_context.update_telemetry(response) 

2486 return response 

2487 

2488 

2489class ConfidentialClientApplication(ClientApplication): # server-side web app 

2490 """Same as :func:`ClientApplication.__init__`, 

2491 except that ``allow_broker`` parameter shall remain ``None``. 

2492 """ 

2493 

2494 def acquire_token_for_client(self, scopes, claims_challenge=None, fmi_path=None, **kwargs): 

2495 """Acquires token for the current confidential client, not for an end user. 

2496 

2497 Since MSAL Python 1.23, it will automatically look for token from cache, 

2498 and only send request to Identity Provider when cache misses. 

2499 

2500 :param list[str] scopes: (Required) 

2501 Scopes requested to access a protected API (a resource). 

2502 :param claims_challenge: 

2503 The claims_challenge parameter requests specific claims requested by the resource provider 

2504 in the form of a claims_challenge directive in the www-authenticate header to be 

2505 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2506 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2507 :param str fmi_path: 

2508 Optional. The Federated Managed Identity (FMI) credential path. 

2509 When provided, it is sent as the ``fmi_path`` parameter in the 

2510 token request body, and the resulting token is cached separately 

2511 so that different FMI paths do not share cached tokens. 

2512 Example usage:: 

2513 

2514 result = cca.acquire_token_for_client( 

2515 scopes=["api://resource/.default"], 

2516 fmi_path="SomeFmiPath/FmiCredentialPath", 

2517 ) 

2518 :return: A dict representing the json response from Microsoft Entra: 

2519 

2520 - A successful response would contain "access_token" key, 

2521 - an error response would contain "error" and usually "error_description". 

2522 """ 

2523 if kwargs.get("force_refresh"): 

2524 raise ValueError( # We choose to disallow force_refresh 

2525 "Historically, this method does not support force_refresh behavior. " 

2526 ) 

2527 if fmi_path is not None: 

2528 if not isinstance(fmi_path, str): 

2529 raise ValueError( 

2530 "fmi_path must be a string, got {}".format(type(fmi_path).__name__)) 

2531 kwargs["data"] = kwargs.get("data", {}) 

2532 kwargs["data"]["fmi_path"] = fmi_path 

2533 return _clean_up(self._acquire_token_silent_with_error( 

2534 scopes, None, claims_challenge=claims_challenge, **kwargs)) 

2535 

2536 def _acquire_token_for_client( 

2537 self, 

2538 scopes, 

2539 refresh_reason, 

2540 claims_challenge=None, 

2541 **kwargs 

2542 ): 

2543 if self.authority.tenant.lower() in ["common", "organizations"]: 

2544 warnings.warn( 

2545 "Using /common or /organizations authority " 

2546 "in acquire_token_for_client() is unreliable. " 

2547 "Please use a specific tenant instead.", DeprecationWarning) 

2548 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

2549 telemetry_context = self._build_telemetry_context( 

2550 self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason) 

2551 client = self._regional_client or self.client 

2552 response = client.obtain_token_for_client( 

2553 scope=scopes, # This grant flow requires no scope decoration 

2554 headers=telemetry_context.generate_headers(), 

2555 data=dict( 

2556 kwargs.pop("data", {}), 

2557 claims=_merge_claims_challenge_and_capabilities( 

2558 self._client_capabilities, claims_challenge)), 

2559 **kwargs) 

2560 telemetry_context.update_telemetry(response) 

2561 return response 

2562 

2563 def remove_tokens_for_client(self): 

2564 """Remove all tokens that were previously acquired via 

2565 :func:`~acquire_token_for_client()` for the current client.""" 

2566 for env in [self.authority.instance] + self._get_authority_aliases( 

2567 self.authority.instance): 

2568 for at in list(self.token_cache.search( # Remove ATs from a snapshot 

2569 TokenCache.CredentialType.ACCESS_TOKEN, query={ 

2570 "client_id": self.client_id, 

2571 "environment": env, 

2572 "home_account_id": None, # These are mostly app-only tokens 

2573 })): 

2574 self.token_cache.remove_at(at) 

2575 # acquire_token_for_client() obtains no RTs, so we have no RT to remove 

2576 

2577 def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs): 

2578 """Acquires token using on-behalf-of (OBO) flow. 

2579 

2580 The current app is a middle-tier service which was called with a token 

2581 representing an end user. 

2582 The current app can use such token (a.k.a. a user assertion) to request 

2583 another token to access downstream web API, on behalf of that user. 

2584 See `detail docs here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow>`_ . 

2585 

2586 The current middle-tier app has no user interaction to obtain consent. 

2587 See how to gain consent upfront for your middle-tier app from this article. 

2588 https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow#gaining-consent-for-the-middle-tier-application 

2589 

2590 :param str user_assertion: The incoming token already received by this app 

2591 :param list[str] scopes: Scopes required by downstream API (a resource). 

2592 :param claims_challenge: 

2593 The claims_challenge parameter requests specific claims requested by the resource provider 

2594 in the form of a claims_challenge directive in the www-authenticate header to be 

2595 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2596 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2597 

2598 :return: A dict representing the json response from Microsoft Entra: 

2599 

2600 - A successful response would contain "access_token" key, 

2601 - an error response would contain "error" and usually "error_description". 

2602 """ 

2603 telemetry_context = self._build_telemetry_context( 

2604 self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID) 

2605 # The implementation is NOT based on Token Exchange (RFC 8693) 

2606 response = _clean_up(self.client.obtain_token_by_assertion( # bases on assertion RFC 7521 

2607 user_assertion, 

2608 self.client.GRANT_TYPE_JWT, # IDTs and AAD ATs are all JWTs 

2609 scope=self._decorate_scope(scopes), # Decoration is used for: 

2610 # 1. Explicitly requesting an RT, without relying on AAD default 

2611 # behavior, even though it currently still issues an RT. 

2612 # 2. Requesting an IDT (which would otherwise be unavailable) 

2613 # so that the calling app could use id_token_claims to implement 

2614 # their own cache mapping, which is likely needed in web apps. 

2615 data=dict( 

2616 kwargs.pop("data", {}), 

2617 requested_token_use="on_behalf_of", 

2618 claims=_merge_claims_challenge_and_capabilities( 

2619 self._client_capabilities, claims_challenge)), 

2620 headers=telemetry_context.generate_headers(), 

2621 # TBD: Expose a login_hint (or ccs_routing_hint) param for web app 

2622 **kwargs)) 

2623 if "access_token" in response: 

2624 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2625 telemetry_context.update_telemetry(response) 

2626 return response 

2627 

2628 def acquire_token_by_user_federated_identity_credential( 

2629 self, scopes, assertion, username=None, user_object_id=None, 

2630 claims_challenge=None, **kwargs): 

2631 """Acquires a user-scoped token using the ``user_fic`` grant type. 

2632 

2633 This method exchanges a federated identity credential (typically an 

2634 agent instance token from Leg 2 of the agent identity protocol) for 

2635 a user-scoped access token, enabling an agent to act on behalf of 

2636 a specific user. 

2637 

2638 :param list[str] scopes: Scopes required by downstream API (a resource). 

2639 :param str assertion: 

2640 The federated identity credential token (e.g. the instance token 

2641 obtained from Leg 2 of the agent identity flow). 

2642 :param str username: 

2643 The target user's UPN (User Principal Name). 

2644 Mutually exclusive with ``user_object_id``. 

2645 :param str user_object_id: 

2646 The target user's Object ID. 

2647 Mutually exclusive with ``username``. 

2648 :param claims_challenge: 

2649 The claims_challenge parameter requests specific claims requested by the resource provider 

2650 in the form of a claims_challenge directive in the www-authenticate header to be 

2651 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2652 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2653 

2654 :return: A dict representing the json response from Microsoft Entra: 

2655 

2656 - A successful response would contain "access_token" key, 

2657 - an error response would contain "error" and usually "error_description". 

2658 """ 

2659 # Input validation 

2660 if not assertion: 

2661 raise ValueError("assertion is required and must be non-empty") 

2662 if not username and not user_object_id: 

2663 raise ValueError( 

2664 "Either username or user_object_id must be provided") 

2665 if username and user_object_id: 

2666 raise ValueError( 

2667 "username and user_object_id are mutually exclusive") 

2668 

2669 telemetry_context = self._build_telemetry_context( 

2670 self.ACQUIRE_TOKEN_BY_USER_FIC_ID) 

2671 headers = telemetry_context.generate_headers() 

2672 if username: 

2673 headers["X-AnchorMailbox"] = "upn:{}".format(username) 

2674 elif user_object_id: 

2675 headers["X-AnchorMailbox"] = "Oid:{}@{}".format( 

2676 user_object_id, self.authority.tenant) 

2677 response = _clean_up(self.client.obtain_token_by_user_fic( 

2678 scope=self._decorate_scope(scopes), 

2679 assertion=assertion, 

2680 username=username, 

2681 user_object_id=user_object_id, 

2682 headers=headers, 

2683 data=dict( 

2684 kwargs.pop("data", {}), 

2685 claims=_merge_claims_challenge_and_capabilities( 

2686 self._client_capabilities, claims_challenge)), 

2687 **kwargs)) 

2688 if "access_token" in response: 

2689 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2690 telemetry_context.update_telemetry(response) 

2691 return response