Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/application.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

672 statements  

1import functools 

2import json 

3import time 

4import logging 

5import sys 

6import warnings 

7from threading import Lock 

8from typing import Optional # Needed in Python 3.7 & 3.8 

9from urllib.parse import urlparse 

10import os 

11 

12from .oauth2cli import Client, JwtAssertionCreator 

13from .oauth2cli.oidc import decode_part 

14from .authority import Authority, WORLD_WIDE 

15from .mex import send_request as mex_send_request 

16from .wstrust_request import send_request as wst_send_request 

17from .wstrust_response import * 

18from .token_cache import TokenCache, _get_username, _GRANT_TYPE_BROKER 

19import msal.telemetry 

20from .region import _detect_region 

21from .throttled_http_client import ThrottledHttpClient 

22from .cloudshell import _is_running_in_cloud_shell 

23from .sku import SKU, __version__ 

24from .oauth2cli.authcode import is_wsl 

25 

26 

# Module-level logger, named after this module so that applications can
# configure MSAL logging through the standard logging hierarchy.
logger = logging.getLogger(__name__)
# Marker string for a Cloud Shell authority type.
# NOTE(review): its consumers are outside this part of the file - confirm usage.
_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL"

29 

def _init_broker(enable_pii_log):  # Kept as a function so that tests can mock it
    """Lazily import the broker sub-module and optionally turn on its PII logging.

    :param enable_pii_log: When truthy, ``broker._enable_pii_log()`` is called
        after the import.
    """
    from . import broker as broker_module  # The import itself triggers broker's initialization
    if not enable_pii_log:
        return
    broker_module._enable_pii_log()

34 

def extract_certs(public_cert_content):
    """Parse raw public certificate file contents into a list of strings.

    Usage: ``headers = {"x5c": extract_certs(open("my_cert.pem").read())}``

    :param str public_cert_content:
        The text of a certificate file, typically in PEM format.
    :return:
        A list containing the stripped body of every
        ``-----BEGIN CERTIFICATE-----`` / ``-----END CERTIFICATE-----`` section
        found. If no such section exists, the whole stripped input is returned
        as a single-element list.
    :raises ValueError:
        If no certificate section is found and the input contains the text
        ``PRIVATE KEY``.
    """
    # Import locally instead of relying on ``re`` being leaked into this
    # module's namespace by a wildcard import of another module.
    import re
    public_certificates = re.findall(
        r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----',
        public_cert_content, re.I)
    if public_certificates:
        return [cert.strip() for cert in public_certificates]
    # The public cert tags are not found in the input,
    # let's make best effort to exclude a private key pem file.
    if "PRIVATE KEY" in public_cert_content:
        raise ValueError(
            "We expect your public key but detect a private key instead")
    return [public_cert_content.strip()]

49 

50 

def _merge_claims_challenge_and_capabilities(capabilities, claims_challenge):
    """Fold client capabilities into a claims challenge.

    Capabilities are represented as
    ``{"access_token": {"xms_cc": {"values": capabilities}}}``
    and merged into (or added to) the incoming claims.

    :param capabilities: The capabilities to advertise. When falsy,
        ``claims_challenge`` is returned untouched.
    :param claims_challenge: A JSON string of claims, or a falsy value.
    :return: ``claims_challenge`` itself when there are no capabilities,
        otherwise a JSON string of the merged claims.
    """
    if capabilities:
        merged = json.loads(claims_challenge) if claims_challenge else {}
        for section in ("access_token",):  # "id_token" could be added here if we decide to
            merged.setdefault(section, {}).update(xms_cc={"values": capabilities})
        return json.dumps(merged)
    return claims_challenge

60 

61 

def _str2bytes(raw):
    """Return ``raw`` encoded as UTF-8 bytes when possible, otherwise ``raw`` itself.

    The conversion is based on duck-typing rather than on an explicit type
    check: anything whose ``encode(encoding="utf-8")`` call succeeds is
    encoded, and anything else (for example bytes or None) is handed back
    unchanged.

    :param raw: The value to convert.
    :return: The encoded bytes, or the original object when encoding fails.
    """
    try:
        return raw.encode(encoding="utf-8")
    except Exception:  # Not a bare except: never swallow KeyboardInterrupt/SystemExit
        return raw

68 

def _extract_cert_and_thumbprints(cert):
    """Derive the thumbprints and the x5c content from a certificate object.

    Cert concepts: https://security.stackexchange.com/a/226758/125264

    :param cert: An object offering ``public_bytes(encoding=...)`` and
        ``fingerprint(algorithm)``.
        NOTE(review): presumably a ``cryptography`` x509 certificate - confirm.
    :return: A tuple ``(sha256_thumbprint, sha1_thumbprint, x5c)``. Both
        thumbprints are hex strings; ``x5c`` is a one-element list holding the
        PEM body without its header and footer lines.
    """
    from cryptography.hazmat.primitives import hashes, serialization
    pem_text = cert.public_bytes(  # Requires cryptography 1.0+
        encoding=serialization.Encoding.PEM).decode()
    pem_lines = pem_text.splitlines()
    # Drop the "--- header ---" and "--- footer ---" lines
    pem_body = '\n'.join(pem_lines[1:-1])
    # https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object - Requires cryptography 0.7+
    thumbprint_sha256 = cert.fingerprint(hashes.SHA256()).hex()
    thumbprint_sha1 = cert.fingerprint(hashes.SHA1()).hex()  # CodeQL [SM02167] for legacy support such as ADFS
    return thumbprint_sha256, thumbprint_sha1, [pem_body]

84 

def _parse_pfx(pfx_path, passphrase_bytes):
    """Load the private key and the certificate details from a PFX file.

    Cert concepts: https://security.stackexchange.com/a/226758/125264

    :param pfx_path: Path of the PFX file to read.
    :param passphrase_bytes: Handed to ``pkcs12.load_key_and_certificates``
        as its password argument.
    :return: A tuple ``(private_key, sha256_thumbprint, sha1_thumbprint, x5c)``.
    :raises ValueError: If the file lacks either the private key or the cert.
    """
    from cryptography.hazmat.primitives.serialization import pkcs12
    with open(pfx_path, 'rb') as pfx_file:
        pfx_content = pfx_file.read()
    # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates
    private_key, cert, _ = pkcs12.load_key_and_certificates(  # cryptography 2.5+
        pfx_content, passphrase_bytes)
    if not private_key or not cert:
        raise ValueError("Your PFX file shall contain both private key and cert")
    thumbprint_sha256, thumbprint_sha1, x5c = _extract_cert_and_thumbprints(cert)
    return private_key, thumbprint_sha256, thumbprint_sha1, x5c

96 

97 

def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes):
    """Deserialize a PEM-formatted private key.

    :param private_key_pem_str: The private key in PEM format. It is passed
        through ``_str2bytes`` first, so either str or bytes can be supplied.
    :param passphrase_bytes: Handed to ``load_pem_private_key`` as its
        password argument.
    :return: Whatever ``serialization.load_pem_private_key`` returns.
    """
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.backends import default_backend
    pem_bytes = _str2bytes(private_key_pem_str)
    backend = default_backend()  # It was a required param until 2020
    return serialization.load_pem_private_key(  # cryptography 0.6+
        pem_bytes, passphrase_bytes, backend=backend)

106 

107 

def _pii_less_home_account_id(home_account_id):
    """Mask the first dot-separated part of a home_account_id.

    The input could contain one or two parts. Everything before the first
    ``"."`` is replaced with ``"********"``; the remainder is kept verbatim.

    :param str home_account_id: The identifier to mask.
    :return: The masked identifier.
    """
    _, dot, remainder = home_account_id.partition(".")
    return "********" + dot + remainder

112 

113 

def _clean_up(result):
    """Produce the caller-facing version of a token response.

    Anything that is not a dict (it could be None) is returned as-is.
    For a dict:

    * When ``_msalruntime_telemetry`` or ``_msal_python_telemetry`` is present,
      a compact JSON string combining both is stored under ``msal_telemetry``
      (this is written into the input dict itself).
    * A new dict is returned without ``refresh_in`` and without any key
      starting with an underscore.
    * When ``refresh_in`` was present, the returned dict gains ``refresh_on``,
      computed as ``int(time.time() + refresh_in)``.

    :param result: The raw result, or None.
    :return: The cleaned-up dict, or ``result`` itself when it is not a dict.
    """
    if not isinstance(result, dict):
        return result  # It could be None
    telemetry_keys = ("_msalruntime_telemetry", "_msal_python_telemetry")
    if any(key in result for key in telemetry_keys):
        telemetry = {
            "msalruntime_telemetry": result.get("_msalruntime_telemetry"),
            "msal_python_telemetry": result.get("_msal_python_telemetry"),
        }
        # Telemetry is exposed as an opaque string
        result["msal_telemetry"] = json.dumps(telemetry, separators=(",", ":"))
    cleaned = {}
    for key in result:
        # MSAL handled refresh_in, customers need not; also skim internal properties
        if key == "refresh_in" or key.startswith('_'):
            continue
        cleaned[key] = result[key]
    if "refresh_in" in result:  # To encourage proactive refresh
        cleaned["refresh_on"] = int(time.time() + result["refresh_in"])
    return cleaned

130 

131 

def _preferred_browser():
    """Register Edge and return a name suitable for subsequent webbrowser.get(...)
    when appropriate. Otherwise return None.
    """
    # On Linux, only Edge will provide device-based Conditional Access support.
    # On other platforms, we have no browser preference.
    if sys.platform != "linux":
        return None
    edge_path = "/usr/bin/microsoft-edge"  # Use a full path owned by sys admin
    # Note: /usr/bin/microsoft-edge, /usr/bin/microsoft-edge-stable, etc.
    # are symlinks that point to the actual binaries which are found under
    # /opt/microsoft/msedge/msedge or /opt/microsoft/msedge-beta/msedge.
    # Either method can be used to detect an Edge installation.
    env_browser = os.environ.get("BROWSER")
    # Edge is acceptable when the user states no preference at all, or when
    # BROWSER contains "microsoft-edge" (possibly as "/path/to/microsoft-edge").
    # Python documentation (https://docs.python.org/3/library/webbrowser.html)
    # does not document the name being implicitly registered,
    # so there is no public API to know whether the env var browser would work.
    # Therefore, we do not bother examining the env var browser's type.
    # We just register our own Edge instance.
    edge_is_acceptable = env_browser is None or "microsoft-edge" in env_browser
    if not (edge_is_acceptable and os.path.exists(edge_path)):
        return None
    try:
        import webbrowser  # Lazy import. Some distro may not have this.
        edge_name = "msal-edge"  # Avoid popular name "microsoft-edge"
        # otherwise `BROWSER="microsoft-edge"; webbrowser.get("microsoft-edge")`
        # would return a GenericBrowser instance which won't work.
        try:
            already_registered = isinstance(
                webbrowser.get(edge_name), webbrowser.BackgroundBrowser)
        except webbrowser.Error:
            already_registered = False
        if not already_registered:
            logger.debug("Register %s with %s", edge_name, edge_path)
            # By registering our own browser instance with our own name,
            # rather than populating a process-wide BROWSER env var,
            # this approach does not have side effect on non-MSAL code path.
            webbrowser.register(  # Even double-register happens to work fine
                edge_name, None, webbrowser.BackgroundBrowser(edge_path))
        return edge_name
    except ImportError:
        return None  # No webbrowser module; callers may still proceed without a preference

174 

def _is_ssh_cert_or_pop_request(token_type, auth_scheme) -> bool:
    """Tell whether a request asks for an SSH certificate or a PoP token.

    :param token_type: Compared against ``"ssh-cert"`` and ``"pop"``.
    :param auth_scheme: Checked for being a ``msal.auth_scheme.PopAuthScheme``
        instance, but only when ``token_type`` matched neither value.
    :return: True when any of the three checks succeeds, False otherwise.
    """
    if token_type in ("ssh-cert", "pop"):
        return True
    return isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme)

177 

class _ClientWithCcsRoutingInfo(Client):
    """A ``Client`` that adds ``X-AnchorMailbox`` routing info to some flows.

    Each overridden method computes an ``X-AnchorMailbox`` value from what it
    knows about the user (a login hint, the ``client_info`` of an auth
    response, or a username) and then delegates to the parent implementation.
    """

    def initiate_auth_code_flow(self, **kwargs):
        """Start an auth code flow, always requesting ``client_info=1``.

        When a truthy ``login_hint`` is supplied, an ``X-AnchorMailbox``
        keyword argument of the form ``UPN:<login_hint>`` is added as well.
        """
        login_hint = kwargs.get("login_hint")
        if login_hint:  # eSTS could have utilized this as-is, but nope
            kwargs["X-AnchorMailbox"] = "UPN:%s" % login_hint
        return super().initiate_auth_code_flow(
            client_info=1,  # To be used as CCS Routing info
            **kwargs)

    def obtain_token_by_auth_code_flow(
            self, auth_code_flow, auth_response, **kwargs):
        """Redeem an auth response, adding an ``X-AnchorMailbox`` header.

        The header is ``Oid:<uid>@<utid>``, and is only set when the decoded
        ``client_info`` of ``auth_response`` contains both ``uid`` and ``utid``.

        Note: the obtain_token_by_browser() is also covered by this.
        """
        assert isinstance(auth_code_flow, dict) and isinstance(auth_response, dict)
        headers = kwargs.pop("headers", {})
        encoded_client_info = auth_response.get("client_info")
        if encoded_client_info:
            client_info = json.loads(decode_part(encoded_client_info))
        else:
            client_info = {}
        if "uid" in client_info and "utid" in client_info:
            # Note: The value of X-AnchorMailbox is also case-insensitive
            headers["X-AnchorMailbox"] = "Oid:{uid}@{utid}".format(**client_info)
        return super().obtain_token_by_auth_code_flow(
            auth_code_flow, auth_response, headers=headers, **kwargs)

    def obtain_token_by_username_password(self, username, password, **kwargs):
        """Run the username/password flow with an ``upn:<username>`` routing header."""
        headers = kwargs.pop("headers", {})
        headers["X-AnchorMailbox"] = "upn:{}".format(username)
        return super().obtain_token_by_username_password(
            username, password, headers=headers, **kwargs)

206 

207 

def _msal_extension_check():
    """Warn when an installed msal-extensions is older than version 1.2.

    This is an optional, best-effort check:

    * Nothing happens when msal_extensions is not installed.
    * A warning is emitted via ``warnings.warn`` when its (major, minor)
      version is lower than (1, 2).
    * A version string whose first two dot-separated components are not
      integers is logged, not raised.
    * Any other unexpected error is logged and otherwise ignored.

    Can't run this in module or class level otherwise you'll get circular import error.
    """
    try:
        from msal_extensions import __version__ as v
        # Only the first two components matter. Slicing (rather than unpacking
        # a fixed number of parts) tolerates versions such as "1.3.0.dev1".
        major, minor = (int(part) for part in v.split(".")[:2])
        # Compare as a tuple, so that e.g. 2.0 is correctly treated as newer than 1.2
        if (major, minor) < (1, 2):
            warnings.warn(
                "Please upgrade msal-extensions. "
                "Only msal-extensions 1.2+ can work with msal 1.30+")
    except ImportError:
        pass  # The optional msal_extensions is not installed. Business as usual.
    except ValueError:
        logger.exception(f"msal_extensions version {v} not in major.minor.patch format")
    except Exception:  # Not a bare except: never swallow KeyboardInterrupt/SystemExit
        logger.exception(
            "Unable to import msal_extensions during an optional check. "
            "This exception can be safely ignored."
        )

226 

227 

228class ClientApplication(object): 

229 """You do not usually directly use this class. Use its subclasses instead: 

230 :class:`PublicClientApplication` and :class:`ConfidentialClientApplication`. 

231 """ 

    # String identifiers, one per public operation.
    # NOTE(review): presumably these are API IDs reported through telemetry -
    # their consumers are outside this part of the file, confirm before relying on it.
    ACQUIRE_TOKEN_SILENT_ID = "84"
    ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85"
    ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301"
    ACQUIRE_TOKEN_ON_BEHALF_OF_ID = "523"
    ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID = "622"
    ACQUIRE_TOKEN_FOR_CLIENT_ID = "730"
    ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832"
    ACQUIRE_TOKEN_INTERACTIVE = "169"
    GET_ACCOUNTS_ID = "902"
    REMOVE_ACCOUNT_ID = "903"

    ATTEMPT_REGION_DISCOVERY = True  # "TryAutoDetect"
    DISABLE_MSAL_FORCE_REGION = False  # Used in azure_region to disable MSAL_FORCE_REGION behavior
    # Key name and candidate values describing where a token came from.
    # NOTE(review): presumably placed into returned results - confirm at the call sites.
    _TOKEN_SOURCE = "token_source"
    _TOKEN_SOURCE_IDP = "identity_provider"
    _TOKEN_SOURCE_CACHE = "cache"
    _TOKEN_SOURCE_BROKER = "broker"

    # Class-level default. _decide_broker() reads it as the opt-in flag and
    # then overwrites it on the instance with the final decision.
    _enable_broker = False
    # Message text for the case where an auth_scheme is given without broker.
    # NOTE(review): its consumers are outside this part of the file.
    _AUTH_SCHEME_UNSUPPORTED = (
        "auth_scheme is currently only available from broker. "
        "You can enable broker by following these instructions. "
        "https://msal-python.readthedocs.io/en/latest/#publicclientapplication")

255 

    def __init__(
            self, client_id,
            client_credential=None, authority=None, validate_authority=True,
            token_cache=None,
            http_client=None,
            verify=True, proxies=None, timeout=None,
            client_claims=None, app_name=None, app_version=None,
            client_capabilities=None,
            azure_region=None,  # Note: We choose to add this param in this base class,
                # despite it is currently only needed by ConfidentialClientApplication.
                # This way, it holds the same positional param place for PCA,
                # when we would eventually want to add this feature to PCA in future.
            exclude_scopes=None,
            http_cache=None,
            instance_discovery=None,
            allow_broker=None,
            enable_pii_log=None,
            oidc_authority=None,
            ):
        """Create an instance of application.

        :param str client_id: Your app has a client_id after you register it on Microsoft Entra admin center.

        :param client_credential:
            For :class:`PublicClientApplication`, you use `None` here.

            For :class:`ConfidentialClientApplication`,
            it supports many different input formats for different scenarios.

            .. admonition:: Support using a client secret.

                Just feed in a string, such as ``"your client secret"``.

            .. admonition:: Support using a certificate in X.509 (.pem) format

                Deprecated because it uses SHA-1 thumbprint,
                unless you are still using ADFS which supports SHA-1 thumbprint only.
                Please use the .pfx option documented later in this page.

                Feed in a dict in this form::

                    {
                        "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format",
                        "thumbprint": "An SHA-1 thumbprint such as A1B2C3D4E5F6..."
                            "Changed in version 1.35.0, if thumbprint is absent"
                            "and a public_certificate is present, MSAL will"
                            "automatically calculate an SHA-256 thumbprint instead.",
                        "passphrase": "Needed if the private_key is encrypted (Added in version 1.6.0)",
                        "public_certificate": "...-----BEGIN CERTIFICATE-----...",  # Needed if you use Subject Name/Issuer auth. Added in version 0.5.0.
                    }

                MSAL Python requires a "private_key" in PEM format.
                If your cert is in PKCS12 (.pfx) format,
                you can convert it to X.509 (.pem) format,
                by ``openssl pkcs12 -in file.pfx -out file.pem -nodes``.

                The thumbprint is available in your app's registration in Azure Portal.
                Alternatively, you can `calculate the thumbprint <https://github.com/Azure/azure-sdk-for-python/blob/07d10639d7e47f4852eaeb74aef5d569db499d6e/sdk/identity/azure-identity/azure/identity/_credentials/certificate.py#L94-L97>`_.

                ``public_certificate`` (optional) is public key certificate
                which will be sent through 'x5c' JWT header.
                This is useful when you use `Subject Name/Issuer Authentication
                <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_
                which is an approach to allow easier certificate rotation.
                Per `specs <https://tools.ietf.org/html/rfc7515#section-4.1.6>`_,
                "the certificate containing
                the public key corresponding to the key used to digitally sign the
                JWS MUST be the first certificate. This MAY be followed by
                additional certificates, with each subsequent certificate being the
                one used to certify the previous one."
                However, your certificate's issuer may use a different order.
                So, if your attempt ends up with an error AADSTS700027 -
                "The provided signature value did not match the expected signature value",
                you may try use only the leaf cert (in PEM/str format) instead.

            .. admonition:: Supporting raw assertion obtained from elsewhere

                *Added in version 1.13.0*:
                It can also be a completely pre-signed assertion that you've assembled yourself.
                Simply pass a container containing only the key "client_assertion", like this::

                    {
                        "client_assertion": "...a JWT with claims aud, exp, iss, jti, nbf, and sub..."
                    }

            .. admonition:: Supporting reading client certificates from PFX files

                This usage will automatically use SHA-256 thumbprint of the certificate.

                *Added in version 1.29.0*:
                Feed in a dictionary containing the path to a PFX file::

                    {
                        "private_key_pfx_path": "/path/to/your.pfx",  # Added in version 1.29.0
                        "public_certificate": True,  # Only needed if you use Subject Name/Issuer auth. Added in version 1.30.0
                        "passphrase": "Passphrase if the private_key is encrypted (Optional)",
                    }

                The following command will generate a .pfx file from your .key and .pem file::

                    openssl pkcs12 -export -out certificate.pfx -inkey privateKey.key -in certificate.pem

                `Subject Name/Issuer Auth
                <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_
                is an approach to allow easier certificate rotation.
                If your .pfx file contains both the private key and public cert,
                you can opt in for Subject Name/Issuer Auth by setting "public_certificate" to ``True``.

        :type client_credential: Union[dict, str, None]

        :param dict client_claims:
            *Added in version 0.5.0*:
            It is a dictionary of extra claims that would be signed
            by this :class:`ConfidentialClientApplication` 's private key.
            For example, you can use {"client_ip": "x.x.x.x"}.
            You may also override any of the following default claims::

                {
                    "aud": the_token_endpoint,
                    "iss": self.client_id,
                    "sub": same_as_issuer,
                    "exp": now + 10_min,
                    "iat": now,
                    "jti": a_random_uuid
                }

        :param str authority:
            A URL that identifies a token authority. It should be of the format
            ``https://login.microsoftonline.com/your_tenant``
            By default, we will use ``https://login.microsoftonline.com/common``

            *Changed in version 1.17*: you can also use predefined constant
            and a builder like this::

                from msal.authority import (
                    AuthorityBuilder,
                    AZURE_US_GOVERNMENT, AZURE_CHINA, AZURE_PUBLIC)
                my_authority = AuthorityBuilder(AZURE_PUBLIC, "contoso.onmicrosoft.com")
                # Now you get an equivalent of
                # "https://login.microsoftonline.com/contoso.onmicrosoft.com"

                # You can feed such an authority to msal's ClientApplication
                from msal import PublicClientApplication
                app = PublicClientApplication("my_client_id", authority=my_authority, ...)

        :param bool validate_authority: (optional) Turns authority validation
            on or off. This parameter defaults to True.
        :param TokenCache token_cache:
            Sets the token cache used by this ClientApplication instance.
            By default, an in-memory cache will be created and used.
        :param http_client: (optional)
            Your implementation of abstract class HttpClient <msal.oauth2cli.http.http_client>
            Defaults to a requests session instance.
            Since MSAL 1.11.0, the default session would be configured
            to attempt one retry on connection error.
            If you are providing your own http_client,
            it will be your http_client's duty to decide whether to perform retry.

        :param verify: (optional)
            It will be passed to the
            `verify parameter in the underlying requests library
            <http://docs.python-requests.org/en/v2.9.1/user/advanced/#ssl-cert-verification>`_
            This does not apply if you have chosen to pass your own Http client
        :param proxies: (optional)
            It will be passed to the
            `proxies parameter in the underlying requests library
            <http://docs.python-requests.org/en/v2.9.1/user/advanced/#proxies>`_
            This does not apply if you have chosen to pass your own Http client
        :param timeout: (optional)
            It will be passed to the
            `timeout parameter in the underlying requests library
            <http://docs.python-requests.org/en/v2.9.1/user/advanced/#timeouts>`_
            This does not apply if you have chosen to pass your own Http client
        :param app_name: (optional)
            You can provide your application name for Microsoft telemetry purposes.
            Default value is None, means it will not be passed to Microsoft.
        :param app_version: (optional)
            You can provide your application version for Microsoft telemetry purposes.
            Default value is None, means it will not be passed to Microsoft.
        :param list[str] client_capabilities: (optional)
            Allows configuration of one or more client capabilities, e.g. ["CP1"].

            Client capability is meant to inform the Microsoft identity platform
            (STS) what this client is capable for,
            so STS can decide to turn on certain features.
            For example, if client is capable to handle *claims challenge*,
            STS may issue
            `Continuous Access Evaluation (CAE) <https://learn.microsoft.com/entra/identity/conditional-access/concept-continuous-access-evaluation>`_
            access tokens to resources,
            knowing that when the resource emits a *claims challenge*
            the client will be able to handle those challenges.

            Implementation details:
            Client capability is implemented using "claims" parameter on the wire,
            for now.
            MSAL will combine them into
            `claims parameter <https://openid.net/specs/openid-connect-core-1_0-final.html#ClaimsParameter>`_
            which you will later provide via one of the acquire-token request.

        :param str azure_region: (optional)
            Instructs MSAL to use the Entra regional token service. This legacy feature is only available to
            first-party applications. Only ``acquire_token_for_client()`` is supported.

            Supports 4 values:

            1. ``azure_region=None`` - This default value means no region is configured.
               MSAL will use the region defined in env var ``MSAL_FORCE_REGION``.
            2. ``azure_region="some_region"`` - meaning the specified region is used.
            3. ``azure_region=True`` - meaning
               MSAL will try to auto-detect the region. This is not recommended.
            4. ``azure_region=False`` - meaning MSAL will use no region.

            .. note::
                Region auto-discovery has been tested on VMs and on Azure Functions. It is unreliable.
                Applications using this option should configure a short timeout.

                For more details and for the values of the region string
                see https://learn.microsoft.com/entra/msal/dotnet/resources/region-discovery-troubleshooting

            New in version 1.12.0.

        :param list[str] exclude_scopes: (optional)
            Historically MSAL hardcodes `offline_access` scope,
            which would allow your app to have prolonged access to user's data.
            If that is unnecessary or undesirable for your app,
            now you can use this parameter to supply an exclusion list of scopes,
            such as ``exclude_scopes = ["offline_access"]``.

        :param dict http_cache:
            MSAL has long been caching tokens in the ``token_cache``.
            Recently, MSAL also introduced a concept of ``http_cache``,
            by automatically caching some finite amount of non-token http responses,
            so that *long-lived*
            ``PublicClientApplication`` and ``ConfidentialClientApplication``
            would be more performant and responsive in some situations.

            This ``http_cache`` parameter accepts any dict-like object.
            If not provided, MSAL will use an in-memory dict.

            If your app is a command-line app (CLI),
            you would want to persist your http_cache across different CLI runs.
            The persisted file's format may change due to, but not limited to,
            `unstable protocol <https://docs.python.org/3/library/pickle.html#data-stream-format>`_,
            so your implementation shall tolerate unexpected loading errors.
            The following recipe shows a way to do so::

                # Just add the following lines at the beginning of your CLI script
                import sys, atexit, pickle, logging
                http_cache_filename = sys.argv[0] + ".http_cache"
                try:
                    with open(http_cache_filename, "rb") as f:
                        persisted_http_cache = pickle.load(f)  # Take a snapshot
                except (
                        FileNotFoundError,  # Or IOError in Python 2
                        pickle.UnpicklingError,  # A corrupted http cache file
                        AttributeError,  # Cache created by a different version of MSAL
                        ):
                    persisted_http_cache = {}  # Recover by starting afresh
                except:  # Unexpected exceptions
                    logging.exception("You may want to debug this")
                    persisted_http_cache = {}  # Recover by starting afresh
                atexit.register(lambda: pickle.dump(
                    # When exit, flush it back to the file.
                    # It may occasionally overwrite another process's concurrent write,
                    # but that is fine. Subsequent runs will reach eventual consistency.
                    persisted_http_cache, open(http_cache_filename, "wb")))

                # And then you can implement your app as you normally would
                app = msal.PublicClientApplication(
                    "your_client_id",
                    ...,
                    http_cache=persisted_http_cache,  # Utilize persisted_http_cache
                    ...,
                    #token_cache=...,  # You may combine the old token_cache trick
                        # Please refer to token_cache recipe at
                        # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
                    )
                app.acquire_token_interactive(["your", "scope"], ...)

            Content inside ``http_cache`` are cheap to obtain.
            There is no need to share them among different apps.

            Content inside ``http_cache`` will contain no tokens nor
            Personally Identifiable Information (PII). Encryption is unnecessary.

            New in version 1.16.0.

        :param boolean instance_discovery:
            Historically, MSAL would connect to a central endpoint located at
            ``https://login.microsoftonline.com`` to acquire some metadata,
            especially when using an unfamiliar authority.
            This behavior is known as Instance Discovery.

            This parameter defaults to None, which enables the Instance Discovery.

            If you know some authorities which you allow MSAL to operate with as-is,
            without involving any Instance Discovery, the recommended pattern is::

                known_authorities = frozenset([  # Treat your known authorities as const
                    "https://contoso.com/adfs", "https://login.azs/foo"])
                ...
                authority = "https://contoso.com/adfs"  # Assuming your app will use this
                app1 = PublicClientApplication(
                    "client_id",
                    authority=authority,
                    # Conditionally disable Instance Discovery for known authorities
                    instance_discovery=authority not in known_authorities,
                    )

            If you do not know some authorities beforehand,
            yet still want MSAL to accept any authority that you will provide,
            you can use a ``False`` to unconditionally disable Instance Discovery.

            New in version 1.19.0.

        :param boolean allow_broker:
            Deprecated. Please use ``enable_broker_on_windows`` instead.

        :param boolean enable_pii_log:
            When enabled, logs may include PII (Personal Identifiable Information).
            This can be useful in troubleshooting broker behaviors.
            The default behavior is False.

            New in version 1.24.0.

        :param str oidc_authority:
            *Added in version 1.28.0*:
            It is a URL that identifies an OpenID Connect (OIDC) authority of
            the format ``https://contoso.com/tenant``.
            MSAL will append ".well-known/openid-configuration" to the authority
            and retrieve the OIDC metadata from there, to figure out the endpoints.

            Note: Broker will NOT be used for OIDC authority.

        :raises ValueError:
            If ``exclude_scopes`` is truthy but not a list, or contains "openid";
            if both ``authority`` and ``oidc_authority`` are provided;
            or if creating the ``Authority`` raises a ValueError.
        """
        self.client_id = client_id
        self.client_credential = client_credential
        self.client_claims = client_claims
        self._client_capabilities = client_capabilities
        self._instance_discovery = instance_discovery

        # Validate exclude_scopes before storing it as an immutable set
        if exclude_scopes and not isinstance(exclude_scopes, list):
            raise ValueError(
                "Invalid exclude_scopes={}. It need to be a list of strings.".format(
                repr(exclude_scopes)))
        self._exclude_scopes = frozenset(exclude_scopes or [])
        if "openid" in self._exclude_scopes:
            raise ValueError(
                'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format(
                repr(exclude_scopes)))

        if http_client:
            # A caller-supplied client is used as-is;
            # verify, proxies and timeout are not applied to it.
            self.http_client = http_client
        else:
            import requests  # Lazy load

            self.http_client = requests.Session()
            self.http_client.verify = verify
            self.http_client.proxies = proxies
            # Requests does not support a session-wide timeout,
            # but you can patch that (https://github.com/psf/requests/issues/3341):
            self.http_client.request = functools.partial(
                self.http_client.request, timeout=timeout)

            # Enable a minimal retry. Better than nothing.
            # https://github.com/psf/requests/blob/v2.25.1/requests/adapters.py#L94-L108
            a = requests.adapters.HTTPAdapter(max_retries=1)
            self.http_client.mount("http://", a)
            self.http_client.mount("https://", a)
        # Whichever client was chosen above gets wrapped in ThrottledHttpClient,
        # which also receives the http_cache.
        self.http_client = ThrottledHttpClient(
            self.http_client,
            http_cache=http_cache,
            default_throttle_time=60
                # The default value 60 was recommended mainly for PCA at the end of
                # https://identitydivision.visualstudio.com/devex/_git/AuthLibrariesApiReview?version=GBdev&path=%2FService%20protection%2FIntial%20set%20of%20protection%20measures.md&_a=preview
                if isinstance(self, PublicClientApplication) else 5,
            )

        self.app_name = app_name
        self.app_version = app_version

        # Here the self.authority will not be the same type as authority in input
        if oidc_authority and authority:
            raise ValueError("You can not provide both authority and oidc_authority")
        if isinstance(authority, str) and urlparse(authority).path.startswith(
            "/dstsv2"):  # dSTS authority's path always starts with "/dstsv2"
            oidc_authority = authority  # So we treat it as if an oidc_authority
        try:
            # Fall back to the world-wide "common" authority when none is given
            authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE)
            self.authority = Authority(
                authority_to_use,
                self.http_client,
                validate_authority=validate_authority,
                instance_discovery=self._instance_discovery,
                oidc_authority_url=oidc_authority,
                )
        except ValueError:  # Those are explicit authority validation errors
            raise
        except Exception:  # The rest are typically connection errors
            if validate_authority and not oidc_authority and (
                azure_region  # Opted in to use region
                or (azure_region is None and os.getenv("MSAL_FORCE_REGION"))  # Will use region
            ):
                # Since caller opts in to use region, here we tolerate connection
                # errors happened during authority validation at non-region endpoint
                self.authority = Authority(
                    authority_to_use,
                    self.http_client,
                    instance_discovery=False,
                    )
            else:
                raise

        # _decide_broker() reads self.client_credential and self.authority,
        # so it has to run after both have been assigned above.
        self._decide_broker(allow_broker, enable_pii_log)
        self.token_cache = token_cache or TokenCache()
        self._region_configured = azure_region
        self._region_detected = None
        self.client, self._regional_client = self._build_client(
            client_credential, self.authority)
        self.authority_groups = None
        self._telemetry_buffer = {}
        self._telemetry_lock = Lock()
        _msal_extension_check()

678 

679 

def _decide_broker(self, allow_broker, enable_pii_log):
    """Work out whether the broker will be used, and store it in ``self._enable_broker``.

    :param allow_broker:
        The deprecated opt-in flag. It is only honored on Windows.
    :param enable_pii_log:
        Forwarded to the broker initialization.
    :raises ValueError:
        If ``allow_broker`` is used by a confidential client.
    """
    confidential = self.client_credential or isinstance(
        self, ConfidentialClientApplication)
    if allow_broker:
        if confidential:
            raise ValueError(
                "allow_broker=True is only supported in PublicClientApplication")
        # Historically, ClientApplication("client_id", allow_broker=True) was supported
        warnings.warn(
            "allow_broker is deprecated. "
            "Please use PublicClientApplication(..., "
            "enable_broker_on_windows=True, "
            # Other platforms are not mentioned: allow_broker is Windows-only
            "...)",
            DeprecationWarning)

    # self._enable_broker being truthy means the opt-in came from PCA.
    # The old allow_broker was meant to be cross-platform, but other platforms
    # have different redirect_uri requirements, so it now only counts on Windows.
    opted_in = self._enable_broker or (
        allow_broker and sys.platform == "win32")

    # The same attribute that carried the opt-in now stores the final state
    if not opted_in:
        self._enable_broker = opted_in
    elif confidential:
        self._enable_broker = False
    else:
        self._enable_broker = (
            not self.authority.is_adfs and not self.authority._is_b2c)

    if self._enable_broker:
        try:
            _init_broker(enable_pii_log)
        except RuntimeError:
            # Common on Mac and Linux, where the broker is not built-in
            self._enable_broker = False
            logger.warning(
                "Broker is unavailable on this platform. "
                "We will fallback to non-broker.")
    logger.debug("Broker enabled? %s", self._enable_broker)

718 

def is_pop_supported(self):
    """Returns True if this client supports Proof-of-Possession Access Token."""
    broker_state = self._enable_broker
    if not broker_state:
        return broker_state  # Without a broker, hand back the falsy state as-is
    return sys.platform in ("win32", "darwin")

722 

723 def _decorate_scope( 

724 self, scopes, 

725 reserved_scope=frozenset(['openid', 'profile', 'offline_access'])): 

726 if not isinstance(scopes, (list, set, tuple)): 

727 raise ValueError("The input scopes should be a list, tuple, or set") 

728 scope_set = set(scopes) # Input scopes is typically a list. Copy it to a set. 

729 if scope_set & reserved_scope: 

730 # These scopes are reserved for the API to provide good experience. 

731 # We could make the developer pass these and then if they do they will 

732 # come back asking why they don't see refresh token or user information. 

733 raise ValueError( 

734 """You cannot use any scope value that is reserved. 

735Your input: {} 

736The reserved list: {}""".format(list(scope_set), list(reserved_scope))) 

737 raise ValueError( 

738 "You cannot use any scope value that is in this reserved list: {}".format( 

739 list(reserved_scope))) 

740 

741 # client_id can also be used as a scope in B2C 

742 decorated = scope_set | reserved_scope 

743 decorated -= self._exclude_scopes 

744 return list(decorated) 

745 

def _build_telemetry_context(
        self, api_id, correlation_id=None, refresh_reason=None):
    """Create a telemetry context for one API call.

    The context is built on this app's telemetry buffer and telemetry lock.
    """
    optional_fields = {
        "correlation_id": correlation_id,
        "refresh_reason": refresh_reason,
    }
    return msal.telemetry._TelemetryContext(
        self._telemetry_buffer, self._telemetry_lock, api_id,
        **optional_fields)

751 

def _get_regional_authority(self, central_authority) -> Optional[Authority]:
    """Derive the regional counterpart of ``central_authority``, if any.

    :param central_authority:
        The already-validated central authority; its ``instance`` and
        ``tenant`` are used to compose the regional URL.
    :return:
        An Authority for the regional endpoint, or None when the caller
        opted out of region usage or no region could be determined.
    """
    configured = self._region_configured
    if configured is False:  # User opts out of ESTS-R
        return None  # Short circuit to completely bypass region detection
    if configured is None:  # User did not make an ESTS-R choice
        configured = os.getenv("MSAL_FORCE_REGION") or None
        self._region_configured = configured

    if not self._region_detected:
        self._region_detected = _detect_region(
            self.http_client if configured is not None else None)
    detected = self._region_detected

    wants_discovery = configured == self.ATTEMPT_REGION_DISCOVERY
    if not wants_discovery and configured != detected:
        logger.warning('Region configured ({}) != region detected ({})'.format(
            repr(configured), repr(detected)))

    # A configured value of None (i.e. opted out) is retained here
    region_to_use = detected if wants_discovery else configured
    logger.debug('Region to be used: {}'.format(repr(region_to_use)))
    if not region_to_use:
        return None

    # The list came from point 3 of the algorithm section in this internal doc
    # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PinAuthToRegion/AAD%20SDK%20Proposal%20to%20Pin%20Auth%20to%20region.md&anchor=algorithm&_a=preview
    hosts_sharing_regional_domain = (
        "login.microsoftonline.com",
        "login.microsoft.com",
        "login.windows.net",
        "sts.windows.net",
    )
    if central_authority.instance in hosts_sharing_regional_domain:
        regional_host = "{}.login.microsoft.com".format(region_to_use)
    else:
        regional_host = "{}.{}".format(region_to_use, central_authority.instance)
    return Authority(  # The central_authority has already been validated
        "https://{}/{}".format(regional_host, central_authority.tenant),
        self.http_client,
        instance_discovery=False,
    )

785 

def _build_client(self, client_credential, authority, skip_regional_client=False):
    """Create the OAuth2 client(s) that talk to the given authority.

    :param client_credential:
        Either a dict (holding a ready-made ``client_assertion``, or the
        private key material used to sign one), or any other value, which is
        sent as the ``client_secret``.
    :param authority:
        The central authority whose endpoints the client will use.
    :param skip_regional_client:
        When true, no regional client is built.
    :return:
        A ``(central_client, regional_client)`` tuple.
        The regional client is None when it is not applicable.
    :raises ValueError:
        If the credential dict lacks a usable key or thumbprint.
    """
    default_headers = {
        "x-client-sku": SKU,
        "x-client-ver": __version__,
        "x-client-os": sys.platform,
        "x-ms-lib-capability": "retry-after, h429",
    }
    for header_name, header_value in (
            ("x-app-name", self.app_name), ("x-app-ver", self.app_version)):
        if header_value:
            default_headers[header_name] = header_value
    default_body = {"client_info": 1}

    client_assertion = client_assertion_type = None
    if not isinstance(client_credential, dict):
        default_body['client_secret'] = client_credential
    else:
        client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT
        # client_credential.get("...") is preferred over "..." in client_credential,
        # so that an empty string coming from an empty ENV VAR is ignored.
        if client_credential.get("client_assertion"):
            client_assertion = client_credential['client_assertion']
        else:
            headers = {}
            sha1_thumbprint = sha256_thumbprint = None
            passphrase_bytes = None
            if client_credential.get("passphrase"):
                passphrase_bytes = _str2bytes(client_credential["passphrase"])

            if client_credential.get("private_key_pfx_path"):
                private_key, sha256_thumbprint, sha1_thumbprint, x5c = _parse_pfx(
                    client_credential["private_key_pfx_path"],
                    passphrase_bytes)
                if client_credential.get("public_certificate") is True and x5c:
                    headers["x5c"] = x5c
            elif client_credential.get("private_key"):  # PEM blob
                # Handles both encrypted and unencrypted keys
                private_key = client_credential['private_key']
                if passphrase_bytes:
                    private_key = _load_private_key_from_pem_str(
                        private_key, passphrase_bytes)

                # Determine thumbprints based on what's provided
                if client_credential.get("thumbprint"):
                    # A user-provided thumbprint is used as SHA-1 (legacy/manual approach)
                    sha1_thumbprint = client_credential["thumbprint"]
                    sha256_thumbprint = None
                elif isinstance(client_credential.get('public_certificate'), str):
                    # No thumbprint given, so calculate them from the certificate
                    from cryptography import x509
                    cert = x509.load_pem_x509_certificate(
                        _str2bytes(client_credential['public_certificate']))
                    sha256_thumbprint, sha1_thumbprint, headers["x5c"] = (
                        _extract_cert_and_thumbprints(cert))
                else:
                    raise ValueError(
                        "You must provide either 'thumbprint' or 'public_certificate' "
                        "from which the thumbprint can be calculated.")
            else:
                raise ValueError(
                    "client_credential needs to follow this format "
                    "https://msal-python.readthedocs.io/en/latest/#msal.ClientApplication.params.client_credential")

            public_certificate = client_credential.get('public_certificate')
            if "x5c" not in headers and isinstance(public_certificate, str):
                # The .pfx file contained no certificate,
                # so the public_certificate value is treated as PEM content
                headers["x5c"] = extract_certs(client_credential['public_certificate'])

            if sha256_thumbprint and not authority.is_adfs:
                assertion_params = {
                    "algorithm": "PS256", "sha256_thumbprint": sha256_thumbprint,
                }
            else:  # Fall back
                if not sha1_thumbprint:
                    raise ValueError("You shall provide a thumbprint in SHA1.")
                assertion_params = {
                    "algorithm": "RS256", "sha1_thumbprint": sha1_thumbprint,
                }
            assertion = JwtAssertionCreator(
                private_key, headers=headers, **assertion_params)
            client_assertion = assertion.create_regenerative_assertion(
                audience=authority.token_endpoint, issuer=self.client_id,
                additional_claims=self.client_claims or {})

    def make_client(endpoint_provider):
        # Tokens are always cached under the central authority's instance,
        # regardless of which endpoints this particular client talks to.
        return _ClientWithCcsRoutingInfo(
            {
                "authorization_endpoint": endpoint_provider.authorization_endpoint,
                "token_endpoint": endpoint_provider.token_endpoint,
                "device_authorization_endpoint":
                    endpoint_provider.device_authorization_endpoint,
            },
            self.client_id,
            http_client=self.http_client,
            default_headers=default_headers,
            default_body=default_body,
            client_assertion=client_assertion,
            client_assertion_type=client_assertion_type,
            on_obtaining_tokens=lambda event: self.token_cache.add(dict(
                event, environment=authority.instance)),
            on_removing_rt=self.token_cache.remove_rt,
            on_updating_rt=self.token_cache.update_rt)

    central_client = make_client(authority)

    regional_client = None
    # Currently regional endpoint only serves some CCA flows
    if client_credential and not skip_regional_client:
        regional_authority = self._get_regional_authority(authority)
        if regional_authority:
            regional_client = make_client(regional_authority)
    return central_client, regional_client

908 

def initiate_auth_code_flow(
        self,
        scopes,  # type: list[str]
        redirect_uri=None,
        state=None,  # Recommended by OAuth2 for CSRF protection
        prompt=None,
        login_hint=None,  # type: Optional[str]
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        max_age=None,
        response_mode=None,  # type: Optional[str]
        ):
    """Begin an auth code flow and return a dict describing it.

    After the auth response arrives at your redirect_uri,
    pass it along with the dict returned here to
    :func:`~acquire_token_by_auth_code_flow()`
    to finish the authentication/authorization.

    :param list scopes:
        A list of case-sensitive scope strings.
    :param str redirect_uri:
        Optional. When omitted, the server uses the pre-registered one.
    :param str state:
        An opaque value the client uses to
        keep state between the request and the callback.
        When absent, this library generates one internally.
    :param str prompt:
        No prompt value is sent by default, not even the string ``"none"``.
        A value has to be specified explicitly.
        Valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.

    :param str login_hint:
        Optional. Identifier of the user. Generally a User Principal Name (UPN).
    :param domain_hint:
        One of "consumers", "organizations", or your tenant domain "contoso.com".
        When included, the email-based discovery step on the sign-in page
        is skipped, giving a slightly more streamlined user experience.
        Possible values are described in the
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and the
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.

    :param claims_challenge:
        Optional. It is merged with this app's client capabilities to form
        the ``claims`` of the request, and is also kept in the returned dict.

    :param int max_age:
        OPTIONAL. Maximum Authentication Age.
        The allowable elapsed time, in seconds,
        since the End-User was last actively authenticated.
        When more time than that has elapsed,
        Microsoft identity platform will actively re-authenticate the End-User.

        MSAL Python will also automatically validate the auth_time in ID token.

        New in version 1.15.

    :param str response_mode:
        OPTIONAL. The method used to return the response parameters.
        The default is equivalent to ``query``, which was still secure enough in MSAL Python
        (because MSAL Python does not transfer tokens via query parameter in the first place).
        For even better security, we recommend using the value ``form_post``.
        In "form_post" mode, response parameters
        are encoded as HTML form values, transmitted via the HTTP POST method and
        encoded in the body using the application/x-www-form-urlencoded format.
        Valid values are "form_post" for HTTP POST to the callback URI, or
        "query" (the default) for HTTP GET with parameters encoded in the query string.
        More information on possible values
        `here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>`
        and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>`

        .. note::
            You should configure your web framework to accept form_post responses instead of query responses.
            While this parameter still works, it will be removed in a future version.
            Using query-based response modes is less secure and should be avoided.

    :return:
        The auth code flow. It is a dict in this form::

            {
                "auth_uri": "https://...",  // Guide user to visit this
                "state": "...",  // You may choose to verify it by yourself,
                                 // or just let acquire_token_by_auth_code_flow()
                                 // do that for you.
                "...": "...",  // Everything else are reserved and internal
            }

        The caller is expected to:

        1. somehow store this content, typically inside the current session,
        2. guide the end user (i.e. resource owner) to visit that auth_uri,
        3. and then relay this dict and subsequent auth response to
           :func:`~acquire_token_by_auth_code_flow()`.
    """
    # Note to maintainers: Do not emit warning for the use of response_mode here,
    # because response_mode=form_post is still the recommended usage for MSAL Python 1.x.
    # App developers making the right call shall not be disturbed by unactionable warnings.
    endpoints = {"authorization_endpoint": self.authority.authorization_endpoint}
    client = _ClientWithCcsRoutingInfo(
        endpoints, self.client_id, http_client=self.http_client)
    request_params = {
        "redirect_uri": redirect_uri,
        "state": state,
        "login_hint": login_hint,
        "prompt": prompt,
        "scope": self._decorate_scope(scopes),
        "domain_hint": domain_hint,
        "claims": _merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge),
        "max_age": max_age,
        "response_mode": response_mode,
    }
    flow = client.initiate_auth_code_flow(**request_params)
    flow["claims_challenge"] = claims_challenge
    return flow

1018 

def get_authorization_request_url(
        self,
        scopes,  # type: list[str]
        login_hint=None,  # type: Optional[str]
        state=None,  # Recommended by OAuth2 for CSRF protection
        redirect_uri=None,
        response_type="code",  # Could be "token" if you use Implicit Grant
        prompt=None,
        nonce=None,
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        **kwargs):
    """Builds the URL that starts an Authorization Code Grant.

    This method emits a DeprecationWarning pointing to
    :func:`~initiate_auth_code_flow()`.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param str state: Recommended by OAuth2 for CSRF protection.
    :param str login_hint:
        Identifier of the user. Generally a User Principal Name (UPN).
    :param str redirect_uri:
        Address to return to upon receiving a response from the authority.
    :param str response_type:
        Defaults to "code", for an OAuth2 Authorization Code grant.

        Other content such as "id_token" or "token" could be used,
        which would trigger an Implicit Grant, but that is
        `not recommended <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-implicit-grant-flow#is-the-implicit-grant-suitable-for-my-app>`_.

    :param str prompt:
        No prompt value is sent by default, not even the string ``"none"``.
        A value has to be specified explicitly.
        Valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.
    :param nonce:
        A cryptographically random value used to mitigate replay attacks. See also
        `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
    :param domain_hint:
        One of "consumers", "organizations", or your tenant domain "contoso.com".
        When included, the email-based discovery step on the sign-in page
        is skipped, giving a slightly more streamlined user experience.
        Possible values are described in the
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and the
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: The authorization url as a string.
    """
    authority_override = kwargs.pop("authority", None)  # Historically we support this
    if authority_override:
        warnings.warn(
            "We haven't decided if this method will accept authority parameter")
        # The previous implementation uses self.authority by default,
        # while a multi-tenant app can use a new authority on demand.
        the_authority = Authority(
            authority_override,
            self.http_client,
            instance_discovery=self._instance_discovery,
        )
    else:
        the_authority = self.authority

    endpoints = {"authorization_endpoint": the_authority.authorization_endpoint}
    client = _ClientWithCcsRoutingInfo(
        endpoints, self.client_id, http_client=self.http_client)
    warnings.warn(
        "Change your get_authorization_request_url() "
        "to initiate_auth_code_flow()", DeprecationWarning)
    with warnings.catch_warnings(record=True):
        request_params = {
            "response_type": response_type,
            "redirect_uri": redirect_uri,
            "state": state,
            "login_hint": login_hint,
            "prompt": prompt,
            "scope": self._decorate_scope(scopes),
            "nonce": nonce,
            "domain_hint": domain_hint,
            "claims": _merge_claims_challenge_and_capabilities(
                self._client_capabilities, claims_challenge),
        }
        return client.build_auth_request_uri(**request_params)

1100 

def acquire_token_by_auth_code_flow(
        self, auth_code_flow, auth_response, scopes=None, **kwargs):
    """Check the auth response that was redirected back, then obtain tokens.

    Nonce protection is provided automatically.

    :param dict auth_code_flow:
        The same dict returned by :func:`~initiate_auth_code_flow()`.
        Its "claims_challenge" entry is consumed (removed) by this call.
    :param dict auth_response:
        A dict of the query string received from auth server.
    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).

        Most of the time, you can leave it empty.

        If user consent was requested for multiple resources, provide here
        a subset of what was required in
        :func:`~initiate_auth_code_flow()`.

        OAuth2 was designed mostly for singleton services,
        where tokens are always meant for the same resource and the only
        changes are in the scopes.
        In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
        An authorization code can be asked for multiple resources,
        but when it is redeemed, the token is for only one intended
        recipient, called audience.
        So a scope has to be specified, so that the
        token can be restricted to the corresponding audience.

    :return:
        * A dict containing "access_token" and/or "id_token", among others,
          depends on what scope was used.
          (See https://tools.ietf.org/html/rfc6749#section-5.1)
        * A dict containing "error", optionally "error_description", "error_uri".
          (It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_
          or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_)
        * Most client-side data error would result in ValueError exception.
          So the usage pattern could be without any protocol details::

            def authorize():  # A controller in a web app
                try:
                    result = msal_app.acquire_token_by_auth_code_flow(
                        session.get("flow", {}), request.args)
                    if "error" in result:
                        return render_template("error.html", result)
                    use(result)  # Token(s) are available in result and cache
                except ValueError:  # Usually caused by CSRF
                    pass  # Simply ignore them
                return redirect(url_for("index"))
    """
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)

    scope = self._decorate_scope(scopes) if scopes else None
    headers = telemetry_context.generate_headers()
    extra_data = kwargs.pop("data", {})  # Popped, so **kwargs carries no "data"
    claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities,
        auth_code_flow.pop("claims_challenge", None))
    raw_response = self.client.obtain_token_by_auth_code_flow(
        auth_code_flow,
        auth_response,
        scope=scope,
        headers=headers,
        data=dict(extra_data, claims=claims),
        **kwargs)
    response = _clean_up(raw_response)

    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

1169 

def acquire_token_by_authorization_code(
        self,
        code,
        scopes,  # Syntactically required. STS accepts empty value though.
        redirect_uri=None,
            # REQUIRED, if the "redirect_uri" parameter was included in the
            # authorization request as described in Section 4.1.1, and their
            # values MUST be identical.
        nonce=None,
        claims_challenge=None,
        **kwargs):
    """The second half of the Authorization Code Grant.

    This method emits a DeprecationWarning pointing to
    :func:`~acquire_token_by_auth_code_flow()`.

    :param code: The authorization code returned from Authorization Server.
    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).

        If user consent was requested for multiple resources, you will
        typically want to provide here a subset of what was required in AuthCode.

        OAuth2 was designed mostly for singleton services,
        where tokens are always meant for the same resource and the only
        changes are in the scopes.
        In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
        An authorization code can be asked for multiple resources,
        but when it is redeemed, the token is for only one intended
        recipient, called audience.
        So a scope has to be specified, so that the
        token can be restricted to the corresponding audience.

    :param nonce:
        If a nonce was provided when calling :func:`get_authorization_request_url`,
        the same nonce should also be provided here, so that it gets validated.
        An exception will be raised if the nonce in id token mismatches.

    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    # If scope is absent on the wire, STS will give you a token associated
    # to the FIRST scope sent during the authorization request.
    # So in theory, scope could be omitted here when only one scope is in play.
    # But MSAL decorates the scope anyway, so it is never really empty.
    assert isinstance(scopes, list), "Invalid parameter type"
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    warnings.warn(
        "Change your acquire_token_by_authorization_code() "
        "to acquire_token_by_auth_code_flow()", DeprecationWarning)
    with warnings.catch_warnings(record=True):
        telemetry_context = self._build_telemetry_context(
            self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
        redeem = self.client.obtain_token_by_authorization_code
        scope = self._decorate_scope(scopes)
        headers = telemetry_context.generate_headers()
        extra_data = kwargs.pop("data", {})  # Popped, so **kwargs carries no "data"
        claims = _merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge)
        response = _clean_up(redeem(
            code,
            redirect_uri=redirect_uri,
            scope=scope,
            headers=headers,
            data=dict(extra_data, claims=claims),
            nonce=nonce,
            **kwargs))
        if "access_token" in response:
            response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
        telemetry_context.update_telemetry(response)
        return response

1243 

def get_accounts(self, username=None):
    """Get a list of accounts which previously signed in, i.e. exists in cache.

    An account can later be used in :func:`~acquire_token_silent`
    to find its tokens.

    The accounts are looked up under this app's authority instance first.
    When none is found there, the aliases of that instance are tried in turn,
    and the first alias yielding accounts wins.

    :param username:
        Filter accounts with this username only. Case insensitive.
        Cached accounts without a username never match a filter.
    :return: A list of account objects.
        Each account is a dict. For now, we only document its "username" field.
        Your app can choose to display those information to end user,
        and allow user to choose one of his/her accounts to proceed.
    """
    accounts = self._find_msal_accounts(environment=self.authority.instance)
    if not accounts:  # Now try other aliases of this authority instance
        for alias in self._get_authority_aliases(self.authority.instance):
            accounts = self._find_msal_accounts(environment=alias)
            if accounts:
                break
    if username:
        # Federated account["username"] from AAD could contain mixed case
        lowercase_username = username.lower()
        accounts = [
            a for a in accounts
            # An account cached without username carries None (or no such key);
            # treat it as a non-match rather than crashing on None.lower()
            if (a.get("username") or "").lower() == lowercase_username]
        if not accounts:
            logger.debug((  # This would also happen when the cache is empty
                "get_accounts(username='{}') finds no account. "
                "If tokens were acquired without 'profile' scope, "
                "they would contain no username for filtering. "
                "Consider calling get_accounts(username=None) instead."
                ).format(username))
    # Does not further filter by existing RTs here. It probably won't matter.
    # Because in most cases Accounts and RTs co-exist.
    # Even in the rare case when an RT is revoked and then removed,
    # acquire_token_silent() would then yield no result,
    # apps would fall back to other acquire methods. This is the standard pattern.
    return accounts

1281 

def _find_msal_accounts(self, environment):
    """List the cached accounts of the given environment, one per home account.

    Only accounts of ADFS or MSSTS authority type (and of the Cloud Shell
    type, when running in Cloud Shell) are considered. When several cache
    entries share a ``home_account_id``, the last one seen provides the values.
    """
    interested_authority_types = [
        TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS]
    if _is_running_in_cloud_shell():
        interested_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL)

    exposed_fields = (
        # These are minimal amount of non-tenant-specific account info
        "home_account_id",
        "environment",
        "username",
        "account_source",

        # The following fields for backward compatibility, for now
        "authority_type",
        "local_account_id",  # Tenant-specific
        "realm",  # Tenant-specific
    )
    grouped_accounts = {}  # Grouped by home tenant's id
    cached_entries = self.token_cache.search(
        TokenCache.CredentialType.ACCOUNT,
        query={"environment": environment})
    for entry in cached_entries:
        if entry["authority_type"] not in interested_authority_types:
            continue
        grouped_accounts[entry.get("home_account_id")] = {
            field: entry.get(field) for field in exposed_fields}
    return list(grouped_accounts.values())

1306 

1307 def _get_instance_metadata(self): # This exists so it can be mocked in unit test 

1308 resp = self.http_client.get( 

1309 "https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize", # TBD: We may extend this to use self._instance_discovery endpoint 

1310 headers={'Accept': 'application/json'}) 

1311 resp.raise_for_status() 

1312 return json.loads(resp.text)['metadata'] 

1313 

1314 def _get_authority_aliases(self, instance): 

1315 if self._instance_discovery is False: 

1316 return [] 

1317 if self.authority._is_known_to_developer: 

1318 # Then it is an ADFS/B2C/known_authority_hosts situation 

1319 # which may not reach the central endpoint, so we skip it. 

1320 return [] 

1321 if not self.authority_groups: 

1322 self.authority_groups = [ 

1323 set(group['aliases']) for group in self._get_instance_metadata()] 

1324 for group in self.authority_groups: 

1325 if instance in group: 

1326 return [alias for alias in group if alias != instance] 

1327 return [] 

1328 

def remove_account(self, account):
    """Sign me out and forget me from token cache"""
    broker_in_use = self._enable_broker
    if broker_in_use:
        from .broker import _signout_silently
        signout_error = _signout_silently(
            self.client_id, account["local_account_id"])
        if signout_error:
            logger.debug("_signout_silently() returns error: %s", signout_error)
    # By now the broker sign-out (if any) has been attempted,
    # even if the _forget_me() below throws.
    self._forget_me(account)

1338 

def _sign_out(self, home_account):
    """Remove all relevant RTs and ATs of the given home account from token cache."""
    cache = self.token_cache
    environment = home_account["environment"]
    owned_by_home_account = {  # realm-independent
        "environment": environment,
        "home_account_id": home_account["home_account_id"],
    }
    app_metadata = self._get_app_metadata(environment)

    def is_owned_by_this_app_or_family(rt):
        # The app ownership check is a precaution, in case family apps
        # and 3rd-party apps share same token cache, although they should not.
        if rt["client_id"] == self.client_id:
            return True
        # Now let's settle family business
        return bool(
            app_metadata.get("family_id")
            and rt.get("family_id") == app_metadata["family_id"])

    # RTs/FRTs are realm-independent. They are collected into a static list first,
    # to avoid changing the token cache while it is being iterated.
    refresh_tokens = [
        rt for rt in cache.search(
            TokenCache.CredentialType.REFRESH_TOKEN, query=owned_by_home_account)
        if is_owned_by_this_app_or_family(rt)]
    for rt in refresh_tokens:
        cache.remove_rt(rt)

    # ATs are also removed from a static list, and regardless of realm,
    # because the realm-independent RTs have been removed anyway.
    access_tokens = list(cache.search(
        TokenCache.CredentialType.ACCESS_TOKEN, query=owned_by_home_account))
    for at in access_tokens:
        # To avoid the complexity of locating sibling family app's AT,
        # the AT's app ownership check is skipped.
        # It means ATs for other apps will also be removed, it is OK because:
        # * non-family apps are not supposed to share token cache to begin with;
        # * Even if it happens, we keep other app's RT already, so SSO still works
        cache.remove_at(at)

1368 

def _forget_me(self, home_account):
    """Sign out ``home_account``, then also drop its ID tokens and accounts."""
    self._sign_out(home_account)
    account_filter = {  # realm-independent
        "environment": home_account["environment"],
        "home_account_id": home_account["home_account_id"],
    }
    removal_plan = (
        (TokenCache.CredentialType.ID_TOKEN, self.token_cache.remove_idt),
        (TokenCache.CredentialType.ACCOUNT, self.token_cache.remove_account),
    )
    for credential_type, remove in removal_plan:
        # Search regardless of realm, and materialize the matches into a list
        # to avoid changing self.token_cache while it is being iterated
        found = list(self.token_cache.search(
            credential_type, query=account_filter))
        for item in found:
            remove(item)

1385 

def _acquire_token_by_cloud_shell(self, scopes, data=None):
    """Obtain a token through Cloud Shell and cache it when there is no error.

    The response dict is returned as-is, with a token source marker added
    whenever it contains an "access_token".
    """
    from .cloudshell import _obtain_token
    response = _obtain_token(
        self.http_client, scopes, client_id=self.client_id, data=data)
    succeeded = "error" not in response
    if succeeded:
        event = {
            "client_id": self.client_id,
            # Prefer the scopes reported by the response, when available
            "scope": response["scope"].split() if "scope" in response else scopes,
            "token_endpoint": self.authority.token_endpoint,
            "response": response,
            "data": data or {},
            "authority_type": _AUTHORITY_TYPE_CLOUDSHELL,
        }
        self.token_cache.add(event)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
    return response

1402 

def acquire_token_silent(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Acquire an access token for given account, without user interaction.

    It takes the same parameters as :func:`~acquire_token_silent_with_error`.
    What differs is the return value:
    this method folds "cache is empty" and "token refresh failed"
    into one single return value, `None`.
    If your app does not care about the exact token refresh error during
    token cache look-up, then this method is easier and recommended.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key,
          if cache lookup succeeded.
        - None when cache lookup does not yield a token.
    """
    if not account:
        # A backward-compatible NO-OP to drop the account=None usage
        return None
    outcome = _clean_up(self._acquire_token_silent_with_error(
        scopes, account, authority=authority, force_refresh=force_refresh,
        claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs))
    if outcome and "error" not in outcome:
        return outcome
    return None

1433 

def acquire_token_silent_with_error(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Acquire an access token for given account, without user interaction.

    The token comes either from a valid access token found in the cache,
    or from a valid refresh token found in the cache which is then
    automatically redeemed for a new access token.

    This method tells an empty cache apart from a token refresh error.
    If your app cares about the exact token refresh error during
    token cache look-up, then this method is suitable.
    Otherwise, the other method :func:`~acquire_token_silent` is recommended.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param account: (Required)
        One of the account object returned by :func:`~get_accounts`.
        Starting from MSAL Python 1.23,
        a ``None`` input will become a NO-OP and always return ``None``.
    :param force_refresh:
        If True, it will skip Access Token look-up,
        and try to find a Refresh Token to obtain a new Access Token.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.
    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key,
          if cache lookup succeeded.
        - None when there is simply no token in the cache.
        - A dict containing an "error" key, when token refresh failed.
    """
    if account:
        raw_result = self._acquire_token_silent_with_error(
            scopes, account, authority=authority, force_refresh=force_refresh,
            claims_challenge=claims_challenge, auth_scheme=auth_scheme,
            **kwargs)
        return _clean_up(raw_result)
    # A backward-compatible NO-OP to drop the account=None usage
    return None

1486 

1487 def _acquire_token_silent_with_error( 

1488 self, 

1489 scopes, # type: List[str] 

1490 account, # type: Optional[Account] 

1491 authority=None, # See get_authorization_request_url() 

1492 force_refresh=False, # type: Optional[boolean] 

1493 claims_challenge=None, 

1494 auth_scheme=None, 

1495 **kwargs): 

1496 assert isinstance(scopes, list), "Invalid parameter type" 

1497 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1498 correlation_id = msal.telemetry._get_new_correlation_id() 

1499 if authority: 

1500 warnings.warn("We haven't decided how/if this method will accept authority parameter") 

1501 # the_authority = Authority( 

1502 # authority, 

1503 # self.http_client, 

1504 # instance_discovery=self._instance_discovery, 

1505 # ) if authority else self.authority 

1506 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( 

1507 scopes, account, self.authority, force_refresh=force_refresh, 

1508 claims_challenge=claims_challenge, 

1509 correlation_id=correlation_id, 

1510 auth_scheme=auth_scheme, 

1511 **kwargs) 

1512 if result and "error" not in result: 

1513 return result 

1514 final_result = result 

1515 for alias in self._get_authority_aliases(self.authority.instance): 

1516 if not list(self.token_cache.search( # Need a list to test emptiness 

1517 self.token_cache.CredentialType.REFRESH_TOKEN, 

1518 # target=scopes, # MUST NOT filter by scopes, because: 

1519 # 1. AAD RTs are scope-independent; 

1520 # 2. therefore target is optional per schema; 

1521 query={"environment": alias})): 

1522 # Skip heavy weight logic when RT for this alias doesn't exist 

1523 continue 

1524 the_authority = Authority( 

1525 "https://" + alias + "/" + self.authority.tenant, 

1526 self.http_client, 

1527 instance_discovery=False, 

1528 ) 

1529 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( 

1530 scopes, account, the_authority, force_refresh=force_refresh, 

1531 claims_challenge=claims_challenge, 

1532 correlation_id=correlation_id, 

1533 auth_scheme=auth_scheme, 

1534 **kwargs) 

1535 if result: 

1536 if "error" not in result: 

1537 return result 

1538 final_result = result 

1539 if final_result and final_result.get("suberror"): 

1540 final_result["classification"] = { # Suppress these suberrors, per #57 

1541 "bad_token": "", 

1542 "token_expired": "", 

1543 "protection_policy_required": "", 

1544 "client_mismatch": "", 

1545 "device_authentication_failed": "", 

1546 }.get(final_result["suberror"], final_result["suberror"]) 

1547 return final_result 

1548 

def _acquire_token_silent_from_cache_and_possibly_refresh_it(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority,  # This can be different than self.authority
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        correlation_id=None,
        http_exceptions=None,
        auth_scheme=None,
        **kwargs):
    """Look up an access token in cache, and refresh it when needed.

    This internal method has two calling patterns:
    it accepts a non-empty account to find token for a user,
    and accepts account=None to find a token for the current app.

    A cached AT with at least 5 minutes left, and not past its "refresh_on",
    is returned right away. Otherwise a new token is requested via
    Cloud Shell, the broker, a cached RT, or the client credential flow.
    A cached-but-aging AT serves as the fallback when that request fails.
    """
    cached_at = None  # Becomes the fallback when an aging AT is found
    bypass_at_cache = force_refresh or claims_challenge or auth_scheme
    if bypass_at_cache:
        refresh_reason = msal.telemetry.FORCE_REFRESH  # TODO: It could also mean claims_challenge
    else:  # Then attempt AT cache
        at_query = {
            "client_id": self.client_id,
            "environment": authority.instance,
            "realm": authority.tenant,
            "home_account_id": (account or {}).get("home_account_id"),
        }
        key_id = kwargs.get("data", {}).get("key_id")
        if key_id:  # Some token types (SSH-certs, POP) are bound to a key
            at_query["key_id"] = key_id
        now = time.time()
        refresh_reason = msal.telemetry.AT_ABSENT
        # search() is consumed as a generator so that a cache hit can stop
        # the scan early. This loop only searches, it never deletes.
        # search() holds a lock meanwhile, which is fine for such a fast loop.
        for entry in self.token_cache.search(
                self.token_cache.CredentialType.ACCESS_TOKEN,
                target=scopes,
                query=at_query,
                ):
            expires_in = int(entry["expires_on"]) - now
            if expires_in < 5*60:  # Then consider it expired
                refresh_reason = msal.telemetry.AT_EXPIRED
                continue  # Removal is not necessary, it will be overwritten
            logger.debug("Cache hit an AT")
            cached_at = {  # Mimic a real response
                "access_token": entry["secret"],
                "token_type": entry.get("token_type", "Bearer"),
                "expires_in": int(expires_in),  # OAuth2 specs defines it as int
                self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
            }
            if "refresh_on" in entry:
                cached_at["refresh_on"] = int(entry["refresh_on"])
                if int(entry["refresh_on"]) < now:  # aging
                    refresh_reason = msal.telemetry.AT_AGING
                    break  # With a fallback in hand, we break here to go refresh
            self._build_telemetry_context(-1).hit_an_access_token()
            return cached_at  # It is still good as new
    assert refresh_reason, "It should have been established at this point"
    if not http_exceptions:  # It can be a tuple of exceptions
        # The exact HTTP exceptions are transportation-layer dependent
        from requests.exceptions import RequestException  # Lazy load
        http_exceptions = (RequestException,)
    try:
        data = kwargs.get("data", {})
        if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL:
            if auth_scheme:
                raise ValueError("auth_scheme is not supported in Cloud Shell")
            return self._acquire_token_by_cloud_shell(scopes, data=data)

        key_bound_request = _is_ssh_cert_or_pop_request(
            data.get("token_type"), auth_scheme)
        platform_allows_broker = (
            sys.platform in ("win32", "darwin") or not key_bound_request)

        if (self._enable_broker and account and account.get("account_source") in (
                _GRANT_TYPE_BROKER,  # Broker successfully established this account previously.
                None,  # Unknown data from older MSAL. Broker might still work.
                ) and platform_allows_broker):
            from .broker import _acquire_token_silently
            response = _acquire_token_silently(
                "https://{}/{}".format(self.authority.instance, self.authority.tenant),
                self.client_id,
                account["local_account_id"],
                scopes,
                claims=_merge_claims_challenge_and_capabilities(
                    self._client_capabilities, claims_challenge),
                correlation_id=correlation_id,
                auth_scheme=auth_scheme,
                **data)
            if response:  # Broker provides a decisive outcome
                established_by_broker = (
                    account.get("account_source") == _GRANT_TYPE_BROKER)
                succeeded_just_now = "error" not in response
                if established_by_broker or succeeded_just_now:
                    return self._process_broker_response(response, scopes, data)

        if auth_scheme:
            raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
        if account:
            result = self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
                authority, self._decorate_scope(scopes), account,
                refresh_reason=refresh_reason, claims_challenge=claims_challenge,
                correlation_id=correlation_id,
                **kwargs)
        else:  # The caller is acquire_token_for_client()
            result = self._acquire_token_for_client(
                scopes, refresh_reason, claims_challenge=claims_challenge,
                **kwargs)
        if result and "access_token" in result:
            result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
        if (result and "error" not in result) or (not cached_at):
            return result
    except http_exceptions:
        # Typically network error. Potential AAD outage?
        if not cached_at:  # It means there is no fall back option
            raise  # We choose to bubble up the exception
    return cached_at

1661 

def _process_broker_response(self, response, scopes, data):
    """Cache an error-free broker response, mark its source, and clean it up.

    A response containing an "error" key is only passed through ``_clean_up``.
    """
    if "error" in response:
        return _clean_up(response)
    event = {
        "client_id": self.client_id,
        # Prefer the scopes reported by the response, when available
        "scope": response["scope"].split() if "scope" in response else scopes,
        "token_endpoint": self.authority.token_endpoint,
        "response": response,
        "data": data,
        "_account_id": response["_account_id"],
        "environment": self.authority.instance,  # Be consistent with non-broker flows
        # A pseudo grant type for TokenCache to mark account_source as broker
        "grant_type": _GRANT_TYPE_BROKER,
    }
    self.token_cache.add(event)
    response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
    return _clean_up(response)

1676 

1677 def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family( 

1678 self, authority, scopes, account, **kwargs): 

1679 query = { 

1680 "environment": authority.instance, 

1681 "home_account_id": (account or {}).get("home_account_id"), 

1682 # "realm": authority.tenant, # AAD RTs are tenant-independent 

1683 } 

1684 app_metadata = self._get_app_metadata(authority.instance) 

1685 if not app_metadata: # Meaning this app is now used for the first time. 

1686 # When/if we have a way to directly detect current app's family, 

1687 # we'll rewrite this block, to support multiple families. 

1688 # For now, we try existing RTs (*). If it works, we are in that family. 

1689 # (*) RTs of a different app/family are not supposed to be 

1690 # shared with or accessible by us in the first place. 

1691 at = self._acquire_token_silent_by_finding_specific_refresh_token( 

1692 authority, scopes, 

1693 dict(query, family_id="1"), # A hack, we have only 1 family for now 

1694 rt_remover=lambda rt_item: None, # NO-OP b/c RTs are likely not mine 

1695 break_condition=lambda response: # Break loop when app not in family 

1696 # Based on an AAD-only behavior mentioned in internal doc here 

1697 # https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595 

1698 "client_mismatch" in response.get("error_additional_info", []), 

1699 **kwargs) 

1700 if at and "error" not in at: 

1701 return at 

1702 last_resp = None 

1703 if app_metadata.get("family_id"): # Meaning this app belongs to this family 

1704 last_resp = at = self._acquire_token_silent_by_finding_specific_refresh_token( 

1705 authority, scopes, dict(query, family_id=app_metadata["family_id"]), 

1706 **kwargs) 

1707 if at and "error" not in at: 

1708 return at 

1709 # Either this app is an orphan, so we will naturally use its own RT; 

1710 # or all attempts above have failed, so we fall back to non-foci behavior. 

1711 return self._acquire_token_silent_by_finding_specific_refresh_token( 

1712 authority, scopes, dict(query, client_id=self.client_id), 

1713 **kwargs) or last_resp 

1714 

1715 def _get_app_metadata(self, environment): 

1716 return self.token_cache._get_app_metadata( 

1717 environment=environment, client_id=self.client_id, default={}) 

1718 

def _acquire_token_silent_by_finding_specific_refresh_token(
        self, authority, scopes, query,
        rt_remover=None, break_condition=lambda response: False,
        refresh_reason=None, correlation_id=None, claims_challenge=None,
        **kwargs):
    """Redeem cached refresh tokens matching ``query``, newest first.

    :param authority: The authority used to build the client.
    :param scopes: The scopes sent along with the refresh token.
    :param dict query: The criteria used to search RTs in the token cache.
    :param rt_remover: Accepted for interface compatibility.
        It is not used here, because RT removal is always disabled.
    :param break_condition: A callable receiving an error response.
        The remaining RTs are not tried when it returns true.
    :param refresh_reason: Passed into the telemetry context.
    :param correlation_id: Passed into the telemetry context.
    :param claims_challenge: Merged with the client capabilities
        to produce the "claims" value of the request.
    :param kwargs: An optional "data" dict is combined with the claims to
        form the request data. The rest is relayed to the refresh request.
    :return: ``None`` when no RT matches, an error-free response as soon as
        one attempt succeeds, otherwise the latest error response.
    """
    matches = list(self.token_cache.search(  # We want a list to test emptiness
        self.token_cache.CredentialType.REFRESH_TOKEN,
        # target=scopes,  # AAD RTs are scope-independent
        query=query))
    logger.debug("Found %d RTs matching %s", len(matches), {
        k: _pii_less_home_account_id(v) if k == "home_account_id" and v else v
        for k, v in query.items()
        })

    response = None  # A distinguishable value to mean cache is empty
    if not matches:  # Then exit early to avoid expensive operations
        return response
    client, _ = self._build_client(
        # Potentially expensive if building regional client
        self.client_credential, authority, skip_regional_client=True)
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_SILENT_ID,
        correlation_id=correlation_id, refresh_reason=refresh_reason)
    # Build the request data once, BEFORE the loop. Popping "data" from kwargs
    # inside the loop would hand the caller's data to the first RT attempt
    # only, and every subsequent attempt would silently go without it.
    data = dict(
        kwargs.pop("data", {}),
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge))
    for entry in sorted(  # Since unfit RTs would not be aggressively removed,
            # we start from newer RTs which are more likely fit.
            matches,
            key=lambda e: int(e.get("last_modification_time", "0")),
            reverse=True):
        logger.debug("Cache attempts an RT")
        headers = telemetry_context.generate_headers()
        if query.get("home_account_id"):  # Then use it as CCS Routing info
            headers["X-AnchorMailbox"] = "Oid:{}".format(  # case-insensitive value
                query["home_account_id"].replace(".", "@"))
        response = client.obtain_token_by_refresh_token(
            entry, rt_getter=lambda token_item: token_item["secret"],
            on_removing_rt=lambda rt_item: None,  # Disable RT removal,
                # because an invalid_grant could be caused by new MFA policy,
                # the RT could still be useful for other MFA-less scope or tenant
            on_obtaining_tokens=lambda event: self.token_cache.add(dict(
                event,
                environment=authority.instance,
                skip_account_creation=True,  # To honor a concurrent remove_account()
                )),
            scope=scopes,
            headers=headers,
            data=dict(data),  # A fresh copy for each attempt
            **kwargs)
        telemetry_context.update_telemetry(response)
        if "error" not in response:
            return response
        logger.debug(
            "Refresh failed. %s: %s",
            response.get("error"), response.get("error_description"))
        if break_condition(response):
            break
    return response  # Returns the latest error (if any), or just None

1779 

1780 def _validate_ssh_cert_input_data(self, data): 

1781 if data.get("token_type") == "ssh-cert": 

1782 if not data.get("req_cnf"): 

1783 raise ValueError( 

1784 "When requesting an SSH certificate, " 

1785 "you must include a string parameter named 'req_cnf' " 

1786 "containing the public key in JWK format " 

1787 "(https://tools.ietf.org/html/rfc7517).") 

1788 if not data.get("key_id"): 

1789 raise ValueError( 

1790 "When requesting an SSH certificate, " 

1791 "you must include a string parameter named 'key_id' " 

1792 "which identifies the key in the 'req_cnf' argument.") 

1793 

def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
    """Acquire token(s) based on a refresh token (RT) obtained from elsewhere.

    This method is only meant for migrating old RTs,
    which you obtained from elsewhere, into MSAL.
    Calling this method results in new tokens automatically storing into MSAL.

    You do NOT need to use this method if you are already using MSAL.
    MSAL maintains RT automatically inside its token cache,
    and an access token can be retrieved
    when you call :func:`~acquire_token_silent`.

    :param str refresh_token: The old refresh token, as a string.

    :param list scopes:
        The scopes associate with this old RT.
        Each scope needs to be in the Microsoft identity platform (v2) format.
        See `Scopes not resources <https://docs.microsoft.com/en-us/azure/active-directory/develop/migrate-python-adal-msal#scopes-not-resources>`_.

    :return:
        * A dict contains "error" and some other keys, when error happened.
        * A dict contains no "error" key means migration was successful.
    """
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN,
        refresh_reason=msal.telemetry.FORCE_REFRESH)
    raw_response = self.client.obtain_token_by_refresh_token(
        refresh_token,
        scope=self._decorate_scope(scopes),
        headers=telemetry_context.generate_headers(),
        rt_getter=lambda rt: rt,  # The input is already the RT string itself
        on_updating_rt=False,
        on_removing_rt=lambda rt_item: None,  # No OP
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

1833 

def acquire_token_by_username_password(
        self, username, password, scopes, claims_challenge=None,
        # Note: We shouldn't need to surface enable_msa_passthrough,
        # because this ROPC won't work with MSA account anyway.
        auth_scheme=None,
        **kwargs):
    """Gets a token for a given resource via user credentials.

    See this page for constraints of Username Password Flow.
    https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication

    :param str username: Typically a UPN in the form of an email address.
    :param str password: The password.
    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".

    :raises ValueError: When ``auth_scheme`` is provided
        but the request does not go through the broker.

    [Deprecated] This API is deprecated for public client flows and will be
    removed in a future release. Use a more secure flow instead.
    Migration guide: https://aka.ms/msal-ropc-migration

    """
    is_confidential_app = self.client_credential or isinstance(
        self, ConfidentialClientApplication)
    if not is_confidential_app:
        # stacklevel=2 attributes the warning to the caller's line, so that
        # Python's default filters (which only show a DeprecationWarning
        # triggered by code in __main__) get a chance to display it.
        warnings.warn(
            "This API has been deprecated for public client flows, "
            "please use a more secure flow. "
            "See https://aka.ms/msal-ropc-migration for migration guidance",
            DeprecationWarning,
            stacklevel=2)
    claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)
    if self._enable_broker and sys.platform in ("win32", "darwin"):
        from .broker import _signin_silently
        response = _signin_silently(
            "https://{}/{}".format(self.authority.instance, self.authority.tenant),
            self.client_id,
            scopes,  # Decorated scopes won't work due to offline_access
            MSALRuntime_Username=username,
            MSALRuntime_Password=password,
            validateAuthority="no" if (
                self.authority._is_known_to_developer
                or self._instance_discovery is False) else None,
            claims=claims,
            auth_scheme=auth_scheme,
            )
        return self._process_broker_response(response, scopes, kwargs.get("data", {}))

    if auth_scheme:
        raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
    scopes = self._decorate_scope(scopes)
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
    headers = telemetry_context.generate_headers()
    data = dict(kwargs.pop("data", {}), claims=claims)
    response = None
    if not self.authority.is_adfs:
        user_realm_result = self.authority.user_realm_discovery(
            username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
        if user_realm_result.get("account_type") == "Federated":
            response = _clean_up(self._acquire_token_by_username_password_federated(
                user_realm_result, username, password, scopes=scopes,
                data=data,
                headers=headers, **kwargs))
    if response is None:  # Either ADFS or not federated
        response = _clean_up(self.client.obtain_token_by_username_password(
            username, password, scope=scopes,
            headers=headers,
            data=data,
            **kwargs))
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

1920 

def _acquire_token_by_username_password_federated(
        self, user_realm_result, username, password, scopes=None, **kwargs):
    """Obtain a token for a federated user, via WS-Trust and a SAML assertion.

    :param dict user_realm_result: The user realm discovery result.
        Its optional "federation_metadata_url", "cloud_audience_urn" and
        "federation_active_auth_url" entries are consulted.
    :param str username: The user name sent to the WS-Trust endpoint.
    :param str password: The password sent to the WS-Trust endpoint.
    :param scopes: The scopes for the assertion grant request.
    :param kwargs: Relayed to ``self.client.obtain_token_by_assertion()``.
    :return: Whatever ``self.client.obtain_token_by_assertion()`` returns.
    :raises ValueError: When MEX yields no WS-Trust endpoint.
    :raises RuntimeError: When the WS-Trust result lacks "token" or "type",
        or when its token type is not a recognized SAML type.
    """
    wstrust_endpoint = {}
    if user_realm_result.get("federation_metadata_url"):
        wstrust_endpoint = mex_send_request(
            user_realm_result["federation_metadata_url"],
            self.http_client)
        if wstrust_endpoint is None:
            raise ValueError("Unable to find wstrust endpoint from MEX. "
                "This typically happens when attempting MSA accounts. "
                "More details available here. "
                "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication")
    logger.debug("wstrust_endpoint = %s", wstrust_endpoint)
    wstrust_result = wst_send_request(
        username, password,
        user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"),
        wstrust_endpoint.get("address",
            # Fallback to an AAD supplied endpoint
            user_realm_result.get("federation_active_auth_url")),
        wstrust_endpoint.get("action"), self.http_client)
    if not ("token" in wstrust_result and "type" in wstrust_result):
        raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result)
    GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
    grant_type = {
        SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1,
        SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2,
        WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1,
        WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2
        }.get(wstrust_result.get("type"))
    if not grant_type:
        # Exceptions do not interpolate extra arguments the way logging calls
        # do, so the token type is formatted into the message explicitly.
        raise RuntimeError(
            "RSTR returned unknown token type: %s" % wstrust_result.get("type"))
    self.client.grant_assertion_encoders.setdefault(  # Register a non-standard type
        grant_type, self.client.encode_saml_assertion)
    return self.client.obtain_token_by_assertion(
        wstrust_result["token"], grant_type, scope=scopes,
        on_obtaining_tokens=lambda event: self.token_cache.add(dict(
            event,
            environment=self.authority.instance,
            username=username,  # Useful in case IDT contains no such info
            )),
        **kwargs)

1963 

1964 

1965class PublicClientApplication(ClientApplication): # browser app or mobile app 

1966 

1967 DEVICE_FLOW_CORRELATION_ID = "_correlation_id" 

1968 CONSOLE_WINDOW_HANDLE = object() 

1969 

1970 def __init__( 

1971 self, client_id, client_credential=None, 

1972 *, 

1973 enable_broker_on_windows=None, 

1974 enable_broker_on_mac=None, 

1975 enable_broker_on_linux=None, 

1976 enable_broker_on_wsl=None, 

1977 **kwargs): 

1978 """Same as :func:`ClientApplication.__init__`, 

1979 except that ``client_credential`` parameter shall remain ``None``. 

1980 

1981 .. note:: 

1982 

1983 **What is a broker, and why use it?** 

1984 

1985 A broker is a component installed on your device. 

1986 Broker implicitly gives your device an identity. By using a broker, 

1987 your device becomes a factor that can satisfy MFA (Multi-factor authentication). 

1988 This factor would become mandatory 

1989 if a tenant's admin enables a corresponding Conditional Access (CA) policy. 

1990 The broker's presence allows Microsoft identity platform 

1991 to have higher confidence that the tokens are being issued to your device, 

1992 and that is more secure. 

1993 

1994 An additional benefit of broker is, 

1995 it runs as a long-lived process with your device's OS, 

1996 and maintains its own cache, 

1997 so that your broker-enabled apps (even a CLI) 

1998 could automatically SSO from a previously established signed-in session. 

1999 

2000 **How to opt in to use broker?** 

2001 

2002 1. You can set any combination of the following opt-in parameters to true: 

2003 

2004 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2005 | Opt-in flag | If app will run on | App has registered this as a Desktop platform redirect URI in Azure Portal | 

2006 +==========================+===================================+====================================================================================+ 

2007 | enable_broker_on_windows | Windows 10+ | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id | 

2008 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2009 | enable_broker_on_wsl | WSL | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id | 

2010 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2011 | enable_broker_on_mac | Mac with Company Portal installed | msauth.com.msauth.unsignedapp://auth | 

2012 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2013 | enable_broker_on_linux | Linux with Intune installed | ``https://login.microsoftonline.com/common/oauth2/nativeclient`` (MUST be enabled) | 

2014 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2015 

2016 2. Install broker dependency, 

2017 e.g. ``pip install msal[broker]>=1.33,<2``. 

2018 

2019 3. Test with ``acquire_token_interactive()`` and ``acquire_token_silent()``. 

2020 

2021 **The fallback behaviors of MSAL Python's broker support** 

2022 

2023 MSAL will either error out, or silently fallback to non-broker flows. 

2024 

2025 1. MSAL will ignore the `enable_broker_...` and bypass broker 

2026 on those auth flows that are known to be NOT supported by broker. 

2027 This includes ADFS, B2C, etc.. 

2028 For other "could-use-broker" scenarios, please see below. 

2029 2. MSAL errors out when app developer opted-in to use broker 

2030 but a direct dependency "mid-tier" package is not installed. 

2031 Error message guides app developer to declare the correct dependency 

2032 ``msal[broker]``. 

2033 We error out here because the error is actionable to app developers. 

2034 3. MSAL silently "deactivates" the broker and fallback to non-broker, 

2035 when opted-in, dependency installed yet failed to initialize. 

2036 We anticipate this would happen on a device whose OS is too old 

2037 or the underlying broker component is somehow unavailable. 

2038 There is not much an app developer or the end user can do here. 

2039 Eventually, the conditional access policy shall 

2040 force the user to switch to a different device. 

2041 4. MSAL errors out when broker is opted in, installed, initialized, 

2042 but subsequent token request(s) failed. 

2043 

2044 :param boolean enable_broker_on_windows: 

2045 This setting is only effective if your app is running on Windows 10+. 

2046 This parameter defaults to None, which means MSAL will not utilize a broker. 

2047 

2048 New in MSAL Python 1.25.0. 

2049 

2050 :param boolean enable_broker_on_mac: 

2051 This setting is only effective if your app is running on Mac. 

2052 This parameter defaults to None, which means MSAL will not utilize a broker. 

2053 

2054 New in MSAL Python 1.31.0. 

2055 

2056 :param boolean enable_broker_on_linux: 

2057 This setting is only effective if your app is running on Linux, including WSL. 

2058 This parameter defaults to None, which means MSAL will not utilize a broker. 

2059 

2060 New in MSAL Python 1.33.0. 

2061 

2062 :param boolean enable_broker_on_wsl: 

2063 This setting is only effective if your app is running on WSL. 

2064 This parameter defaults to None, which means MSAL will not utilize a broker. 

2065 

2066 New in MSAL Python 1.33.0. 

2067 """ 

2068 if client_credential is not None: 

2069 raise ValueError("Public Client should not possess credentials") 

2070 

2071 self._enable_broker = bool( 

2072 enable_broker_on_windows and sys.platform == "win32" 

2073 or enable_broker_on_mac and sys.platform == "darwin" 

2074 or enable_broker_on_linux and sys.platform == "linux" 

2075 or enable_broker_on_wsl and is_wsl() 

2076 ) 

2077 

2078 super(PublicClientApplication, self).__init__( 

2079 client_id, client_credential=None, **kwargs) 

2080 

def acquire_token_interactive(
        self,
        scopes,  # type: list[str]
        prompt=None,
        login_hint=None,  # type: Optional[str]
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        timeout=None,
        port=None,
        extra_scopes_to_consent=None,
        max_age=None,
        parent_window_handle=None,
        on_before_launching_ui=None,
        auth_scheme=None,
        **kwargs):
    """Obtain a token through an interactive sign-in, i.e. via a local browser.

    Prerequisite: In Azure Portal, the Redirect URI of your
    "Mobile and Desktop application" has to be ``http://localhost``.
    If you opted in to use broker when creating ``PublicClientApplication``,
    your app also needs this Redirect URI:
    ``ms-appx-web://Microsoft.AAD.BrokerPlugin/YOUR_CLIENT_ID``

    :param list scopes:
        A list of case-sensitive strings.
    :param str prompt:
        No prompt value is sent by default, not even the string ``"none"``.
        A value has to be specified explicitly.
        The valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.
    :param str login_hint:
        Optional. Identifier of the user. Generally a User Principal Name (UPN).
    :param domain_hint:
        Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
        When present, the email-based discovery step of the sign-in page
        is skipped, which gives a slightly more streamlined user experience.
        More information on possible values available in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.

    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :param int timeout:
        This method blocks the current thread.
        The timeout is given in seconds.
        The default value ``None`` means wait indefinitely.

    :param int port:
        The port on which to listen for the incoming auth response.
        A system-allocated port is used by default.
        (The rest of the redirect_uri is hard coded as ``http://localhost``.)

    :param list extra_scopes_to_consent:
        "Extra scopes to consent" is a concept only available in Microsoft Entra.
        It refers to other resources you might want to prompt to consent for,
        in the same interaction, but for which you won't get back a
        token in this particular operation.

    :param int max_age:
        OPTIONAL. Maximum Authentication Age.
        Specifies the allowable elapsed time in seconds
        since the last time the End-User was actively authenticated.
        If the elapsed time is greater than this value,
        Microsoft identity platform will actively re-authenticate the End-User.

        MSAL Python will also automatically validate the auth_time in ID token.

        New in version 1.15.

    :param int parent_window_handle:
        OPTIONAL.

        * If your app does not opt in to use broker,
          you do not need to provide a ``parent_window_handle`` here.

        * If your app opts in to use broker,
          ``parent_window_handle`` is required.

          - If your app is a GUI app running on Windows or Mac system,
            you are required to also provide its window handle,
            so that the sign-in window will pop up on top of your window.
          - If your app is a console app running on Windows or Mac system,
            you can use a placeholder
            ``PublicClientApplication.CONSOLE_WINDOW_HANDLE``.

        Most Python scripts are console apps.

        New in version 1.20.0.

    :param function on_before_launching_ui:
        A callback with the form of
        ``lambda ui="xyz", **kwargs: print("A {} will be launched".format(ui))``,
        where ``ui`` will be either "browser" or "broker".
        You can use it to inform your end user to expect a pop-up window.

        New in version 1.20.0.

    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key.
        - A dict containing an "error" key, when token refresh failed.
    """
    data = kwargs.pop("data", {})
    # "enable_msa_passthrough" is kept as a hidden parameter, for now,
    # and it MUST be removed from kwargs.
    # OPTIONAL. MSA-Passthrough is a legacy configuration,
    # needed by a small amount of Microsoft first-party apps,
    # which would login MSA accounts via ".../organizations" authority.
    # If your app belongs to this category, AND you are enabling broker,
    # you would want to enable this flag. Default value is False.
    # More background of MSA-PT is available from this internal docs:
    # https://microsoft.sharepoint.com/:w:/t/Identity-DevEx/EatIUauX3c9Ctw1l7AQ6iM8B5CeBZxc58eoQCE0IuZ0VFw?e=tgc3jP&CID=39c853be-76ea-79d7-ee73-f1b2706ede05
    msa_passthrough_requested = kwargs.pop("enable_msa_passthrough", False)
    # The ssh-cert exclusion works around a known issue as of PyMsalRuntime 0.8
    enable_msa_passthrough = (
        msa_passthrough_requested and data.get("token_type") != "ssh-cert")
    self._validate_ssh_cert_input_data(data)
    is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(
        data.get("token_type"), auth_scheme)

    if not on_before_launching_ui:
        def _launch_nothing(**_ignored):
            return None
        on_before_launching_ui = _launch_nothing

    if _is_running_in_cloud_shell() and prompt == "none":
        # Note: _acquire_token_by_cloud_shell() is always silent,
        # so on_before_launching_ui() is not fired on this path.
        return self._acquire_token_by_cloud_shell(scopes, data=data)

    claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)

    use_broker = self._enable_broker and (
        sys.platform in ("win32", "darwin") or not is_ssh_cert_or_pop_request)
    if use_broker:
        if parent_window_handle is None:
            raise ValueError(
                "parent_window_handle is required when you opted into using broker. "
                "You need to provide the window handle of your GUI application, "
                "or use msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE "
                "when and only when your application is a console app.")
        if extra_scopes_to_consent:
            logger.warning(
                "Ignoring parameter extra_scopes_to_consent, "
                "which is not supported by broker")
        broker_response = self._acquire_token_interactive_via_broker(
            scopes,
            parent_window_handle,
            enable_msa_passthrough,
            claims,
            data,
            on_before_launching_ui,
            auth_scheme,
            prompt=prompt,
            login_hint=login_hint,
            max_age=max_age,
        )
        return self._process_broker_response(broker_response, scopes, data)

    # From here on the token is obtained via a browser,
    # which does not accept any auth_scheme.
    if isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme) and sys.platform == "linux":
        raise ValueError("POP is not supported on Linux")
    if auth_scheme:
        raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)

    on_before_launching_ui(ui="browser")
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_INTERACTIVE)
    obtain_token_by_browser = self.client.obtain_token_by_browser
    browser_response = obtain_token_by_browser(
        scope=self._decorate_scope(scopes) if scopes else None,
        extra_scope_to_consent=extra_scopes_to_consent,
        redirect_uri="http://localhost:{port}".format(
            # Hardcode the host, for now. AAD portal rejects 127.0.0.1 anyway
            port=port or 0),
        prompt=prompt,
        login_hint=login_hint,
        max_age=max_age,
        timeout=timeout,
        auth_params={
            "claims": claims,
            "domain_hint": domain_hint,
        },
        data=dict(data, claims=claims),
        headers=telemetry_context.generate_headers(),
        browser_name=_preferred_browser(),
        **kwargs)
    response = _clean_up(browser_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

2270 

def _acquire_token_interactive_via_broker(
        self,
        scopes,  # type: list[str]
        parent_window_handle,  # type: int
        enable_msa_passthrough,  # type: boolean
        claims,  # type: str
        data,  # type: dict
        on_before_launching_ui,  # type: callable
        auth_scheme,  # type: object
        prompt=None,
        login_hint=None,  # type: Optional[str]
        max_age=None,
        **kwargs):
    """Acquire a token from the broker, calling different broker functions
    to mimic the OIDC prompt behaviors.

    The attempts are made in this order, and the first conclusive one wins:

    1. When ``login_hint`` matches exactly one known account and ``prompt``
       is not ``"select_account"``, try ``_acquire_token_silently()``.
       Its response is returned only when it is non-empty and error-free.
    2. When ``prompt`` is ``"none"`` or not specified,
       try ``_signin_silently()``. With ``prompt="none"`` its outcome is
       final; a token for an account other than ``login_hint`` is reported
       as a ``broker_error``. Without a prompt, only a wrong account or a
       recoverable broker status falls through to the next step.
    3. Otherwise call ``on_before_launching_ui(ui="broker")``
       and then ``_signin_interactively()``.

    :return: The dict returned by one of the broker functions above,
        or a dict with "error" and "error_description" keys.
    """
    from .broker import (
        _signin_interactively,
        _signin_silently,
        _acquire_token_silently,
    )
    if "welcome_template" in kwargs:
        logger.debug(kwargs["welcome_template"])  # Experimental
    authority = "https://{}/{}".format(
        self.authority.instance, self.authority.tenant)
    skip_authority_validation = (
        self.authority._is_known_to_developer
        or self._instance_discovery is False)
    validate_authority = "no" if skip_authority_validation else None

    # OIDC prompts when the user did not sign in
    if login_hint and prompt != "select_account":
        hinted_accounts = self.get_accounts(username=login_hint)
        if len(hinted_accounts) == 1:  # Unambiguously proceed with this account
            logger.debug("Calling broker._acquire_token_silently()")
            # When it works, it bypasses prompt
            silent_token_response = _acquire_token_silently(
                authority,
                self.client_id,
                hinted_accounts[0]["local_account_id"],
                scopes,
                claims=claims,
                auth_scheme=auth_scheme,
                **data)
            if silent_token_response and "error" not in silent_token_response:
                return silent_token_response

    # Reaching here means login_hint was indecisive or does not exist
    if prompt == "none" or not prompt:  # Must/Can attempt _signin_silently()
        logger.debug("Calling broker._signin_silently()")
        # Unlike OIDC, _signin_silently() doesn't honor login_hint
        response = _signin_silently(
            authority, self.client_id, scopes,
            validateAuthority=validate_authority,
            claims=claims,
            max_age=max_age,
            enable_msa_pt=enable_msa_passthrough,
            auth_scheme=auth_scheme,
            **data)
        # _signin_silently() only gets tokens for default account,
        # but this seems to have been fixed in PyMsalRuntime 0.11.2
        is_wrong_account = bool(
            "access_token" in response and login_hint
            and login_hint != response.get(
                "id_token_claims", {}).get("preferred_username"))
        wrong_account_error_message = (
            'prompt="none" will not work for login_hint="non-default-user"')
        if is_wrong_account:
            logger.debug(wrong_account_error_message)

        if prompt == "none":  # The silent outcome is final, no UI is allowed
            if is_wrong_account:
                return {
                    "error": "broker_error",
                    "error_description": wrong_account_error_message,
                }
            return response

        assert bool(prompt) is False
        from pymsalruntime import Response_Status
        recoverable_errors = frozenset([
            Response_Status.Status_AccountUnusable,
            Response_Status.Status_InteractionRequired,
        ])
        needs_interaction = is_wrong_account or (
            "error" in response
            and response.get("_broker_status") in recoverable_errors)
        if not needs_interaction:
            return response
        # Otherwise it will fall back to the _signin_interactively()

    logger.debug("Falls back to broker._signin_interactively()")
    on_before_launching_ui(ui="broker")
    # The console placeholder is translated into "no window handle"
    if parent_window_handle is self.CONSOLE_WINDOW_HANDLE:
        window_handle = None
    else:
        window_handle = parent_window_handle
    return _signin_interactively(
        authority, self.client_id, scopes,
        window_handle,
        validateAuthority=validate_authority,
        login_hint=login_hint,
        prompt=prompt,
        claims=claims,
        max_age=max_age,
        enable_msa_pt=enable_msa_passthrough,
        auth_scheme=auth_scheme,
        **data)

2360 

def initiate_device_flow(self, scopes=None, *, claims_challenge=None, **kwargs):
    """Create a Device Flow object,
    to be consumed later by :func:`~acquire_token_by_device_flow`.

    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        Keyword-only. It is merged with this app's client capabilities
        and sent as the "claims" of the request.
    :return: A dict representing a newly created Device Flow object.

        - A successful response would contain "user_code" key, among others
        - an error response would contain some other readable key/value pairs.
    """
    correlation_id = msal.telemetry._get_new_correlation_id()
    start_device_flow = self.client.initiate_device_flow
    decorated_scope = self._decorate_scope(scopes or [])
    request_headers = {msal.telemetry.CLIENT_REQUEST_ID: correlation_id}
    merged_claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)
    flow = start_device_flow(
        scope=decorated_scope,
        headers=request_headers,
        data={"claims": merged_claims},
        **kwargs)
    # Remember the correlation id inside the flow object itself
    flow[self.DEVICE_FLOW_CORRELATION_ID] = correlation_id
    return flow

2381 

def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
    """Redeem a device flow object for a token, with customizable polling effect.

    :param dict flow:
        A dict previously generated by :func:`~initiate_device_flow`.
        By default, this method's polling effect will block current thread.
        You can abort the polling loop at any time,
        by changing the value of the flow's "expires_at" key to 0.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID,
        correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID))
    redeem_device_flow = self.client.obtain_token_by_device_flow
    # "data" must leave kwargs before the remaining kwargs are forwarded
    request_data = dict(
        kwargs.pop("data", {}),
        # 2018-10-4 Hack: during transition period,
        # service seemingly need both device_code and code parameter.
        code=flow["device_code"],
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge),
    )
    raw_response = redeem_device_flow(
        flow,
        data=request_data,
        headers=telemetry_context.generate_headers(),
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

2420 

2421 

class ConfidentialClientApplication(ClientApplication):  # server-side web app
    """Its constructor is the same as :func:`ClientApplication.__init__`,
    except that ``allow_broker`` parameter shall remain ``None``.
    """

    def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
        """Acquires token for the current confidential client, not for an end user.

        Since MSAL Python 1.23, the token cache is consulted first,
        and a request is only sent to Identity Provider when cache misses.

        :param list[str] scopes: (Required)
            Scopes requested to access a protected API (a resource).
        :param claims_challenge:
            The claims_challenge parameter requests specific claims requested by the resource provider
            in the form of a claims_challenge directive in the www-authenticate header to be
            returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
            It is a string of a JSON object which contains lists of claims being requested from these locations.

        :return: A dict representing the json response from Microsoft Entra:

            - A successful response would contain "access_token" key,
            - an error response would contain "error" and usually "error_description".

        :raises ValueError: When a truthy ``force_refresh`` is passed in.
        """
        force_refresh_requested = kwargs.get("force_refresh")
        if force_refresh_requested:
            # We choose to disallow force_refresh
            raise ValueError(
                "Historically, this method does not support force_refresh behavior. "
            )
        result = self._acquire_token_silent_with_error(
            scopes, None, claims_challenge=claims_challenge, **kwargs)
        return _clean_up(result)

    def _acquire_token_for_client(
        self,
        scopes,
        refresh_reason,
        claims_challenge=None,
        **kwargs
    ):
        """Request a token for this client via ``obtain_token_for_client()``
        of the regional client when there is one, else of the default client.

        A ``DeprecationWarning`` is emitted when the authority's tenant is
        "common" or "organizations". ``refresh_reason`` is handed to the
        telemetry context. The response is returned as received.
        """
        tenant = self.authority.tenant.lower()
        if tenant in ["common", "organizations"]:
            warnings.warn(
                "Using /common or /organizations authority "
                "in acquire_token_for_client() is unreliable. "
                "Please use a specific tenant instead.", DeprecationWarning)
        self._validate_ssh_cert_input_data(kwargs.get("data", {}))
        telemetry_context = self._build_telemetry_context(
            self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason)
        client = self._regional_client or self.client
        obtain_token = client.obtain_token_for_client
        request_headers = telemetry_context.generate_headers()
        # "data" must leave kwargs before the remaining kwargs are forwarded
        request_data = dict(
            kwargs.pop("data", {}),
            claims=_merge_claims_challenge_and_capabilities(
                self._client_capabilities, claims_challenge))
        response = obtain_token(
            scope=scopes,  # This grant flow requires no scope decoration
            headers=request_headers,
            data=request_data,
            **kwargs)
        telemetry_context.update_telemetry(response)
        return response

    def remove_tokens_for_client(self):
        """Remove all tokens that were previously acquired via
        :func:`~acquire_token_for_client()` for the current client."""
        environments = [self.authority.instance] + self._get_authority_aliases(
            self.authority.instance)
        for environment in environments:
            app_only_query = {
                "client_id": self.client_id,
                "environment": environment,
                "home_account_id": None,  # These are mostly app-only tokens
            }
            # Remove ATs from a snapshot, not from the live search result
            snapshot = list(self.token_cache.search(
                TokenCache.CredentialType.ACCESS_TOKEN, query=app_only_query))
            for access_token in snapshot:
                self.token_cache.remove_at(access_token)
        # acquire_token_for_client() obtains no RTs, so we have no RT to remove

    def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs):
        """Acquires token using on-behalf-of (OBO) flow.

        The current app is a middle-tier service which was called with a token
        representing an end user.
        Such a token (a.k.a. a user assertion) can be used by the current app
        to request another token to access downstream web API,
        on behalf of that user.
        See `detail docs here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow>`_ .

        The current middle-tier app has no user interaction to obtain consent.
        See how to gain consent upfront for your middle-tier app from this article.
        https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow#gaining-consent-for-the-middle-tier-application

        :param str user_assertion: The incoming token already received by this app
        :param list[str] scopes: Scopes required by downstream API (a resource).
        :param claims_challenge:
            The claims_challenge parameter requests specific claims requested by the resource provider
            in the form of a claims_challenge directive in the www-authenticate header to be
            returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
            It is a string of a JSON object which contains lists of claims being requested from these locations.

        :return: A dict representing the json response from Microsoft Entra:

            - A successful response would contain "access_token" key,
            - an error response would contain "error" and usually "error_description".
        """
        telemetry_context = self._build_telemetry_context(
            self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID)
        # The implementation is NOT based on Token Exchange (RFC 8693),
        # it bases on assertion RFC 7521
        obtain_token_by_assertion = self.client.obtain_token_by_assertion
        grant_type = self.client.GRANT_TYPE_JWT  # IDTs and AAD ATs are all JWTs
        # Scope decoration is used for:
        # 1. Explicitly requesting an RT, without relying on AAD default
        #    behavior, even though it currently still issues an RT.
        # 2. Requesting an IDT (which would otherwise be unavailable)
        #    so that the calling app could use id_token_claims to implement
        #    their own cache mapping, which is likely needed in web apps.
        decorated_scope = self._decorate_scope(scopes)
        # "data" must leave kwargs before the remaining kwargs are forwarded
        request_data = dict(
            kwargs.pop("data", {}),
            requested_token_use="on_behalf_of",
            claims=_merge_claims_challenge_and_capabilities(
                self._client_capabilities, claims_challenge))
        raw_response = obtain_token_by_assertion(
            user_assertion,
            grant_type,
            scope=decorated_scope,
            data=request_data,
            headers=telemetry_context.generate_headers(),
            # TBD: Expose a login_hint (or ccs_routing_hint) param for web app
            **kwargs)
        response = _clean_up(raw_response)
        if "access_token" in response:
            response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
        telemetry_context.update_telemetry(response)
        return response