Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/application.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

672 statements  

1import functools 

2import json 

3import time 

4import logging 

5import sys 

6import warnings 

7from threading import Lock 

8from typing import Optional # Needed in Python 3.7 & 3.8 

9from urllib.parse import urlparse 

10import os 

11 

12from .oauth2cli import Client, JwtAssertionCreator 

13from .oauth2cli.oidc import decode_part 

14from .authority import Authority, WORLD_WIDE 

15from .mex import send_request as mex_send_request 

16from .wstrust_request import send_request as wst_send_request 

17from .wstrust_response import * 

18from .token_cache import TokenCache, _get_username, _GRANT_TYPE_BROKER 

19import msal.telemetry 

20from .region import _detect_region 

21from .throttled_http_client import ThrottledHttpClient 

22from .cloudshell import _is_running_in_cloud_shell 

23from .sku import SKU, __version__ 

24from .oauth2cli.authcode import is_wsl 

25 

26 

27logger = logging.getLogger(__name__) 

28_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL" 

29 

30def _init_broker(enable_pii_log): # Make it a function to allow mocking 

31 from . import broker # Trigger Broker's initialization, lazily 

32 if enable_pii_log: 

33 broker._enable_pii_log() 

34 

35def extract_certs(public_cert_content): 

36 # Parses raw public certificate file contents and returns a list of strings 

37 # Usage: headers = {"x5c": extract_certs(open("my_cert.pem").read())} 

38 public_certificates = re.findall( 

39 r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----', 

40 public_cert_content, re.I) 

41 if public_certificates: 

42 return [cert.strip() for cert in public_certificates] 

43 # The public cert tags are not found in the input, 

44 # let's make best effort to exclude a private key pem file. 

45 if "PRIVATE KEY" in public_cert_content: 

46 raise ValueError( 

47 "We expect your public key but detect a private key instead") 

48 return [public_cert_content.strip()] 

49 

50 

51def _merge_claims_challenge_and_capabilities(capabilities, claims_challenge): 

52 # Represent capabilities as {"access_token": {"xms_cc": {"values": capabilities}}} 

53 # and then merge/add it into incoming claims 

54 if not capabilities: 

55 return claims_challenge 

56 claims_dict = json.loads(claims_challenge) if claims_challenge else {} 

57 for key in ["access_token"]: # We could add "id_token" if we'd decide to 

58 claims_dict.setdefault(key, {}).update(xms_cc={"values": capabilities}) 

59 return json.dumps(claims_dict) 

60 

61 

62def _str2bytes(raw): 

63 # A conversion based on duck-typing rather than six.text_type 

64 try: 

65 return raw.encode(encoding="utf-8") 

66 except: 

67 return raw 

68 

69def _extract_cert_and_thumbprints(cert): 

70 # Cert concepts https://security.stackexchange.com/a/226758/125264 

71 from cryptography.hazmat.primitives import hashes, serialization 

72 cert_pem = cert.public_bytes( # Requires cryptography 1.0+ 

73 encoding=serialization.Encoding.PEM).decode() 

74 x5c = [ 

75 '\n'.join( 

76 cert_pem.splitlines() 

77 [1:-1] # Strip the "--- header ---" and "--- footer ---" 

78 ) 

79 ] 

80 # https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object 

81 sha256_thumbprint = cert.fingerprint(hashes.SHA256()).hex() # Requires cryptography 0.7+ 

82 sha1_thumbprint = cert.fingerprint(hashes.SHA1()).hex() # Requires cryptography 0.7+ 

83 return sha256_thumbprint, sha1_thumbprint, x5c 

84 

85def _parse_pfx(pfx_path, passphrase_bytes): 

86 # Cert concepts https://security.stackexchange.com/a/226758/125264 

87 from cryptography.hazmat.primitives.serialization import pkcs12 

88 with open(pfx_path, 'rb') as f: 

89 private_key, cert, _ = pkcs12.load_key_and_certificates( # cryptography 2.5+ 

90 # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates 

91 f.read(), passphrase_bytes) 

92 if not (private_key and cert): 

93 raise ValueError("Your PFX file shall contain both private key and cert") 

94 sha256_thumbprint, sha1_thumbprint, x5c = _extract_cert_and_thumbprints(cert) 

95 return private_key, sha256_thumbprint, sha1_thumbprint, x5c 

96 

97 

98def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes): 

99 from cryptography.hazmat.primitives import serialization 

100 from cryptography.hazmat.backends import default_backend 

101 return serialization.load_pem_private_key( # cryptography 0.6+ 

102 _str2bytes(private_key_pem_str), 

103 passphrase_bytes, 

104 backend=default_backend(), # It was a required param until 2020 

105 ) 

106 

107 

108def _pii_less_home_account_id(home_account_id): 

109 parts = home_account_id.split(".") # It could contain one or two parts 

110 parts[0] = "********" 

111 return ".".join(parts) 

112 

113 

114def _clean_up(result): 

115 if isinstance(result, dict): 

116 if "_msalruntime_telemetry" in result or "_msal_python_telemetry" in result: 

117 result["msal_telemetry"] = json.dumps({ # Telemetry as an opaque string 

118 "msalruntime_telemetry": result.get("_msalruntime_telemetry"), 

119 "msal_python_telemetry": result.get("_msal_python_telemetry"), 

120 }, separators=(",", ":")) 

121 return_value = { 

122 k: result[k] for k in result 

123 if k != "refresh_in" # MSAL handled refresh_in, customers need not 

124 and not k.startswith('_') # Skim internal properties 

125 } 

126 if "refresh_in" in result: # To encourage proactive refresh 

127 return_value["refresh_on"] = int(time.time() + result["refresh_in"]) 

128 return return_value 

129 return result # It could be None 

130 

131 

132def _preferred_browser(): 

133 """Register Edge and return a name suitable for subsequent webbrowser.get(...) 

134 when appropriate. Otherwise return None. 

135 """ 

136 # On Linux, only Edge will provide device-based Conditional Access support 

137 if sys.platform != "linux": # On other platforms, we have no browser preference 

138 return None 

139 browser_path = "/usr/bin/microsoft-edge" # Use a full path owned by sys admin 

140 # Note: /usr/bin/microsoft-edge, /usr/bin/microsoft-edge-stable, etc. 

141 # are symlinks that point to the actual binaries which are found under 

142 # /opt/microsoft/msedge/msedge or /opt/microsoft/msedge-beta/msedge. 

143 # Either method can be used to detect an Edge installation. 

144 user_has_no_preference = "BROWSER" not in os.environ 

145 user_wont_mind_edge = "microsoft-edge" in os.environ.get("BROWSER", "") # Note: 

146 # BROWSER could contain "microsoft-edge" or "/path/to/microsoft-edge". 

147 # Python documentation (https://docs.python.org/3/library/webbrowser.html) 

148 # does not document the name being implicitly register, 

149 # so there is no public API to know whether the ENV VAR browser would work. 

150 # Therefore, we would not bother examine the env var browser's type. 

151 # We would just register our own Edge instance. 

152 if (user_has_no_preference or user_wont_mind_edge) and os.path.exists(browser_path): 

153 try: 

154 import webbrowser # Lazy import. Some distro may not have this. 

155 browser_name = "msal-edge" # Avoid popular name "microsoft-edge" 

156 # otherwise `BROWSER="microsoft-edge"; webbrowser.get("microsoft-edge")` 

157 # would return a GenericBrowser instance which won't work. 

158 try: 

159 registration_available = isinstance( 

160 webbrowser.get(browser_name), webbrowser.BackgroundBrowser) 

161 except webbrowser.Error: 

162 registration_available = False 

163 if not registration_available: 

164 logger.debug("Register %s with %s", browser_name, browser_path) 

165 # By registering our own browser instance with our own name, 

166 # rather than populating a process-wide BROWSER enn var, 

167 # this approach does not have side effect on non-MSAL code path. 

168 webbrowser.register( # Even double-register happens to work fine 

169 browser_name, None, webbrowser.BackgroundBrowser(browser_path)) 

170 return browser_name 

171 except ImportError: 

172 pass # We may still proceed 

173 return None 

174 

175def _is_ssh_cert_or_pop_request(token_type, auth_scheme) -> bool: 

176 return token_type == "ssh-cert" or token_type == "pop" or isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme) 

177 

178class _ClientWithCcsRoutingInfo(Client): 

179 

180 def initiate_auth_code_flow(self, **kwargs): 

181 if kwargs.get("login_hint"): # eSTS could have utilized this as-is, but nope 

182 kwargs["X-AnchorMailbox"] = "UPN:%s" % kwargs["login_hint"] 

183 return super(_ClientWithCcsRoutingInfo, self).initiate_auth_code_flow( 

184 client_info=1, # To be used as CSS Routing info 

185 **kwargs) 

186 

187 def obtain_token_by_auth_code_flow( 

188 self, auth_code_flow, auth_response, **kwargs): 

189 # Note: the obtain_token_by_browser() is also covered by this 

190 assert isinstance(auth_code_flow, dict) and isinstance(auth_response, dict) 

191 headers = kwargs.pop("headers", {}) 

192 client_info = json.loads( 

193 decode_part(auth_response["client_info"]) 

194 ) if auth_response.get("client_info") else {} 

195 if "uid" in client_info and "utid" in client_info: 

196 # Note: The value of X-AnchorMailbox is also case-insensitive 

197 headers["X-AnchorMailbox"] = "Oid:{uid}@{utid}".format(**client_info) 

198 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_auth_code_flow( 

199 auth_code_flow, auth_response, headers=headers, **kwargs) 

200 

201 def obtain_token_by_username_password(self, username, password, **kwargs): 

202 headers = kwargs.pop("headers", {}) 

203 headers["X-AnchorMailbox"] = "upn:{}".format(username) 

204 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_username_password( 

205 username, password, headers=headers, **kwargs) 

206 

207 

208def _msal_extension_check(): 

209 # Can't run this in module or class level otherwise you'll get circular import error 

210 try: 

211 from msal_extensions import __version__ as v 

212 major, minor, _ = v.split(".", maxsplit=3) 

213 if not (int(major) >= 1 and int(minor) >= 2): 

214 warnings.warn( 

215 "Please upgrade msal-extensions. " 

216 "Only msal-extensions 1.2+ can work with msal 1.30+") 

217 except ImportError: 

218 pass # The optional msal_extensions is not installed. Business as usual. 

219 except ValueError: 

220 logger.exception(f"msal_extensions version {v} not in major.minor.patch format") 

221 except: 

222 logger.exception( 

223 "Unable to import msal_extensions during an optional check. " 

224 "This exception can be safely ignored." 

225 ) 

226 

227 

228class ClientApplication(object): 

229 """You do not usually directly use this class. Use its subclasses instead: 

230 :class:`PublicClientApplication` and :class:`ConfidentialClientApplication`. 

231 """ 

232 ACQUIRE_TOKEN_SILENT_ID = "84" 

233 ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85" 

234 ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301" 

235 ACQUIRE_TOKEN_ON_BEHALF_OF_ID = "523" 

236 ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID = "622" 

237 ACQUIRE_TOKEN_FOR_CLIENT_ID = "730" 

238 ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832" 

239 ACQUIRE_TOKEN_INTERACTIVE = "169" 

240 GET_ACCOUNTS_ID = "902" 

241 REMOVE_ACCOUNT_ID = "903" 

242 

243 ATTEMPT_REGION_DISCOVERY = True # "TryAutoDetect" 

244 DISABLE_MSAL_FORCE_REGION = False # Used in azure_region to disable MSAL_FORCE_REGION behavior 

245 _TOKEN_SOURCE = "token_source" 

246 _TOKEN_SOURCE_IDP = "identity_provider" 

247 _TOKEN_SOURCE_CACHE = "cache" 

248 _TOKEN_SOURCE_BROKER = "broker" 

249 

250 _enable_broker = False 

251 _AUTH_SCHEME_UNSUPPORTED = ( 

252 "auth_scheme is currently only available from broker. " 

253 "You can enable broker by following these instructions. " 

254 "https://msal-python.readthedocs.io/en/latest/#publicclientapplication") 

255 

256 def __init__( 

257 self, client_id, 

258 client_credential=None, authority=None, validate_authority=True, 

259 token_cache=None, 

260 http_client=None, 

261 verify=True, proxies=None, timeout=None, 

262 client_claims=None, app_name=None, app_version=None, 

263 client_capabilities=None, 

264 azure_region=None, # Note: We choose to add this param in this base class, 

265 # despite it is currently only needed by ConfidentialClientApplication. 

266 # This way, it holds the same positional param place for PCA, 

267 # when we would eventually want to add this feature to PCA in future. 

268 exclude_scopes=None, 

269 http_cache=None, 

270 instance_discovery=None, 

271 allow_broker=None, 

272 enable_pii_log=None, 

273 oidc_authority=None, 

274 ): 

275 """Create an instance of application. 

276 

277 :param str client_id: Your app has a client_id after you register it on Microsoft Entra admin center. 

278 

279 :param client_credential: 

280 For :class:`PublicClientApplication`, you use `None` here. 

281 

282 For :class:`ConfidentialClientApplication`, 

283 it supports many different input formats for different scenarios. 

284 

285 .. admonition:: Support using a client secret. 

286 

287 Just feed in a string, such as ``"your client secret"``. 

288 

289 .. admonition:: Support using a certificate in X.509 (.pem) format 

290 

291 Deprecated because it uses SHA-1 thumbprint, 

292 unless you are still using ADFS which supports SHA-1 thumbprint only. 

293 Please use the .pfx option documented later in this page. 

294 

295 Feed in a dict in this form:: 

296 

297 { 

298 "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format", 

299 "thumbprint": "An SHA-1 thumbprint such as A1B2C3D4E5F6..." 

300 "Changed in version 1.35.0, if thumbprint is absent" 

301 "and a public_certificate is present, MSAL will" 

302 "automatically calculate an SHA-256 thumbprint instead.", 

303 "passphrase": "Needed if the private_key is encrypted (Added in version 1.6.0)", 

304 "public_certificate": "...-----BEGIN CERTIFICATE-----...", # Needed if you use Subject Name/Issuer auth. Added in version 0.5.0. 

305 } 

306 

307 MSAL Python requires a "private_key" in PEM format. 

308 If your cert is in PKCS12 (.pfx) format, 

309 you can convert it to X.509 (.pem) format, 

310 by ``openssl pkcs12 -in file.pfx -out file.pem -nodes``. 

311 

312 The thumbprint is available in your app's registration in Azure Portal. 

313 Alternatively, you can `calculate the thumbprint <https://github.com/Azure/azure-sdk-for-python/blob/07d10639d7e47f4852eaeb74aef5d569db499d6e/sdk/identity/azure-identity/azure/identity/_credentials/certificate.py#L94-L97>`_. 

314 

315 ``public_certificate`` (optional) is public key certificate 

316 which will be sent through 'x5c' JWT header. 

317 This is useful when you use `Subject Name/Issuer Authentication 

318 <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_ 

319 which is an approach to allow easier certificate rotation. 

320 Per `specs <https://tools.ietf.org/html/rfc7515#section-4.1.6>`_, 

321 "the certificate containing 

322 the public key corresponding to the key used to digitally sign the 

323 JWS MUST be the first certificate. This MAY be followed by 

324 additional certificates, with each subsequent certificate being the 

325 one used to certify the previous one." 

326 However, your certificate's issuer may use a different order. 

327 So, if your attempt ends up with an error AADSTS700027 - 

328 "The provided signature value did not match the expected signature value", 

329 you may try use only the leaf cert (in PEM/str format) instead. 

330 

331 .. admonition:: Supporting raw assertion obtained from elsewhere 

332 

333 *Added in version 1.13.0*: 

334 It can also be a completely pre-signed assertion that you've assembled yourself. 

335 Simply pass a container containing only the key "client_assertion", like this:: 

336 

337 { 

338 "client_assertion": "...a JWT with claims aud, exp, iss, jti, nbf, and sub..." 

339 } 

340 

341 .. admonition:: Supporting reading client certificates from PFX files 

342 

343 This usage will automatically use SHA-256 thumbprint of the certificate. 

344 

345 *Added in version 1.29.0*: 

346 Feed in a dictionary containing the path to a PFX file:: 

347 

348 { 

349 "private_key_pfx_path": "/path/to/your.pfx", # Added in version 1.29.0 

350 "public_certificate": True, # Only needed if you use Subject Name/Issuer auth. Added in version 1.30.0 

351 "passphrase": "Passphrase if the private_key is encrypted (Optional)", 

352 } 

353 

354 The following command will generate a .pfx file from your .key and .pem file:: 

355 

356 openssl pkcs12 -export -out certificate.pfx -inkey privateKey.key -in certificate.pem 

357 

358 `Subject Name/Issuer Auth 

359 <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_ 

360 is an approach to allow easier certificate rotation. 

361 If your .pfx file contains both the private key and public cert, 

362 you can opt in for Subject Name/Issuer Auth by setting "public_certificate" to ``True``. 

363 

364 :type client_credential: Union[dict, str, None] 

365 

366 :param dict client_claims: 

367 *Added in version 0.5.0*: 

368 It is a dictionary of extra claims that would be signed by 

369 by this :class:`ConfidentialClientApplication` 's private key. 

370 For example, you can use {"client_ip": "x.x.x.x"}. 

371 You may also override any of the following default claims:: 

372 

373 { 

374 "aud": the_token_endpoint, 

375 "iss": self.client_id, 

376 "sub": same_as_issuer, 

377 "exp": now + 10_min, 

378 "iat": now, 

379 "jti": a_random_uuid 

380 } 

381 

382 :param str authority: 

383 A URL that identifies a token authority. It should be of the format 

384 ``https://login.microsoftonline.com/your_tenant`` 

385 By default, we will use ``https://login.microsoftonline.com/common`` 

386 

387 *Changed in version 1.17*: you can also use predefined constant 

388 and a builder like this:: 

389 

390 from msal.authority import ( 

391 AuthorityBuilder, 

392 AZURE_US_GOVERNMENT, AZURE_CHINA, AZURE_PUBLIC) 

393 my_authority = AuthorityBuilder(AZURE_PUBLIC, "contoso.onmicrosoft.com") 

394 # Now you get an equivalent of 

395 # "https://login.microsoftonline.com/contoso.onmicrosoft.com" 

396 

397 # You can feed such an authority to msal's ClientApplication 

398 from msal import PublicClientApplication 

399 app = PublicClientApplication("my_client_id", authority=my_authority, ...) 

400 

401 :param bool validate_authority: (optional) Turns authority validation 

402 on or off. This parameter default to true. 

403 :param TokenCache token_cache: 

404 Sets the token cache used by this ClientApplication instance. 

405 By default, an in-memory cache will be created and used. 

406 :param http_client: (optional) 

407 Your implementation of abstract class HttpClient <msal.oauth2cli.http.http_client> 

408 Defaults to a requests session instance. 

409 Since MSAL 1.11.0, the default session would be configured 

410 to attempt one retry on connection error. 

411 If you are providing your own http_client, 

412 it will be your http_client's duty to decide whether to perform retry. 

413 

414 :param verify: (optional) 

415 It will be passed to the 

416 `verify parameter in the underlying requests library 

417 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#ssl-cert-verification>`_ 

418 This does not apply if you have chosen to pass your own Http client 

419 :param proxies: (optional) 

420 It will be passed to the 

421 `proxies parameter in the underlying requests library 

422 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#proxies>`_ 

423 This does not apply if you have chosen to pass your own Http client 

424 :param timeout: (optional) 

425 It will be passed to the 

426 `timeout parameter in the underlying requests library 

427 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#timeouts>`_ 

428 This does not apply if you have chosen to pass your own Http client 

429 :param app_name: (optional) 

430 You can provide your application name for Microsoft telemetry purposes. 

431 Default value is None, means it will not be passed to Microsoft. 

432 :param app_version: (optional) 

433 You can provide your application version for Microsoft telemetry purposes. 

434 Default value is None, means it will not be passed to Microsoft. 

435 :param list[str] client_capabilities: (optional) 

436 Allows configuration of one or more client capabilities, e.g. ["CP1"]. 

437 

438 Client capability is meant to inform the Microsoft identity platform 

439 (STS) what this client is capable for, 

440 so STS can decide to turn on certain features. 

441 For example, if client is capable to handle *claims challenge*, 

442 STS may issue 

443 `Continuous Access Evaluation (CAE) <https://learn.microsoft.com/entra/identity/conditional-access/concept-continuous-access-evaluation>`_ 

444 access tokens to resources, 

445 knowing that when the resource emits a *claims challenge* 

446 the client will be able to handle those challenges. 

447 

448 Implementation details: 

449 Client capability is implemented using "claims" parameter on the wire, 

450 for now. 

451 MSAL will combine them into 

452 `claims parameter <https://openid.net/specs/openid-connect-core-1_0-final.html#ClaimsParameter>`_ 

453 which you will later provide via one of the acquire-token request. 

454 

455 :param str azure_region: (optional) 

456 Instructs MSAL to use the Entra regional token service. This legacy feature is only available to 

457 first-party applications. Only ``acquire_token_for_client()`` is supported. 

458 

459 Supports 4 values: 

460 

461 1. ``azure_region=None`` - This default value means no region is configured. 

462 MSAL will use the region defined in env var ``MSAL_FORCE_REGION``. 

463 2. ``azure_region="some_region"`` - meaning the specified region is used. 

464 3. ``azure_region=True`` - meaning 

465 MSAL will try to auto-detect the region. This is not recommended. 

466 4. ``azure_region=False`` - meaning MSAL will use no region. 

467 

468 .. note:: 

469 Region auto-discovery has been tested on VMs and on Azure Functions. It is unreliable. 

470 Applications using this option should configure a short timeout. 

471 

472 For more details and for the values of the region string 

473 see https://learn.microsoft.com/entra/msal/dotnet/resources/region-discovery-troubleshooting 

474 

475 New in version 1.12.0. 

476 

477 :param list[str] exclude_scopes: (optional) 

478 Historically MSAL hardcodes `offline_access` scope, 

479 which would allow your app to have prolonged access to user's data. 

480 If that is unnecessary or undesirable for your app, 

481 now you can use this parameter to supply an exclusion list of scopes, 

482 such as ``exclude_scopes = ["offline_access"]``. 

483 

484 :param dict http_cache: 

485 MSAL has long been caching tokens in the ``token_cache``. 

486 Recently, MSAL also introduced a concept of ``http_cache``, 

487 by automatically caching some finite amount of non-token http responses, 

488 so that *long-lived* 

489 ``PublicClientApplication`` and ``ConfidentialClientApplication`` 

490 would be more performant and responsive in some situations. 

491 

492 This ``http_cache`` parameter accepts any dict-like object. 

493 If not provided, MSAL will use an in-memory dict. 

494 

495 If your app is a command-line app (CLI), 

496 you would want to persist your http_cache across different CLI runs. 

497 The persisted file's format may change due to, but not limited to, 

498 `unstable protocol <https://docs.python.org/3/library/pickle.html#data-stream-format>`_, 

499 so your implementation shall tolerate unexpected loading errors. 

500 The following recipe shows a way to do so:: 

501 

502 # Just add the following lines at the beginning of your CLI script 

503 import sys, atexit, pickle, logging 

504 http_cache_filename = sys.argv[0] + ".http_cache" 

505 try: 

506 with open(http_cache_filename, "rb") as f: 

507 persisted_http_cache = pickle.load(f) # Take a snapshot 

508 except ( 

509 FileNotFoundError, # Or IOError in Python 2 

510 pickle.UnpicklingError, # A corrupted http cache file 

511 AttributeError, # Cache created by a different version of MSAL 

512 ): 

513 persisted_http_cache = {} # Recover by starting afresh 

514 except: # Unexpected exceptions 

515 logging.exception("You may want to debug this") 

516 persisted_http_cache = {} # Recover by starting afresh 

517 atexit.register(lambda: pickle.dump( 

518 # When exit, flush it back to the file. 

519 # It may occasionally overwrite another process's concurrent write, 

520 # but that is fine. Subsequent runs will reach eventual consistency. 

521 persisted_http_cache, open(http_cache_file, "wb"))) 

522 

523 # And then you can implement your app as you normally would 

524 app = msal.PublicClientApplication( 

525 "your_client_id", 

526 ..., 

527 http_cache=persisted_http_cache, # Utilize persisted_http_cache 

528 ..., 

529 #token_cache=..., # You may combine the old token_cache trick 

530 # Please refer to token_cache recipe at 

531 # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache 

532 ) 

533 app.acquire_token_interactive(["your", "scope"], ...) 

534 

535 Content inside ``http_cache`` are cheap to obtain. 

536 There is no need to share them among different apps. 

537 

538 Content inside ``http_cache`` will contain no tokens nor 

539 Personally Identifiable Information (PII). Encryption is unnecessary. 

540 

541 New in version 1.16.0. 

542 

543 :param boolean instance_discovery: 

544 Historically, MSAL would connect to a central endpoint located at 

545 ``https://login.microsoftonline.com`` to acquire some metadata, 

546 especially when using an unfamiliar authority. 

547 This behavior is known as Instance Discovery. 

548 

549 This parameter defaults to None, which enables the Instance Discovery. 

550 

551 If you know some authorities which you allow MSAL to operate with as-is, 

552 without involving any Instance Discovery, the recommended pattern is:: 

553 

554 known_authorities = frozenset([ # Treat your known authorities as const 

555 "https://contoso.com/adfs", "https://login.azs/foo"]) 

556 ... 

557 authority = "https://contoso.com/adfs" # Assuming your app will use this 

558 app1 = PublicClientApplication( 

559 "client_id", 

560 authority=authority, 

561 # Conditionally disable Instance Discovery for known authorities 

562 instance_discovery=authority not in known_authorities, 

563 ) 

564 

565 If you do not know some authorities beforehand, 

566 yet still want MSAL to accept any authority that you will provide, 

567 you can use a ``False`` to unconditionally disable Instance Discovery. 

568 

569 New in version 1.19.0. 

570 

571 :param boolean allow_broker: 

572 Deprecated. Please use ``enable_broker_on_windows`` instead. 

573 

574 :param boolean enable_pii_log: 

575 When enabled, logs may include PII (Personal Identifiable Information). 

576 This can be useful in troubleshooting broker behaviors. 

577 The default behavior is False. 

578 

579 New in version 1.24.0. 

580 

581 :param str oidc_authority: 

582 *Added in version 1.28.0*: 

583 It is a URL that identifies an OpenID Connect (OIDC) authority of 

584 the format ``https://contoso.com/tenant``. 

585 MSAL will append ".well-known/openid-configuration" to the authority 

586 and retrieve the OIDC metadata from there, to figure out the endpoints. 

587 

588 Note: Broker will NOT be used for OIDC authority. 

589 """ 

590 self.client_id = client_id 

591 self.client_credential = client_credential 

592 self.client_claims = client_claims 

593 self._client_capabilities = client_capabilities 

594 self._instance_discovery = instance_discovery 

595 

596 if exclude_scopes and not isinstance(exclude_scopes, list): 

597 raise ValueError( 

598 "Invalid exclude_scopes={}. It need to be a list of strings.".format( 

599 repr(exclude_scopes))) 

600 self._exclude_scopes = frozenset(exclude_scopes or []) 

601 if "openid" in self._exclude_scopes: 

602 raise ValueError( 

603 'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format( 

604 repr(exclude_scopes))) 

605 

606 if http_client: 

607 self.http_client = http_client 

608 else: 

609 import requests # Lazy load 

610 

611 self.http_client = requests.Session() 

612 self.http_client.verify = verify 

613 self.http_client.proxies = proxies 

614 # Requests, does not support session - wide timeout 

615 # But you can patch that (https://github.com/psf/requests/issues/3341): 

616 self.http_client.request = functools.partial( 

617 self.http_client.request, timeout=timeout) 

618 

619 # Enable a minimal retry. Better than nothing. 

620 # https://github.com/psf/requests/blob/v2.25.1/requests/adapters.py#L94-L108 

621 a = requests.adapters.HTTPAdapter(max_retries=1) 

622 self.http_client.mount("http://", a) 

623 self.http_client.mount("https://", a) 

624 self.http_client = ThrottledHttpClient( 

625 self.http_client, 

626 http_cache=http_cache, 

627 default_throttle_time=60 

628 # The default value 60 was recommended mainly for PCA at the end of 

629 # https://identitydivision.visualstudio.com/devex/_git/AuthLibrariesApiReview?version=GBdev&path=%2FService%20protection%2FIntial%20set%20of%20protection%20measures.md&_a=preview 

630 if isinstance(self, PublicClientApplication) else 5, 

631 ) 

632 

633 self.app_name = app_name 

634 self.app_version = app_version 

635 

636 # Here the self.authority will not be the same type as authority in input 

637 if oidc_authority and authority: 

638 raise ValueError("You can not provide both authority and oidc_authority") 

639 if isinstance(authority, str) and urlparse(authority).path.startswith( 

640 "/dstsv2"): # dSTS authority's path always starts with "/dstsv2" 

641 oidc_authority = authority # So we treat it as if an oidc_authority 

642 try: 

643 authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE) 

644 self.authority = Authority( 

645 authority_to_use, 

646 self.http_client, 

647 validate_authority=validate_authority, 

648 instance_discovery=self._instance_discovery, 

649 oidc_authority_url=oidc_authority, 

650 ) 

651 except ValueError: # Those are explicit authority validation errors 

652 raise 

653 except Exception: # The rest are typically connection errors 

654 if validate_authority and not oidc_authority and ( 

655 azure_region # Opted in to use region 

656 or (azure_region is None and os.getenv("MSAL_FORCE_REGION")) # Will use region 

657 ): 

658 # Since caller opts in to use region, here we tolerate connection 

659 # errors happened during authority validation at non-region endpoint 

660 self.authority = Authority( 

661 authority_to_use, 

662 self.http_client, 

663 instance_discovery=False, 

664 ) 

665 else: 

666 raise 

667 

668 self._decide_broker(allow_broker, enable_pii_log) 

669 self.token_cache = token_cache or TokenCache() 

670 self._region_configured = azure_region 

671 self._region_detected = None 

672 self.client, self._regional_client = self._build_client( 

673 client_credential, self.authority) 

674 self.authority_groups = None 

675 self._telemetry_buffer = {} 

676 self._telemetry_lock = Lock() 

677 _msal_extension_check() 

678 

679 

680 def _decide_broker(self, allow_broker, enable_pii_log): 

681 is_confidential_app = self.client_credential or isinstance( 

682 self, ConfidentialClientApplication) 

683 if is_confidential_app and allow_broker: 

684 raise ValueError("allow_broker=True is only supported in PublicClientApplication") 

685 # Historically, we chose to support ClientApplication("client_id", allow_broker=True) 

686 if allow_broker: 

687 warnings.warn( 

688 "allow_broker is deprecated. " 

689 "Please use PublicClientApplication(..., " 

690 "enable_broker_on_windows=True, " 

691 # No need to mention non-Windows platforms, because allow_broker is only for Windows 

692 "...)", 

693 DeprecationWarning) 

694 opted_in_for_broker = ( 

695 self._enable_broker # True means Opted-in from PCA 

696 or ( 

697 # When we started the broker project on Windows platform, 

698 # the allow_broker was meant to be cross-platform. Now we realize 

699 # that other platforms have different redirect_uri requirements, 

700 # so the old allow_broker is deprecated and will only for Windows. 

701 allow_broker and sys.platform == "win32") 

702 ) 

703 self._enable_broker = ( # This same variable will also store the state 

704 opted_in_for_broker 

705 and not is_confidential_app 

706 and not self.authority.is_adfs 

707 and not self.authority._is_b2c 

708 ) 

709 if self._enable_broker: 

710 try: 

711 _init_broker(enable_pii_log) 

712 except RuntimeError: 

713 self._enable_broker = False 

714 logger.warning( # It is common on Mac and Linux where broker is not built-in 

715 "Broker is unavailable on this platform. " 

716 "We will fallback to non-broker.") 

717 logger.debug("Broker enabled? %s", self._enable_broker) 

718 

719 def is_pop_supported(self): 

720 """Returns True if this client supports Proof-of-Possession Access Token.""" 

721 return self._enable_broker and sys.platform in ("win32", "darwin") 

722 

723 def _decorate_scope( 

724 self, scopes, 

725 reserved_scope=frozenset(['openid', 'profile', 'offline_access'])): 

726 if not isinstance(scopes, (list, set, tuple)): 

727 raise ValueError("The input scopes should be a list, tuple, or set") 

728 scope_set = set(scopes) # Input scopes is typically a list. Copy it to a set. 

729 if scope_set & reserved_scope: 

730 # These scopes are reserved for the API to provide good experience. 

731 # We could make the developer pass these and then if they do they will 

732 # come back asking why they don't see refresh token or user information. 

733 raise ValueError( 

734 """You cannot use any scope value that is reserved. 

735Your input: {} 

736The reserved list: {}""".format(list(scope_set), list(reserved_scope))) 

737 raise ValueError( 

738 "You cannot use any scope value that is in this reserved list: {}".format( 

739 list(reserved_scope))) 

740 

741 # client_id can also be used as a scope in B2C 

742 decorated = scope_set | reserved_scope 

743 decorated -= self._exclude_scopes 

744 return list(decorated) 

745 

746 def _build_telemetry_context( 

747 self, api_id, correlation_id=None, refresh_reason=None): 

748 return msal.telemetry._TelemetryContext( 

749 self._telemetry_buffer, self._telemetry_lock, api_id, 

750 correlation_id=correlation_id, refresh_reason=refresh_reason) 

751 

752 def _get_regional_authority(self, central_authority) -> Optional[Authority]: 

753 if self._region_configured is False: # User opts out of ESTS-R 

754 return None # Short circuit to completely bypass region detection 

755 if self._region_configured is None: # User did not make an ESTS-R choice 

756 self._region_configured = os.getenv("MSAL_FORCE_REGION") or None 

757 self._region_detected = self._region_detected or _detect_region( 

758 self.http_client if self._region_configured is not None else None) 

759 if (self._region_configured != self.ATTEMPT_REGION_DISCOVERY 

760 and self._region_configured != self._region_detected): 

761 logger.warning('Region configured ({}) != region detected ({})'.format( 

762 repr(self._region_configured), repr(self._region_detected))) 

763 region_to_use = ( 

764 self._region_detected 

765 if self._region_configured == self.ATTEMPT_REGION_DISCOVERY 

766 else self._region_configured) # It will retain the None i.e. opted out 

767 logger.debug('Region to be used: {}'.format(repr(region_to_use))) 

768 if region_to_use: 

769 regional_host = ("{}.login.microsoft.com".format(region_to_use) 

770 if central_authority.instance in ( 

771 # The list came from point 3 of the algorithm section in this internal doc 

772 # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PinAuthToRegion/AAD%20SDK%20Proposal%20to%20Pin%20Auth%20to%20region.md&anchor=algorithm&_a=preview 

773 "login.microsoftonline.com", 

774 "login.microsoft.com", 

775 "login.windows.net", 

776 "sts.windows.net", 

777 ) 

778 else "{}.{}".format(region_to_use, central_authority.instance)) 

779 return Authority( # The central_authority has already been validated 

780 "https://{}/{}".format(regional_host, central_authority.tenant), 

781 self.http_client, 

782 instance_discovery=False, 

783 ) 

784 return None 

785 

786 def _build_client(self, client_credential, authority, skip_regional_client=False): 

787 client_assertion = None 

788 client_assertion_type = None 

789 default_headers = { 

790 "x-client-sku": SKU, "x-client-ver": __version__, 

791 "x-client-os": sys.platform, 

792 "x-ms-lib-capability": "retry-after, h429", 

793 } 

794 if self.app_name: 

795 default_headers['x-app-name'] = self.app_name 

796 if self.app_version: 

797 default_headers['x-app-ver'] = self.app_version 

798 default_body = {"client_info": 1} 

799 if isinstance(client_credential, dict): 

800 client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT 

801 # Use client_credential.get("...") rather than "..." in client_credential 

802 # so that we can ignore an empty string came from an empty ENV VAR. 

803 if client_credential.get("client_assertion"): 

804 client_assertion = client_credential['client_assertion'] 

805 else: 

806 headers = {} 

807 sha1_thumbprint = sha256_thumbprint = None 

808 passphrase_bytes = _str2bytes( 

809 client_credential["passphrase"] 

810 ) if client_credential.get("passphrase") else None 

811 if client_credential.get("private_key_pfx_path"): 

812 private_key, sha256_thumbprint, sha1_thumbprint, x5c = _parse_pfx( 

813 client_credential["private_key_pfx_path"], 

814 passphrase_bytes) 

815 if client_credential.get("public_certificate") is True and x5c: 

816 headers["x5c"] = x5c 

817 elif client_credential.get("private_key"): # PEM blob 

818 private_key = ( # handles both encrypted and unencrypted 

819 _load_private_key_from_pem_str( 

820 client_credential['private_key'], passphrase_bytes) 

821 if passphrase_bytes 

822 else client_credential['private_key'] 

823 ) 

824 

825 # Determine thumbprints based on what's provided 

826 if client_credential.get("thumbprint"): 

827 # User provided a thumbprint - use it as SHA-1 (legacy/manual approach) 

828 sha1_thumbprint = client_credential["thumbprint"] 

829 sha256_thumbprint = None 

830 elif isinstance(client_credential.get('public_certificate'), str): 

831 # No thumbprint provided, but we have a certificate to calculate thumbprints 

832 from cryptography import x509 

833 cert = x509.load_pem_x509_certificate( 

834 _str2bytes(client_credential['public_certificate'])) 

835 sha256_thumbprint, sha1_thumbprint, headers["x5c"] = ( 

836 _extract_cert_and_thumbprints(cert)) 

837 else: 

838 raise ValueError( 

839 "You must provide either 'thumbprint' or 'public_certificate' " 

840 "from which the thumbprint can be calculated.") 

841 else: 

842 raise ValueError( 

843 "client_credential needs to follow this format " 

844 "https://msal-python.readthedocs.io/en/latest/#msal.ClientApplication.params.client_credential") 

845 if ("x5c" not in headers # So the .pfx file contains no certificate 

846 and isinstance(client_credential.get('public_certificate'), str) 

847 ): # Then we treat the public_certificate value as PEM content 

848 headers["x5c"] = extract_certs(client_credential['public_certificate']) 

849 if sha256_thumbprint and not authority.is_adfs: 

850 assertion_params = { 

851 "algorithm": "PS256", "sha256_thumbprint": sha256_thumbprint, 

852 } 

853 else: # Fall back 

854 if not sha1_thumbprint: 

855 raise ValueError("You shall provide a thumbprint in SHA1.") 

856 assertion_params = { 

857 "algorithm": "RS256", "sha1_thumbprint": sha1_thumbprint, 

858 } 

859 assertion = JwtAssertionCreator( 

860 private_key, headers=headers, **assertion_params) 

861 client_assertion = assertion.create_regenerative_assertion( 

862 audience=authority.token_endpoint, issuer=self.client_id, 

863 additional_claims=self.client_claims or {}) 

864 else: 

865 default_body['client_secret'] = client_credential 

866 central_configuration = { 

867 "authorization_endpoint": authority.authorization_endpoint, 

868 "token_endpoint": authority.token_endpoint, 

869 "device_authorization_endpoint": authority.device_authorization_endpoint, 

870 } 

871 central_client = _ClientWithCcsRoutingInfo( 

872 central_configuration, 

873 self.client_id, 

874 http_client=self.http_client, 

875 default_headers=default_headers, 

876 default_body=default_body, 

877 client_assertion=client_assertion, 

878 client_assertion_type=client_assertion_type, 

879 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

880 event, environment=authority.instance)), 

881 on_removing_rt=self.token_cache.remove_rt, 

882 on_updating_rt=self.token_cache.update_rt) 

883 

884 regional_client = None 

885 if (client_credential # Currently regional endpoint only serves some CCA flows 

886 and not skip_regional_client): 

887 regional_authority = self._get_regional_authority(authority) 

888 if regional_authority: 

889 regional_configuration = { 

890 "authorization_endpoint": regional_authority.authorization_endpoint, 

891 "token_endpoint": regional_authority.token_endpoint, 

892 "device_authorization_endpoint": 

893 regional_authority.device_authorization_endpoint, 

894 } 

895 regional_client = _ClientWithCcsRoutingInfo( 

896 regional_configuration, 

897 self.client_id, 

898 http_client=self.http_client, 

899 default_headers=default_headers, 

900 default_body=default_body, 

901 client_assertion=client_assertion, 

902 client_assertion_type=client_assertion_type, 

903 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

904 event, environment=authority.instance)), 

905 on_removing_rt=self.token_cache.remove_rt, 

906 on_updating_rt=self.token_cache.update_rt) 

907 return central_client, regional_client 

908 

909 def initiate_auth_code_flow( 

910 self, 

911 scopes, # type: list[str] 

912 redirect_uri=None, 

913 state=None, # Recommended by OAuth2 for CSRF protection 

914 prompt=None, 

915 login_hint=None, # type: Optional[str] 

916 domain_hint=None, # type: Optional[str] 

917 claims_challenge=None, 

918 max_age=None, 

919 response_mode=None, # type: Optional[str] 

920 ): 

921 """Initiate an auth code flow. 

922 

923 Later when the response reaches your redirect_uri, 

924 you can use :func:`~acquire_token_by_auth_code_flow()` 

925 to complete the authentication/authorization. 

926 

927 :param list scopes: 

928 It is a list of case-sensitive strings. 

929 :param str redirect_uri: 

930 Optional. If not specified, server will use the pre-registered one. 

931 :param str state: 

932 An opaque value used by the client to 

933 maintain state between the request and callback. 

934 If absent, this library will automatically generate one internally. 

935 :param str prompt: 

936 By default, no prompt value will be sent, not even string ``"none"``. 

937 You will have to specify a value explicitly. 

938 Its valid values are the constants defined in 

939 :class:`Prompt <msal.Prompt>`. 

940 

941 :param str login_hint: 

942 Optional. Identifier of the user. Generally a User Principal Name (UPN). 

943 :param domain_hint: 

944 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com". 

945 If included, it will skip the email-based discovery process that user goes 

946 through on the sign-in page, leading to a slightly more streamlined user experience. 

947 More information on possible values available in 

948 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and 

949 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_. 

950 

951 :param int max_age: 

952 OPTIONAL. Maximum Authentication Age. 

953 Specifies the allowable elapsed time in seconds 

954 since the last time the End-User was actively authenticated. 

955 If the elapsed time is greater than this value, 

956 Microsoft identity platform will actively re-authenticate the End-User. 

957 

958 MSAL Python will also automatically validate the auth_time in ID token. 

959 

960 New in version 1.15. 

961 

962 :param str response_mode: 

963 OPTIONAL. Specifies the method with which response parameters should be returned. 

964 The default value is equivalent to ``query``, which is still secure enough in MSAL Python 

965 (because MSAL Python does not transfer tokens via query parameter in the first place). 

966 For even better security, we recommend using the value ``form_post``. 

967 In "form_post" mode, response parameters 

968 will be encoded as HTML form values that are transmitted via the HTTP POST method and 

969 encoded in the body using the application/x-www-form-urlencoded format. 

970 Valid values can be either "form_post" for HTTP POST to callback URI or 

971 "query" (the default) for HTTP GET with parameters encoded in query string. 

972 More information on possible values 

973 `here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>` 

974 and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>` 

975 

976 :return: 

977 The auth code flow. It is a dict in this form:: 

978 

979 { 

980 "auth_uri": "https://...", // Guide user to visit this 

981 "state": "...", // You may choose to verify it by yourself, 

982 // or just let acquire_token_by_auth_code_flow() 

983 // do that for you. 

984 "...": "...", // Everything else are reserved and internal 

985 } 

986 

987 The caller is expected to: 

988 

989 1. somehow store this content, typically inside the current session, 

990 2. guide the end user (i.e. resource owner) to visit that auth_uri, 

991 3. and then relay this dict and subsequent auth response to 

992 :func:`~acquire_token_by_auth_code_flow()`. 

993 """ 

994 client = _ClientWithCcsRoutingInfo( 

995 {"authorization_endpoint": self.authority.authorization_endpoint}, 

996 self.client_id, 

997 http_client=self.http_client) 

998 flow = client.initiate_auth_code_flow( 

999 redirect_uri=redirect_uri, state=state, login_hint=login_hint, 

1000 prompt=prompt, 

1001 scope=self._decorate_scope(scopes), 

1002 domain_hint=domain_hint, 

1003 claims=_merge_claims_challenge_and_capabilities( 

1004 self._client_capabilities, claims_challenge), 

1005 max_age=max_age, 

1006 response_mode=response_mode, 

1007 ) 

1008 flow["claims_challenge"] = claims_challenge 

1009 return flow 

1010 

1011 def get_authorization_request_url( 

1012 self, 

1013 scopes, # type: list[str] 

1014 login_hint=None, # type: Optional[str] 

1015 state=None, # Recommended by OAuth2 for CSRF protection 

1016 redirect_uri=None, 

1017 response_type="code", # Could be "token" if you use Implicit Grant 

1018 prompt=None, 

1019 nonce=None, 

1020 domain_hint=None, # type: Optional[str] 

1021 claims_challenge=None, 

1022 **kwargs): 

1023 """Constructs a URL for you to start a Authorization Code Grant. 

1024 

1025 :param list[str] scopes: (Required) 

1026 Scopes requested to access a protected API (a resource). 

1027 :param str state: Recommended by OAuth2 for CSRF protection. 

1028 :param str login_hint: 

1029 Identifier of the user. Generally a User Principal Name (UPN). 

1030 :param str redirect_uri: 

1031 Address to return to upon receiving a response from the authority. 

1032 :param str response_type: 

1033 Default value is "code" for an OAuth2 Authorization Code grant. 

1034 

1035 You could use other content such as "id_token" or "token", 

1036 which would trigger an Implicit Grant, but that is 

1037 `not recommended <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-implicit-grant-flow#is-the-implicit-grant-suitable-for-my-app>`_. 

1038 

1039 :param str prompt: 

1040 By default, no prompt value will be sent, not even string ``"none"``. 

1041 You will have to specify a value explicitly. 

1042 Its valid values are the constants defined in 

1043 :class:`Prompt <msal.Prompt>`. 

1044 :param nonce: 

1045 A cryptographically random value used to mitigate replay attacks. See also 

1046 `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_. 

1047 :param domain_hint: 

1048 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com". 

1049 If included, it will skip the email-based discovery process that user goes 

1050 through on the sign-in page, leading to a slightly more streamlined user experience. 

1051 More information on possible values available in 

1052 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and 

1053 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_. 

1054 :param claims_challenge: 

1055 The claims_challenge parameter requests specific claims requested by the resource provider 

1056 in the form of a claims_challenge directive in the www-authenticate header to be 

1057 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1058 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1059 

1060 :return: The authorization url as a string. 

1061 """ 

1062 authority = kwargs.pop("authority", None) # Historically we support this 

1063 if authority: 

1064 warnings.warn( 

1065 "We haven't decided if this method will accept authority parameter") 

1066 # The previous implementation is, it will use self.authority by default. 

1067 # Multi-tenant app can use new authority on demand 

1068 the_authority = Authority( 

1069 authority, 

1070 self.http_client, 

1071 instance_discovery=self._instance_discovery, 

1072 ) if authority else self.authority 

1073 

1074 client = _ClientWithCcsRoutingInfo( 

1075 {"authorization_endpoint": the_authority.authorization_endpoint}, 

1076 self.client_id, 

1077 http_client=self.http_client) 

1078 warnings.warn( 

1079 "Change your get_authorization_request_url() " 

1080 "to initiate_auth_code_flow()", DeprecationWarning) 

1081 with warnings.catch_warnings(record=True): 

1082 return client.build_auth_request_uri( 

1083 response_type=response_type, 

1084 redirect_uri=redirect_uri, state=state, login_hint=login_hint, 

1085 prompt=prompt, 

1086 scope=self._decorate_scope(scopes), 

1087 nonce=nonce, 

1088 domain_hint=domain_hint, 

1089 claims=_merge_claims_challenge_and_capabilities( 

1090 self._client_capabilities, claims_challenge), 

1091 ) 

1092 

1093 def acquire_token_by_auth_code_flow( 

1094 self, auth_code_flow, auth_response, scopes=None, **kwargs): 

1095 """Validate the auth response being redirected back, and obtain tokens. 

1096 

1097 It automatically provides nonce protection. 

1098 

1099 :param dict auth_code_flow: 

1100 The same dict returned by :func:`~initiate_auth_code_flow()`. 

1101 :param dict auth_response: 

1102 A dict of the query string received from auth server. 

1103 :param list[str] scopes: 

1104 Scopes requested to access a protected API (a resource). 

1105 

1106 Most of the time, you can leave it empty. 

1107 

1108 If you requested user consent for multiple resources, here you will 

1109 need to provide a subset of what you required in 

1110 :func:`~initiate_auth_code_flow()`. 

1111 

1112 OAuth2 was designed mostly for singleton services, 

1113 where tokens are always meant for the same resource and the only 

1114 changes are in the scopes. 

1115 In Microsoft Entra, tokens can be issued for multiple 3rd party resources. 

1116 You can ask authorization code for multiple resources, 

1117 but when you redeem it, the token is for only one intended 

1118 recipient, called audience. 

1119 So the developer need to specify a scope so that we can restrict the 

1120 token to be issued for the corresponding audience. 

1121 

1122 :return: 

1123 * A dict containing "access_token" and/or "id_token", among others, 

1124 depends on what scope was used. 

1125 (See https://tools.ietf.org/html/rfc6749#section-5.1) 

1126 * A dict containing "error", optionally "error_description", "error_uri". 

1127 (It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_ 

1128 or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_) 

1129 * Most client-side data error would result in ValueError exception. 

1130 So the usage pattern could be without any protocol details:: 

1131 

1132 def authorize(): # A controller in a web app 

1133 try: 

1134 result = msal_app.acquire_token_by_auth_code_flow( 

1135 session.get("flow", {}), request.args) 

1136 if "error" in result: 

1137 return render_template("error.html", result) 

1138 use(result) # Token(s) are available in result and cache 

1139 except ValueError: # Usually caused by CSRF 

1140 pass # Simply ignore them 

1141 return redirect(url_for("index")) 

1142 """ 

1143 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1144 telemetry_context = self._build_telemetry_context( 

1145 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID) 

1146 response = _clean_up(self.client.obtain_token_by_auth_code_flow( 

1147 auth_code_flow, 

1148 auth_response, 

1149 scope=self._decorate_scope(scopes) if scopes else None, 

1150 headers=telemetry_context.generate_headers(), 

1151 data=dict( 

1152 kwargs.pop("data", {}), 

1153 claims=_merge_claims_challenge_and_capabilities( 

1154 self._client_capabilities, 

1155 auth_code_flow.pop("claims_challenge", None))), 

1156 **kwargs)) 

1157 if "access_token" in response: 

1158 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1159 telemetry_context.update_telemetry(response) 

1160 return response 

1161 

1162 def acquire_token_by_authorization_code( 

1163 self, 

1164 code, 

1165 scopes, # Syntactically required. STS accepts empty value though. 

1166 redirect_uri=None, 

1167 # REQUIRED, if the "redirect_uri" parameter was included in the 

1168 # authorization request as described in Section 4.1.1, and their 

1169 # values MUST be identical. 

1170 nonce=None, 

1171 claims_challenge=None, 

1172 **kwargs): 

1173 """The second half of the Authorization Code Grant. 

1174 

1175 :param code: The authorization code returned from Authorization Server. 

1176 :param list[str] scopes: (Required) 

1177 Scopes requested to access a protected API (a resource). 

1178 

1179 If you requested user consent for multiple resources, here you will 

1180 typically want to provide a subset of what you required in AuthCode. 

1181 

1182 OAuth2 was designed mostly for singleton services, 

1183 where tokens are always meant for the same resource and the only 

1184 changes are in the scopes. 

1185 In Microsoft Entra, tokens can be issued for multiple 3rd party resources. 

1186 You can ask authorization code for multiple resources, 

1187 but when you redeem it, the token is for only one intended 

1188 recipient, called audience. 

1189 So the developer need to specify a scope so that we can restrict the 

1190 token to be issued for the corresponding audience. 

1191 

1192 :param nonce: 

1193 If you provided a nonce when calling :func:`get_authorization_request_url`, 

1194 same nonce should also be provided here, so that we'll validate it. 

1195 An exception will be raised if the nonce in id token mismatches. 

1196 

1197 :param claims_challenge: 

1198 The claims_challenge parameter requests specific claims requested by the resource provider 

1199 in the form of a claims_challenge directive in the www-authenticate header to be 

1200 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1201 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1202 

1203 :return: A dict representing the json response from Microsoft Entra: 

1204 

1205 - A successful response would contain "access_token" key, 

1206 - an error response would contain "error" and usually "error_description". 

1207 """ 

1208 # If scope is absent on the wire, STS will give you a token associated 

1209 # to the FIRST scope sent during the authorization request. 

1210 # So in theory, you can omit scope here when you were working with only 

1211 # one scope. But, MSAL decorates your scope anyway, so they are never 

1212 # really empty. 

1213 assert isinstance(scopes, list), "Invalid parameter type" 

1214 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1215 warnings.warn( 

1216 "Change your acquire_token_by_authorization_code() " 

1217 "to acquire_token_by_auth_code_flow()", DeprecationWarning) 

1218 with warnings.catch_warnings(record=True): 

1219 telemetry_context = self._build_telemetry_context( 

1220 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID) 

1221 response = _clean_up(self.client.obtain_token_by_authorization_code( 

1222 code, redirect_uri=redirect_uri, 

1223 scope=self._decorate_scope(scopes), 

1224 headers=telemetry_context.generate_headers(), 

1225 data=dict( 

1226 kwargs.pop("data", {}), 

1227 claims=_merge_claims_challenge_and_capabilities( 

1228 self._client_capabilities, claims_challenge)), 

1229 nonce=nonce, 

1230 **kwargs)) 

1231 if "access_token" in response: 

1232 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1233 telemetry_context.update_telemetry(response) 

1234 return response 

1235 

1236 def get_accounts(self, username=None): 

1237 """Get a list of accounts which previously signed in, i.e. exists in cache. 

1238 

1239 An account can later be used in :func:`~acquire_token_silent` 

1240 to find its tokens. 

1241 

1242 :param username: 

1243 Filter accounts with this username only. Case insensitive. 

1244 :return: A list of account objects. 

1245 Each account is a dict. For now, we only document its "username" field. 

1246 Your app can choose to display those information to end user, 

1247 and allow user to choose one of his/her accounts to proceed. 

1248 """ 

1249 accounts = self._find_msal_accounts(environment=self.authority.instance) 

1250 if not accounts: # Now try other aliases of this authority instance 

1251 for alias in self._get_authority_aliases(self.authority.instance): 

1252 accounts = self._find_msal_accounts(environment=alias) 

1253 if accounts: 

1254 break 

1255 if username: 

1256 # Federated account["username"] from AAD could contain mixed case 

1257 lowercase_username = username.lower() 

1258 accounts = [a for a in accounts 

1259 if a["username"].lower() == lowercase_username] 

1260 if not accounts: 

1261 logger.debug(( # This would also happen when the cache is empty 

1262 "get_accounts(username='{}') finds no account. " 

1263 "If tokens were acquired without 'profile' scope, " 

1264 "they would contain no username for filtering. " 

1265 "Consider calling get_accounts(username=None) instead." 

1266 ).format(username)) 

1267 # Does not further filter by existing RTs here. It probably won't matter. 

1268 # Because in most cases Accounts and RTs co-exist. 

1269 # Even in the rare case when an RT is revoked and then removed, 

1270 # acquire_token_silent() would then yield no result, 

1271 # apps would fall back to other acquire methods. This is the standard pattern. 

1272 return accounts 

1273 

1274 def _find_msal_accounts(self, environment): 

1275 interested_authority_types = [ 

1276 TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS] 

1277 if _is_running_in_cloud_shell(): 

1278 interested_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL) 

1279 grouped_accounts = { 

1280 a.get("home_account_id"): # Grouped by home tenant's id 

1281 { # These are minimal amount of non-tenant-specific account info 

1282 "home_account_id": a.get("home_account_id"), 

1283 "environment": a.get("environment"), 

1284 "username": a.get("username"), 

1285 "account_source": a.get("account_source"), 

1286 

1287 # The following fields for backward compatibility, for now 

1288 "authority_type": a.get("authority_type"), 

1289 "local_account_id": a.get("local_account_id"), # Tenant-specific 

1290 "realm": a.get("realm"), # Tenant-specific 

1291 } 

1292 for a in self.token_cache.search( 

1293 TokenCache.CredentialType.ACCOUNT, 

1294 query={"environment": environment}) 

1295 if a["authority_type"] in interested_authority_types 

1296 } 

1297 return list(grouped_accounts.values()) 

1298 

1299 def _get_instance_metadata(self): # This exists so it can be mocked in unit test 

1300 resp = self.http_client.get( 

1301 "https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize", # TBD: We may extend this to use self._instance_discovery endpoint 

1302 headers={'Accept': 'application/json'}) 

1303 resp.raise_for_status() 

1304 return json.loads(resp.text)['metadata'] 

1305 

1306 def _get_authority_aliases(self, instance): 

1307 if self._instance_discovery is False: 

1308 return [] 

1309 if self.authority._is_known_to_developer: 

1310 # Then it is an ADFS/B2C/known_authority_hosts situation 

1311 # which may not reach the central endpoint, so we skip it. 

1312 return [] 

1313 if not self.authority_groups: 

1314 self.authority_groups = [ 

1315 set(group['aliases']) for group in self._get_instance_metadata()] 

1316 for group in self.authority_groups: 

1317 if instance in group: 

1318 return [alias for alias in group if alias != instance] 

1319 return [] 

1320 

1321 def remove_account(self, account): 

1322 """Sign me out and forget me from token cache""" 

1323 if self._enable_broker: 

1324 from .broker import _signout_silently 

1325 error = _signout_silently(self.client_id, account["local_account_id"]) 

1326 if error: 

1327 logger.debug("_signout_silently() returns error: %s", error) 

1328 # Broker sign-out has been attempted, even if the _forget_me() below throws. 

1329 self._forget_me(account) 

1330 

1331 def _sign_out(self, home_account): 

1332 # Remove all relevant RTs and ATs from token cache 

1333 owned_by_home_account = { 

1334 "environment": home_account["environment"], 

1335 "home_account_id": home_account["home_account_id"],} # realm-independent 

1336 app_metadata = self._get_app_metadata(home_account["environment"]) 

1337 # Remove RTs/FRTs, and they are realm-independent 

1338 for rt in [ # Remove RTs from a static list (rather than from a dynamic generator), 

1339 # to avoid changing self.token_cache while it is being iterated 

1340 rt for rt in self.token_cache.search( 

1341 TokenCache.CredentialType.REFRESH_TOKEN, query=owned_by_home_account) 

1342 # Do RT's app ownership check as a precaution, in case family apps 

1343 # and 3rd-party apps share same token cache, although they should not. 

1344 if rt["client_id"] == self.client_id or ( 

1345 app_metadata.get("family_id") # Now let's settle family business 

1346 and rt.get("family_id") == app_metadata["family_id"]) 

1347 ]: 

1348 self.token_cache.remove_rt(rt) 

1349 for at in list(self.token_cache.search( # Remove ATs from a static list, 

1350 # to avoid changing self.token_cache while it is being iterated 

1351 TokenCache.CredentialType.ACCESS_TOKEN, query=owned_by_home_account, 

1352 # Regardless of realm, b/c we've removed realm-independent RTs anyway 

1353 )): 

1354 # To avoid the complexity of locating sibling family app's AT, 

1355 # we skip AT's app ownership check. 

1356 # It means ATs for other apps will also be removed, it is OK because: 

1357 # * non-family apps are not supposed to share token cache to begin with; 

1358 # * Even if it happens, we keep other app's RT already, so SSO still works 

1359 self.token_cache.remove_at(at) 

1360 

1361 def _forget_me(self, home_account): 

1362 # It implies signout, and then also remove all relevant accounts and IDTs 

1363 self._sign_out(home_account) 

1364 owned_by_home_account = { 

1365 "environment": home_account["environment"], 

1366 "home_account_id": home_account["home_account_id"],} # realm-independent 

1367 for idt in list(self.token_cache.search( # Remove IDTs from a static list, 

1368 # to avoid changing self.token_cache while it is being iterated 

1369 TokenCache.CredentialType.ID_TOKEN, query=owned_by_home_account, # regardless of realm 

1370 )): 

1371 self.token_cache.remove_idt(idt) 

1372 for a in list(self.token_cache.search( # Remove Accounts from a static list, 

1373 # to avoid changing self.token_cache while it is being iterated 

1374 TokenCache.CredentialType.ACCOUNT, query=owned_by_home_account, # regardless of realm 

1375 )): 

1376 self.token_cache.remove_account(a) 

1377 

1378 def _acquire_token_by_cloud_shell(self, scopes, data=None): 

1379 from .cloudshell import _obtain_token 

1380 response = _obtain_token( 

1381 self.http_client, scopes, client_id=self.client_id, data=data) 

1382 if "error" not in response: 

1383 self.token_cache.add(dict( 

1384 client_id=self.client_id, 

1385 scope=response["scope"].split() if "scope" in response else scopes, 

1386 token_endpoint=self.authority.token_endpoint, 

1387 response=response, 

1388 data=data or {}, 

1389 authority_type=_AUTHORITY_TYPE_CLOUDSHELL, 

1390 )) 

1391 if "access_token" in response: 

1392 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER 

1393 return response 

1394 

1395 def acquire_token_silent( 

1396 self, 

1397 scopes, # type: List[str] 

1398 account, # type: Optional[Account] 

1399 authority=None, # See get_authorization_request_url() 

1400 force_refresh=False, # type: Optional[boolean] 

1401 claims_challenge=None, 

1402 auth_scheme=None, 

1403 **kwargs): 

1404 """Acquire an access token for given account, without user interaction. 

1405 

1406 It has same parameters as the :func:`~acquire_token_silent_with_error`. 

1407 The difference is the behavior of the return value. 

1408 This method will combine the cache empty and refresh error 

1409 into one return value, `None`. 

1410 If your app does not care about the exact token refresh error during 

1411 token cache look-up, then this method is easier and recommended. 

1412 

1413 :return: 

1414 - A dict containing no "error" key, 

1415 and typically contains an "access_token" key, 

1416 if cache lookup succeeded. 

1417 - None when cache lookup does not yield a token. 

1418 """ 

1419 if not account: 

1420 return None # A backward-compatible NO-OP to drop the account=None usage 

1421 result = _clean_up(self._acquire_token_silent_with_error( 

1422 scopes, account, authority=authority, force_refresh=force_refresh, 

1423 claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs)) 

1424 return result if result and "error" not in result else None 

1425 

1426 def acquire_token_silent_with_error( 

1427 self, 

1428 scopes, # type: List[str] 

1429 account, # type: Optional[Account] 

1430 authority=None, # See get_authorization_request_url() 

1431 force_refresh=False, # type: Optional[boolean] 

1432 claims_challenge=None, 

1433 auth_scheme=None, 

1434 **kwargs): 

1435 """Acquire an access token for given account, without user interaction. 

1436 

1437 It is done either by finding a valid access token from cache, 

1438 or by finding a valid refresh token from cache and then automatically 

1439 use it to redeem a new access token. 

1440 

1441 This method will differentiate cache empty from token refresh error. 

1442 If your app cares the exact token refresh error during 

1443 token cache look-up, then this method is suitable. 

1444 Otherwise, the other method :func:`~acquire_token_silent` is recommended. 

1445 

1446 :param list[str] scopes: (Required) 

1447 Scopes requested to access a protected API (a resource). 

1448 :param account: (Required) 

1449 One of the account object returned by :func:`~get_accounts`. 

1450 Starting from MSAL Python 1.23, 

1451 a ``None`` input will become a NO-OP and always return ``None``. 

1452 :param force_refresh: 

1453 If True, it will skip Access Token look-up, 

1454 and try to find a Refresh Token to obtain a new Access Token. 

1455 :param claims_challenge: 

1456 The claims_challenge parameter requests specific claims requested by the resource provider 

1457 in the form of a claims_challenge directive in the www-authenticate header to be 

1458 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1459 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1460 :param object auth_scheme: 

1461 You can provide an ``msal.auth_scheme.PopAuthScheme`` object 

1462 so that MSAL will get a Proof-of-Possession (POP) token for you. 

1463 

1464 New in version 1.26.0. 

1465 

1466 :return: 

1467 - A dict containing no "error" key, 

1468 and typically contains an "access_token" key, 

1469 if cache lookup succeeded. 

1470 - None when there is simply no token in the cache. 

1471 - A dict containing an "error" key, when token refresh failed. 

1472 """ 

1473 if not account: 

1474 return None # A backward-compatible NO-OP to drop the account=None usage 

1475 return _clean_up(self._acquire_token_silent_with_error( 

1476 scopes, account, authority=authority, force_refresh=force_refresh, 

1477 claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs)) 

1478 

1479 def _acquire_token_silent_with_error( 

1480 self, 

1481 scopes, # type: List[str] 

1482 account, # type: Optional[Account] 

1483 authority=None, # See get_authorization_request_url() 

1484 force_refresh=False, # type: Optional[boolean] 

1485 claims_challenge=None, 

1486 auth_scheme=None, 

1487 **kwargs): 

1488 assert isinstance(scopes, list), "Invalid parameter type" 

1489 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1490 correlation_id = msal.telemetry._get_new_correlation_id() 

1491 if authority: 

1492 warnings.warn("We haven't decided how/if this method will accept authority parameter") 

1493 # the_authority = Authority( 

1494 # authority, 

1495 # self.http_client, 

1496 # instance_discovery=self._instance_discovery, 

1497 # ) if authority else self.authority 

1498 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( 

1499 scopes, account, self.authority, force_refresh=force_refresh, 

1500 claims_challenge=claims_challenge, 

1501 correlation_id=correlation_id, 

1502 auth_scheme=auth_scheme, 

1503 **kwargs) 

1504 if result and "error" not in result: 

1505 return result 

1506 final_result = result 

1507 for alias in self._get_authority_aliases(self.authority.instance): 

1508 if not list(self.token_cache.search( # Need a list to test emptiness 

1509 self.token_cache.CredentialType.REFRESH_TOKEN, 

1510 # target=scopes, # MUST NOT filter by scopes, because: 

1511 # 1. AAD RTs are scope-independent; 

1512 # 2. therefore target is optional per schema; 

1513 query={"environment": alias})): 

1514 # Skip heavy weight logic when RT for this alias doesn't exist 

1515 continue 

1516 the_authority = Authority( 

1517 "https://" + alias + "/" + self.authority.tenant, 

1518 self.http_client, 

1519 instance_discovery=False, 

1520 ) 

1521 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( 

1522 scopes, account, the_authority, force_refresh=force_refresh, 

1523 claims_challenge=claims_challenge, 

1524 correlation_id=correlation_id, 

1525 auth_scheme=auth_scheme, 

1526 **kwargs) 

1527 if result: 

1528 if "error" not in result: 

1529 return result 

1530 final_result = result 

1531 if final_result and final_result.get("suberror"): 

1532 final_result["classification"] = { # Suppress these suberrors, per #57 

1533 "bad_token": "", 

1534 "token_expired": "", 

1535 "protection_policy_required": "", 

1536 "client_mismatch": "", 

1537 "device_authentication_failed": "", 

1538 }.get(final_result["suberror"], final_result["suberror"]) 

1539 return final_result 

1540 

1541 def _acquire_token_silent_from_cache_and_possibly_refresh_it( 

1542 self, 

1543 scopes, # type: List[str] 

1544 account, # type: Optional[Account] 

1545 authority, # This can be different than self.authority 

1546 force_refresh=False, # type: Optional[boolean] 

1547 claims_challenge=None, 

1548 correlation_id=None, 

1549 http_exceptions=None, 

1550 auth_scheme=None, 

1551 **kwargs): 

1552 # This internal method has two calling patterns: 

1553 # it accepts a non-empty account to find token for a user, 

1554 # and accepts account=None to find a token for the current app. 

1555 access_token_from_cache = None 

1556 if not (force_refresh or claims_challenge or auth_scheme): # Then attempt AT cache 

1557 query={ 

1558 "client_id": self.client_id, 

1559 "environment": authority.instance, 

1560 "realm": authority.tenant, 

1561 "home_account_id": (account or {}).get("home_account_id"), 

1562 } 

1563 key_id = kwargs.get("data", {}).get("key_id") 

1564 if key_id: # Some token types (SSH-certs, POP) are bound to a key 

1565 query["key_id"] = key_id 

1566 now = time.time() 

1567 refresh_reason = msal.telemetry.AT_ABSENT 

1568 for entry in self.token_cache.search( # A generator allows us to 

1569 # break early in cache-hit without finding a full list 

1570 self.token_cache.CredentialType.ACCESS_TOKEN, 

1571 target=scopes, 

1572 query=query, 

1573 ): # This loop is about token search, not about token deletion. 

1574 # Note that search() holds a lock during this loop; 

1575 # that is fine because this loop is fast 

1576 expires_in = int(entry["expires_on"]) - now 

1577 if expires_in < 5*60: # Then consider it expired 

1578 refresh_reason = msal.telemetry.AT_EXPIRED 

1579 continue # Removal is not necessary, it will be overwritten 

1580 logger.debug("Cache hit an AT") 

1581 access_token_from_cache = { # Mimic a real response 

1582 "access_token": entry["secret"], 

1583 "token_type": entry.get("token_type", "Bearer"), 

1584 "expires_in": int(expires_in), # OAuth2 specs defines it as int 

1585 self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE, 

1586 } 

1587 if "refresh_on" in entry: 

1588 access_token_from_cache["refresh_on"] = int(entry["refresh_on"]) 

1589 if int(entry["refresh_on"]) < now: # aging 

1590 refresh_reason = msal.telemetry.AT_AGING 

1591 break # With a fallback in hand, we break here to go refresh 

1592 self._build_telemetry_context(-1).hit_an_access_token() 

1593 return access_token_from_cache # It is still good as new 

1594 else: 

1595 refresh_reason = msal.telemetry.FORCE_REFRESH # TODO: It could also mean claims_challenge 

1596 assert refresh_reason, "It should have been established at this point" 

1597 if not http_exceptions: # It can be a tuple of exceptions 

1598 # The exact HTTP exceptions are transportation-layer dependent 

1599 from requests.exceptions import RequestException # Lazy load 

1600 http_exceptions = (RequestException,) 

1601 try: 

1602 data = kwargs.get("data", {}) 

1603 if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL: 

1604 if auth_scheme: 

1605 raise ValueError("auth_scheme is not supported in Cloud Shell") 

1606 return self._acquire_token_by_cloud_shell(scopes, data=data) 

1607 

1608 is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(data.get("token_type"), auth_scheme) 

1609 

1610 if self._enable_broker and account and account.get("account_source") in ( 

1611 _GRANT_TYPE_BROKER, # Broker successfully established this account previously. 

1612 None, # Unknown data from older MSAL. Broker might still work. 

1613 ) and (sys.platform in ("win32", "darwin") or not is_ssh_cert_or_pop_request): 

1614 from .broker import _acquire_token_silently 

1615 response = _acquire_token_silently( 

1616 "https://{}/{}".format(self.authority.instance, self.authority.tenant), 

1617 self.client_id, 

1618 account["local_account_id"], 

1619 scopes, 

1620 claims=_merge_claims_challenge_and_capabilities( 

1621 self._client_capabilities, claims_challenge), 

1622 correlation_id=correlation_id, 

1623 auth_scheme=auth_scheme, 

1624 **data) 

1625 if response: # Broker provides a decisive outcome 

1626 account_was_established_by_broker = account.get( 

1627 "account_source") == _GRANT_TYPE_BROKER 

1628 broker_attempt_succeeded_just_now = "error" not in response 

1629 if account_was_established_by_broker or broker_attempt_succeeded_just_now: 

1630 return self._process_broker_response(response, scopes, data) 

1631 

1632 if auth_scheme: 

1633 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED) 

1634 if account: 

1635 result = self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family( 

1636 authority, self._decorate_scope(scopes), account, 

1637 refresh_reason=refresh_reason, claims_challenge=claims_challenge, 

1638 correlation_id=correlation_id, 

1639 **kwargs) 

1640 else: # The caller is acquire_token_for_client() 

1641 result = self._acquire_token_for_client( 

1642 scopes, refresh_reason, claims_challenge=claims_challenge, 

1643 **kwargs) 

1644 if result and "access_token" in result: 

1645 result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1646 if (result and "error" not in result) or (not access_token_from_cache): 

1647 return result 

1648 except http_exceptions: 

1649 # Typically network error. Potential AAD outage? 

1650 if not access_token_from_cache: # It means there is no fall back option 

1651 raise # We choose to bubble up the exception 

1652 return access_token_from_cache 

1653 

1654 def _process_broker_response(self, response, scopes, data): 

1655 if "error" not in response: 

1656 self.token_cache.add(dict( 

1657 client_id=self.client_id, 

1658 scope=response["scope"].split() if "scope" in response else scopes, 

1659 token_endpoint=self.authority.token_endpoint, 

1660 response=response, 

1661 data=data, 

1662 _account_id=response["_account_id"], 

1663 environment=self.authority.instance, # Be consistent with non-broker flows 

1664 grant_type=_GRANT_TYPE_BROKER, # A pseudo grant type for TokenCache to mark account_source as broker 

1665 )) 

1666 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER 

1667 return _clean_up(response) 

1668 

1669 def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family( 

1670 self, authority, scopes, account, **kwargs): 

1671 query = { 

1672 "environment": authority.instance, 

1673 "home_account_id": (account or {}).get("home_account_id"), 

1674 # "realm": authority.tenant, # AAD RTs are tenant-independent 

1675 } 

1676 app_metadata = self._get_app_metadata(authority.instance) 

1677 if not app_metadata: # Meaning this app is now used for the first time. 

1678 # When/if we have a way to directly detect current app's family, 

1679 # we'll rewrite this block, to support multiple families. 

1680 # For now, we try existing RTs (*). If it works, we are in that family. 

1681 # (*) RTs of a different app/family are not supposed to be 

1682 # shared with or accessible by us in the first place. 

1683 at = self._acquire_token_silent_by_finding_specific_refresh_token( 

1684 authority, scopes, 

1685 dict(query, family_id="1"), # A hack, we have only 1 family for now 

1686 rt_remover=lambda rt_item: None, # NO-OP b/c RTs are likely not mine 

1687 break_condition=lambda response: # Break loop when app not in family 

1688 # Based on an AAD-only behavior mentioned in internal doc here 

1689 # https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595 

1690 "client_mismatch" in response.get("error_additional_info", []), 

1691 **kwargs) 

1692 if at and "error" not in at: 

1693 return at 

1694 last_resp = None 

1695 if app_metadata.get("family_id"): # Meaning this app belongs to this family 

1696 last_resp = at = self._acquire_token_silent_by_finding_specific_refresh_token( 

1697 authority, scopes, dict(query, family_id=app_metadata["family_id"]), 

1698 **kwargs) 

1699 if at and "error" not in at: 

1700 return at 

1701 # Either this app is an orphan, so we will naturally use its own RT; 

1702 # or all attempts above have failed, so we fall back to non-foci behavior. 

1703 return self._acquire_token_silent_by_finding_specific_refresh_token( 

1704 authority, scopes, dict(query, client_id=self.client_id), 

1705 **kwargs) or last_resp 

1706 

1707 def _get_app_metadata(self, environment): 

1708 return self.token_cache._get_app_metadata( 

1709 environment=environment, client_id=self.client_id, default={}) 

1710 

1711 def _acquire_token_silent_by_finding_specific_refresh_token( 

1712 self, authority, scopes, query, 

1713 rt_remover=None, break_condition=lambda response: False, 

1714 refresh_reason=None, correlation_id=None, claims_challenge=None, 

1715 **kwargs): 

1716 matches = list(self.token_cache.search( # We want a list to test emptiness 

1717 self.token_cache.CredentialType.REFRESH_TOKEN, 

1718 # target=scopes, # AAD RTs are scope-independent 

1719 query=query)) 

1720 logger.debug("Found %d RTs matching %s", len(matches), { 

1721 k: _pii_less_home_account_id(v) if k == "home_account_id" and v else v 

1722 for k, v in query.items() 

1723 }) 

1724 

1725 response = None # A distinguishable value to mean cache is empty 

1726 if not matches: # Then exit early to avoid expensive operations 

1727 return response 

1728 client, _ = self._build_client( 

1729 # Potentially expensive if building regional client 

1730 self.client_credential, authority, skip_regional_client=True) 

1731 telemetry_context = self._build_telemetry_context( 

1732 self.ACQUIRE_TOKEN_SILENT_ID, 

1733 correlation_id=correlation_id, refresh_reason=refresh_reason) 

1734 for entry in sorted( # Since unfit RTs would not be aggressively removed, 

1735 # we start from newer RTs which are more likely fit. 

1736 matches, 

1737 key=lambda e: int(e.get("last_modification_time", "0")), 

1738 reverse=True): 

1739 logger.debug("Cache attempts an RT") 

1740 headers = telemetry_context.generate_headers() 

1741 if query.get("home_account_id"): # Then use it as CCS Routing info 

1742 headers["X-AnchorMailbox"] = "Oid:{}".format( # case-insensitive value 

1743 query["home_account_id"].replace(".", "@")) 

1744 response = client.obtain_token_by_refresh_token( 

1745 entry, rt_getter=lambda token_item: token_item["secret"], 

1746 on_removing_rt=lambda rt_item: None, # Disable RT removal, 

1747 # because an invalid_grant could be caused by new MFA policy, 

1748 # the RT could still be useful for other MFA-less scope or tenant 

1749 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

1750 event, 

1751 environment=authority.instance, 

1752 skip_account_creation=True, # To honor a concurrent remove_account() 

1753 )), 

1754 scope=scopes, 

1755 headers=headers, 

1756 data=dict( 

1757 kwargs.pop("data", {}), 

1758 claims=_merge_claims_challenge_and_capabilities( 

1759 self._client_capabilities, claims_challenge)), 

1760 **kwargs) 

1761 telemetry_context.update_telemetry(response) 

1762 if "error" not in response: 

1763 return response 

1764 logger.debug("Refresh failed. {error}: {error_description}".format( 

1765 error=response.get("error"), 

1766 error_description=response.get("error_description"), 

1767 )) 

1768 if break_condition(response): 

1769 break 

1770 return response # Returns the latest error (if any), or just None 

1771 

1772 def _validate_ssh_cert_input_data(self, data): 

1773 if data.get("token_type") == "ssh-cert": 

1774 if not data.get("req_cnf"): 

1775 raise ValueError( 

1776 "When requesting an SSH certificate, " 

1777 "you must include a string parameter named 'req_cnf' " 

1778 "containing the public key in JWK format " 

1779 "(https://tools.ietf.org/html/rfc7517).") 

1780 if not data.get("key_id"): 

1781 raise ValueError( 

1782 "When requesting an SSH certificate, " 

1783 "you must include a string parameter named 'key_id' " 

1784 "which identifies the key in the 'req_cnf' argument.") 

1785 

1786 def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs): 

1787 """Acquire token(s) based on a refresh token (RT) obtained from elsewhere. 

1788 

1789 You use this method only when you have old RTs from elsewhere, 

1790 and now you want to migrate them into MSAL. 

1791 Calling this method results in new tokens automatically storing into MSAL. 

1792 

1793 You do NOT need to use this method if you are already using MSAL. 

1794 MSAL maintains RT automatically inside its token cache, 

1795 and an access token can be retrieved 

1796 when you call :func:`~acquire_token_silent`. 

1797 

1798 :param str refresh_token: The old refresh token, as a string. 

1799 

1800 :param list scopes: 

1801 The scopes associate with this old RT. 

1802 Each scope needs to be in the Microsoft identity platform (v2) format. 

1803 See `Scopes not resources <https://docs.microsoft.com/en-us/azure/active-directory/develop/migrate-python-adal-msal#scopes-not-resources>`_. 

1804 

1805 :return: 

1806 * A dict contains "error" and some other keys, when error happened. 

1807 * A dict contains no "error" key means migration was successful. 

1808 """ 

1809 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1810 telemetry_context = self._build_telemetry_context( 

1811 self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN, 

1812 refresh_reason=msal.telemetry.FORCE_REFRESH) 

1813 response = _clean_up(self.client.obtain_token_by_refresh_token( 

1814 refresh_token, 

1815 scope=self._decorate_scope(scopes), 

1816 headers=telemetry_context.generate_headers(), 

1817 rt_getter=lambda rt: rt, 

1818 on_updating_rt=False, 

1819 on_removing_rt=lambda rt_item: None, # No OP 

1820 **kwargs)) 

1821 if "access_token" in response: 

1822 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1823 telemetry_context.update_telemetry(response) 

1824 return response 

1825 

1826 def acquire_token_by_username_password( 

1827 self, username, password, scopes, claims_challenge=None, 

1828 # Note: We shouldn't need to surface enable_msa_passthrough, 

1829 # because this ROPC won't work with MSA account anyway. 

1830 auth_scheme=None, 

1831 **kwargs): 

1832 """Gets a token for a given resource via user credentials. 

1833 

1834 See this page for constraints of Username Password Flow. 

1835 https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication 

1836 

1837 :param str username: Typically a UPN in the form of an email address. 

1838 :param str password: The password. 

1839 :param list[str] scopes: 

1840 Scopes requested to access a protected API (a resource). 

1841 :param claims_challenge: 

1842 The claims_challenge parameter requests specific claims requested by the resource provider 

1843 in the form of a claims_challenge directive in the www-authenticate header to be 

1844 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1845 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1846 

1847 :param object auth_scheme: 

1848 You can provide an ``msal.auth_scheme.PopAuthScheme`` object 

1849 so that MSAL will get a Proof-of-Possession (POP) token for you. 

1850 

1851 New in version 1.26.0. 

1852 

1853 :return: A dict representing the json response from Microsoft Entra: 

1854 

1855 - A successful response would contain "access_token" key, 

1856 - an error response would contain "error" and usually "error_description". 

1857 

1858 [Deprecated] This API is deprecated for public client flows and will be 

1859 removed in a future release. Use a more secure flow instead. 

1860 Migration guide: https://aka.ms/msal-ropc-migration 

1861 

1862 """ 

1863 is_confidential_app = self.client_credential or isinstance( 

1864 self, ConfidentialClientApplication) 

1865 if not is_confidential_app: 

1866 warnings.warn("""This API has been deprecated for public client flows, please use a more secure flow. 

1867 See https://aka.ms/msal-ropc-migration for migration guidance""", DeprecationWarning) 

1868 claims = _merge_claims_challenge_and_capabilities( 

1869 self._client_capabilities, claims_challenge) 

1870 if self._enable_broker and sys.platform in ("win32", "darwin"): 

1871 from .broker import _signin_silently 

1872 response = _signin_silently( 

1873 "https://{}/{}".format(self.authority.instance, self.authority.tenant), 

1874 self.client_id, 

1875 scopes, # Decorated scopes won't work due to offline_access 

1876 MSALRuntime_Username=username, 

1877 MSALRuntime_Password=password, 

1878 validateAuthority="no" if ( 

1879 self.authority._is_known_to_developer 

1880 or self._instance_discovery is False) else None, 

1881 claims=claims, 

1882 auth_scheme=auth_scheme, 

1883 ) 

1884 return self._process_broker_response(response, scopes, kwargs.get("data", {})) 

1885 

1886 if auth_scheme: 

1887 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED) 

1888 scopes = self._decorate_scope(scopes) 

1889 telemetry_context = self._build_telemetry_context( 

1890 self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID) 

1891 headers = telemetry_context.generate_headers() 

1892 data = dict(kwargs.pop("data", {}), claims=claims) 

1893 response = None 

1894 if not self.authority.is_adfs: 

1895 user_realm_result = self.authority.user_realm_discovery( 

1896 username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID]) 

1897 if user_realm_result.get("account_type") == "Federated": 

1898 response = _clean_up(self._acquire_token_by_username_password_federated( 

1899 user_realm_result, username, password, scopes=scopes, 

1900 data=data, 

1901 headers=headers, **kwargs)) 

1902 if response is None: # Either ADFS or not federated 

1903 response = _clean_up(self.client.obtain_token_by_username_password( 

1904 username, password, scope=scopes, 

1905 headers=headers, 

1906 data=data, 

1907 **kwargs)) 

1908 if "access_token" in response: 

1909 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1910 telemetry_context.update_telemetry(response) 

1911 return response 

1912 

1913 def _acquire_token_by_username_password_federated( 

1914 self, user_realm_result, username, password, scopes=None, **kwargs): 

1915 wstrust_endpoint = {} 

1916 if user_realm_result.get("federation_metadata_url"): 

1917 wstrust_endpoint = mex_send_request( 

1918 user_realm_result["federation_metadata_url"], 

1919 self.http_client) 

1920 if wstrust_endpoint is None: 

1921 raise ValueError("Unable to find wstrust endpoint from MEX. " 

1922 "This typically happens when attempting MSA accounts. " 

1923 "More details available here. " 

1924 "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication") 

1925 logger.debug("wstrust_endpoint = %s", wstrust_endpoint) 

1926 wstrust_result = wst_send_request( 

1927 username, password, 

1928 user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"), 

1929 wstrust_endpoint.get("address", 

1930 # Fallback to an AAD supplied endpoint 

1931 user_realm_result.get("federation_active_auth_url")), 

1932 wstrust_endpoint.get("action"), self.http_client) 

1933 if not ("token" in wstrust_result and "type" in wstrust_result): 

1934 raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result) 

1935 GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer' 

1936 grant_type = { 

1937 SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1, 

1938 SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2, 

1939 WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1, 

1940 WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2 

1941 }.get(wstrust_result.get("type")) 

1942 if not grant_type: 

1943 raise RuntimeError( 

1944 "RSTR returned unknown token type: %s", wstrust_result.get("type")) 

1945 self.client.grant_assertion_encoders.setdefault( # Register a non-standard type 

1946 grant_type, self.client.encode_saml_assertion) 

1947 return self.client.obtain_token_by_assertion( 

1948 wstrust_result["token"], grant_type, scope=scopes, 

1949 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

1950 event, 

1951 environment=self.authority.instance, 

1952 username=username, # Useful in case IDT contains no such info 

1953 )), 

1954 **kwargs) 

1955 

1956 

1957class PublicClientApplication(ClientApplication): # browser app or mobile app 

1958 

1959 DEVICE_FLOW_CORRELATION_ID = "_correlation_id" 

1960 CONSOLE_WINDOW_HANDLE = object() 

1961 

1962 def __init__( 

1963 self, client_id, client_credential=None, 

1964 *, 

1965 enable_broker_on_windows=None, 

1966 enable_broker_on_mac=None, 

1967 enable_broker_on_linux=None, 

1968 enable_broker_on_wsl=None, 

1969 **kwargs): 

1970 """Same as :func:`ClientApplication.__init__`, 

1971 except that ``client_credential`` parameter shall remain ``None``. 

1972 

1973 .. note:: 

1974 

1975 **What is a broker, and why use it?** 

1976 

1977 A broker is a component installed on your device. 

1978 Broker implicitly gives your device an identity. By using a broker, 

1979 your device becomes a factor that can satisfy MFA (Multi-factor authentication). 

1980 This factor would become mandatory 

1981 if a tenant's admin enables a corresponding Conditional Access (CA) policy. 

1982 The broker's presence allows Microsoft identity platform 

1983 to have higher confidence that the tokens are being issued to your device, 

1984 and that is more secure. 

1985 

1986 An additional benefit of broker is, 

1987 it runs as a long-lived process with your device's OS, 

1988 and maintains its own cache, 

1989 so that your broker-enabled apps (even a CLI) 

1990 could automatically SSO from a previously established signed-in session. 

1991 

1992 **How to opt in to use broker?** 

1993 

1994 1. You can set any combination of the following opt-in parameters to true: 

1995 

1996 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

1997 | Opt-in flag | If app will run on | App has registered this as a Desktop platform redirect URI in Azure Portal | 

1998 +==========================+===================================+====================================================================================+ 

1999 | enable_broker_on_windows | Windows 10+ | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id | 

2000 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2001 | enable_broker_on_wsl | WSL | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id | 

2002 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2003 | enable_broker_on_mac | Mac with Company Portal installed | msauth.com.msauth.unsignedapp://auth | 

2004 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2005 | enable_broker_on_linux | Linux with Intune installed | ``https://login.microsoftonline.com/common/oauth2/nativeclient`` (MUST be enabled) | 

2006 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2007 

2008 2. Install broker dependency, 

2009 e.g. ``pip install msal[broker]>=1.33,<2``. 

2010 

2011 3. Test with ``acquire_token_interactive()`` and ``acquire_token_silent()``. 

2012 

2013 **The fallback behaviors of MSAL Python's broker support** 

2014 

2015 MSAL will either error out, or silently fallback to non-broker flows. 

2016 

2017 1. MSAL will ignore the `enable_broker_...` and bypass broker 

2018 on those auth flows that are known to be NOT supported by broker. 

2019 This includes ADFS, B2C, etc.. 

2020 For other "could-use-broker" scenarios, please see below. 

2021 2. MSAL errors out when app developer opted-in to use broker 

2022 but a direct dependency "mid-tier" package is not installed. 

2023 Error message guides app developer to declare the correct dependency 

2024 ``msal[broker]``. 

2025 We error out here because the error is actionable to app developers. 

2026 3. MSAL silently "deactivates" the broker and fallback to non-broker, 

2027 when opted-in, dependency installed yet failed to initialize. 

2028 We anticipate this would happen on a device whose OS is too old 

2029 or the underlying broker component is somehow unavailable. 

2030 There is not much an app developer or the end user can do here. 

2031 Eventually, the conditional access policy shall 

2032 force the user to switch to a different device. 

2033 4. MSAL errors out when broker is opted in, installed, initialized, 

2034 but subsequent token request(s) failed. 

2035 

2036 :param boolean enable_broker_on_windows: 

2037 This setting is only effective if your app is running on Windows 10+. 

2038 This parameter defaults to None, which means MSAL will not utilize a broker. 

2039 

2040 New in MSAL Python 1.25.0. 

2041 

2042 :param boolean enable_broker_on_mac: 

2043 This setting is only effective if your app is running on Mac. 

2044 This parameter defaults to None, which means MSAL will not utilize a broker. 

2045 

2046 New in MSAL Python 1.31.0. 

2047 

2048 :param boolean enable_broker_on_linux: 

2049 This setting is only effective if your app is running on Linux, including WSL. 

2050 This parameter defaults to None, which means MSAL will not utilize a broker. 

2051 

2052 New in MSAL Python 1.33.0. 

2053 

2054 :param boolean enable_broker_on_wsl: 

2055 This setting is only effective if your app is running on WSL. 

2056 This parameter defaults to None, which means MSAL will not utilize a broker. 

2057 

2058 New in MSAL Python 1.33.0. 

2059 """ 

2060 if client_credential is not None: 

2061 raise ValueError("Public Client should not possess credentials") 

2062 

2063 self._enable_broker = bool( 

2064 enable_broker_on_windows and sys.platform == "win32" 

2065 or enable_broker_on_mac and sys.platform == "darwin" 

2066 or enable_broker_on_linux and sys.platform == "linux" 

2067 or enable_broker_on_wsl and is_wsl() 

2068 ) 

2069 

2070 super(PublicClientApplication, self).__init__( 

2071 client_id, client_credential=None, **kwargs) 

2072 

2073 def acquire_token_interactive( 

2074 self, 

2075 scopes, # type: list[str] 

2076 prompt=None, 

2077 login_hint=None, # type: Optional[str] 

2078 domain_hint=None, # type: Optional[str] 

2079 claims_challenge=None, 

2080 timeout=None, 

2081 port=None, 

2082 extra_scopes_to_consent=None, 

2083 max_age=None, 

2084 parent_window_handle=None, 

2085 on_before_launching_ui=None, 

2086 auth_scheme=None, 

2087 **kwargs): 

2088 """Acquire token interactively i.e. via a local browser. 

2089 

2090 Prerequisite: In Azure Portal, configure the Redirect URI of your 

2091 "Mobile and Desktop application" as ``http://localhost``. 

2092 If you opts in to use broker during ``PublicClientApplication`` creation, 

2093 your app also need this Redirect URI: 

2094 ``ms-appx-web://Microsoft.AAD.BrokerPlugin/YOUR_CLIENT_ID`` 

2095 

2096 :param list scopes: 

2097 It is a list of case-sensitive strings. 

2098 :param str prompt: 

2099 By default, no prompt value will be sent, not even string ``"none"``. 

2100 You will have to specify a value explicitly. 

2101 Its valid values are the constants defined in 

2102 :class:`Prompt <msal.Prompt>`. 

2103 :param str login_hint: 

2104 Optional. Identifier of the user. Generally a User Principal Name (UPN). 

2105 :param domain_hint: 

2106 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com". 

2107 If included, it will skip the email-based discovery process that user goes 

2108 through on the sign-in page, leading to a slightly more streamlined user experience. 

2109 More information on possible values available in 

2110 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and 

2111 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_. 

2112 

2113 :param claims_challenge: 

2114 The claims_challenge parameter requests specific claims requested by the resource provider 

2115 in the form of a claims_challenge directive in the www-authenticate header to be 

2116 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2117 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2118 

2119 :param int timeout: 

2120 This method will block the current thread. 

2121 This parameter specifies the timeout value in seconds. 

2122 Default value ``None`` means wait indefinitely. 

2123 

2124 :param int port: 

2125 The port to be used to listen to an incoming auth response. 

2126 By default we will use a system-allocated port. 

2127 (The rest of the redirect_uri is hard coded as ``http://localhost``.) 

2128 

2129 :param list extra_scopes_to_consent: 

2130 "Extra scopes to consent" is a concept only available in Microsoft Entra. 

2131 It refers to other resources you might want to prompt to consent for, 

2132 in the same interaction, but for which you won't get back a 

2133 token for in this particular operation. 

2134 

2135 :param int max_age: 

2136 OPTIONAL. Maximum Authentication Age. 

2137 Specifies the allowable elapsed time in seconds 

2138 since the last time the End-User was actively authenticated. 

2139 If the elapsed time is greater than this value, 

2140 Microsoft identity platform will actively re-authenticate the End-User. 

2141 

2142 MSAL Python will also automatically validate the auth_time in ID token. 

2143 

2144 New in version 1.15. 

2145 

2146 :param int parent_window_handle: 

2147 OPTIONAL. 

2148 

2149 * If your app does not opt in to use broker, 

2150 you do not need to provide a ``parent_window_handle`` here. 

2151 

2152 * If your app opts in to use broker, 

2153 ``parent_window_handle`` is required. 

2154 

2155 - If your app is a GUI app running on Windows or Mac system, 

2156 you are required to also provide its window handle, 

2157 so that the sign-in window will pop up on top of your window. 

2158 - If your app is a console app running on Windows or Mac system, 

2159 you can use a placeholder 

2160 ``PublicClientApplication.CONSOLE_WINDOW_HANDLE``. 

2161 

2162 Most Python scripts are console apps. 

2163 

2164 New in version 1.20.0. 

2165 

2166 :param function on_before_launching_ui: 

2167 A callback with the form of 

2168 ``lambda ui="xyz", **kwargs: print("A {} will be launched".format(ui))``, 

2169 where ``ui`` will be either "browser" or "broker". 

2170 You can use it to inform your end user to expect a pop-up window. 

2171 

2172 New in version 1.20.0. 

2173 

2174 :param object auth_scheme: 

2175 You can provide an ``msal.auth_scheme.PopAuthScheme`` object 

2176 so that MSAL will get a Proof-of-Possession (POP) token for you. 

2177 

2178 New in version 1.26.0. 

2179 

2180 :return: 

2181 - A dict containing no "error" key, 

2182 and typically contains an "access_token" key. 

2183 - A dict containing an "error" key, when token refresh failed. 

2184 """ 

2185 data = kwargs.pop("data", {}) 

2186 enable_msa_passthrough = kwargs.pop( # MUST remove it from kwargs 

2187 "enable_msa_passthrough", # Keep it as a hidden param, for now. 

2188 # OPTIONAL. MSA-Passthrough is a legacy configuration, 

2189 # needed by a small amount of Microsoft first-party apps, 

2190 # which would login MSA accounts via ".../organizations" authority. 

2191 # If you app belongs to this category, AND you are enabling broker, 

2192 # you would want to enable this flag. Default value is False. 

2193 # More background of MSA-PT is available from this internal docs: 

2194 # https://microsoft.sharepoint.com/:w:/t/Identity-DevEx/EatIUauX3c9Ctw1l7AQ6iM8B5CeBZxc58eoQCE0IuZ0VFw?e=tgc3jP&CID=39c853be-76ea-79d7-ee73-f1b2706ede05 

2195 False 

2196 ) and data.get("token_type") != "ssh-cert" # Work around a known issue as of PyMsalRuntime 0.8 

2197 self._validate_ssh_cert_input_data(data) 

2198 is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(data.get("token_type"), auth_scheme) 

2199 

2200 if not on_before_launching_ui: 

2201 on_before_launching_ui = lambda **kwargs: None 

2202 if _is_running_in_cloud_shell() and prompt == "none": 

2203 # Note: _acquire_token_by_cloud_shell() is always silent, 

2204 # so we would not fire on_before_launching_ui() 

2205 return self._acquire_token_by_cloud_shell(scopes, data=data) 

2206 claims = _merge_claims_challenge_and_capabilities( 

2207 self._client_capabilities, claims_challenge) 

2208 if self._enable_broker and (sys.platform in ("win32", "darwin") or not is_ssh_cert_or_pop_request): 

2209 if parent_window_handle is None: 

2210 raise ValueError( 

2211 "parent_window_handle is required when you opted into using broker. " 

2212 "You need to provide the window handle of your GUI application, " 

2213 "or use msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE " 

2214 "when and only when your application is a console app.") 

2215 if extra_scopes_to_consent: 

2216 logger.warning( 

2217 "Ignoring parameter extra_scopes_to_consent, " 

2218 "which is not supported by broker") 

2219 response = self._acquire_token_interactive_via_broker( 

2220 scopes, 

2221 parent_window_handle, 

2222 enable_msa_passthrough, 

2223 claims, 

2224 data, 

2225 on_before_launching_ui, 

2226 auth_scheme, 

2227 prompt=prompt, 

2228 login_hint=login_hint, 

2229 max_age=max_age, 

2230 ) 

2231 return self._process_broker_response(response, scopes, data) 

2232 

2233 if isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme) and sys.platform == "linux": 

2234 raise ValueError("POP is not supported on Linux") 

2235 elif auth_scheme: 

2236 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED) 

2237 on_before_launching_ui(ui="browser") 

2238 telemetry_context = self._build_telemetry_context( 

2239 self.ACQUIRE_TOKEN_INTERACTIVE) 

2240 response = _clean_up(self.client.obtain_token_by_browser( 

2241 scope=self._decorate_scope(scopes) if scopes else None, 

2242 extra_scope_to_consent=extra_scopes_to_consent, 

2243 redirect_uri="http://localhost:{port}".format( 

2244 # Hardcode the host, for now. AAD portal rejects 127.0.0.1 anyway 

2245 port=port or 0), 

2246 prompt=prompt, 

2247 login_hint=login_hint, 

2248 max_age=max_age, 

2249 timeout=timeout, 

2250 auth_params={ 

2251 "claims": claims, 

2252 "domain_hint": domain_hint, 

2253 }, 

2254 data=dict(data, claims=claims), 

2255 headers=telemetry_context.generate_headers(), 

2256 browser_name=_preferred_browser(), 

2257 **kwargs)) 

2258 if "access_token" in response: 

2259 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2260 telemetry_context.update_telemetry(response) 

2261 return response 

2262 

2263 def _acquire_token_interactive_via_broker( 

2264 self, 

2265 scopes, # type: list[str] 

2266 parent_window_handle, # type: int 

2267 enable_msa_passthrough, # type: boolean 

2268 claims, # type: str 

2269 data, # type: dict 

2270 on_before_launching_ui, # type: callable 

2271 auth_scheme, # type: object 

2272 prompt=None, 

2273 login_hint=None, # type: Optional[str] 

2274 max_age=None, 

2275 **kwargs): 

2276 from .broker import _signin_interactively, _signin_silently, _acquire_token_silently 

2277 if "welcome_template" in kwargs: 

2278 logger.debug(kwargs["welcome_template"]) # Experimental 

2279 authority = "https://{}/{}".format( 

2280 self.authority.instance, self.authority.tenant) 

2281 validate_authority = "no" if ( 

2282 self.authority._is_known_to_developer 

2283 or self._instance_discovery is False) else None 

2284 # Calls different broker methods to mimic the OIDC behaviors 

2285 if login_hint and prompt != "select_account": # OIDC prompts when the user did not sign in 

2286 accounts = self.get_accounts(username=login_hint) 

2287 if len(accounts) == 1: # Unambiguously proceed with this account 

2288 logger.debug("Calling broker._acquire_token_silently()") 

2289 response = _acquire_token_silently( # When it works, it bypasses prompt 

2290 authority, 

2291 self.client_id, 

2292 accounts[0]["local_account_id"], 

2293 scopes, 

2294 claims=claims, 

2295 auth_scheme=auth_scheme, 

2296 **data) 

2297 if response and "error" not in response: 

2298 return response 

2299 # login_hint undecisive or not exists 

2300 if prompt == "none" or not prompt: # Must/Can attempt _signin_silently() 

2301 logger.debug("Calling broker._signin_silently()") 

2302 response = _signin_silently( # Unlike OIDC, it doesn't honor login_hint 

2303 authority, self.client_id, scopes, 

2304 validateAuthority=validate_authority, 

2305 claims=claims, 

2306 max_age=max_age, 

2307 enable_msa_pt=enable_msa_passthrough, 

2308 auth_scheme=auth_scheme, 

2309 **data) 

2310 is_wrong_account = bool( 

2311 # _signin_silently() only gets tokens for default account, 

2312 # but this seems to have been fixed in PyMsalRuntime 0.11.2 

2313 "access_token" in response and login_hint 

2314 and login_hint != response.get( 

2315 "id_token_claims", {}).get("preferred_username")) 

2316 wrong_account_error_message = ( 

2317 'prompt="none" will not work for login_hint="non-default-user"') 

2318 if is_wrong_account: 

2319 logger.debug(wrong_account_error_message) 

2320 if prompt == "none": 

2321 return response if not is_wrong_account else { 

2322 "error": "broker_error", 

2323 "error_description": wrong_account_error_message, 

2324 } 

2325 else: 

2326 assert bool(prompt) is False 

2327 from pymsalruntime import Response_Status 

2328 recoverable_errors = frozenset([ 

2329 Response_Status.Status_AccountUnusable, 

2330 Response_Status.Status_InteractionRequired, 

2331 ]) 

2332 if is_wrong_account or "error" in response and response.get( 

2333 "_broker_status") in recoverable_errors: 

2334 pass # It will fall back to the _signin_interactively() 

2335 else: 

2336 return response 

2337 

2338 logger.debug("Falls back to broker._signin_interactively()") 

2339 on_before_launching_ui(ui="broker") 

2340 return _signin_interactively( 

2341 authority, self.client_id, scopes, 

2342 None if parent_window_handle is self.CONSOLE_WINDOW_HANDLE 

2343 else parent_window_handle, 

2344 validateAuthority=validate_authority, 

2345 login_hint=login_hint, 

2346 prompt=prompt, 

2347 claims=claims, 

2348 max_age=max_age, 

2349 enable_msa_pt=enable_msa_passthrough, 

2350 auth_scheme=auth_scheme, 

2351 **data) 

2352 

2353 def initiate_device_flow(self, scopes=None, *, claims_challenge=None, **kwargs): 

2354 """Initiate a Device Flow instance, 

2355 which will be used in :func:`~acquire_token_by_device_flow`. 

2356 

2357 :param list[str] scopes: 

2358 Scopes requested to access a protected API (a resource). 

2359 :return: A dict representing a newly created Device Flow object. 

2360 

2361 - A successful response would contain "user_code" key, among others 

2362 - an error response would contain some other readable key/value pairs. 

2363 """ 

2364 correlation_id = msal.telemetry._get_new_correlation_id() 

2365 flow = self.client.initiate_device_flow( 

2366 scope=self._decorate_scope(scopes or []), 

2367 headers={msal.telemetry.CLIENT_REQUEST_ID: correlation_id}, 

2368 data={"claims": _merge_claims_challenge_and_capabilities( 

2369 self._client_capabilities, claims_challenge)}, 

2370 **kwargs) 

2371 flow[self.DEVICE_FLOW_CORRELATION_ID] = correlation_id 

2372 return flow 

2373 

2374 def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs): 

2375 """Obtain token by a device flow object, with customizable polling effect. 

2376 

2377 :param dict flow: 

2378 A dict previously generated by :func:`~initiate_device_flow`. 

2379 By default, this method's polling effect will block current thread. 

2380 You can abort the polling loop at any time, 

2381 by changing the value of the flow's "expires_at" key to 0. 

2382 :param claims_challenge: 

2383 The claims_challenge parameter requests specific claims requested by the resource provider 

2384 in the form of a claims_challenge directive in the www-authenticate header to be 

2385 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2386 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2387 

2388 :return: A dict representing the json response from Microsoft Entra: 

2389 

2390 - A successful response would contain "access_token" key, 

2391 - an error response would contain "error" and usually "error_description". 

2392 """ 

2393 telemetry_context = self._build_telemetry_context( 

2394 self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID, 

2395 correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID)) 

2396 response = _clean_up(self.client.obtain_token_by_device_flow( 

2397 flow, 

2398 data=dict( 

2399 kwargs.pop("data", {}), 

2400 code=flow["device_code"], # 2018-10-4 Hack: 

2401 # during transition period, 

2402 # service seemingly need both device_code and code parameter. 

2403 claims=_merge_claims_challenge_and_capabilities( 

2404 self._client_capabilities, claims_challenge), 

2405 ), 

2406 headers=telemetry_context.generate_headers(), 

2407 **kwargs)) 

2408 if "access_token" in response: 

2409 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2410 telemetry_context.update_telemetry(response) 

2411 return response 

2412 

2413 

2414class ConfidentialClientApplication(ClientApplication): # server-side web app 

2415 """Same as :func:`ClientApplication.__init__`, 

2416 except that ``allow_broker`` parameter shall remain ``None``. 

2417 """ 

2418 

2419 def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs): 

2420 """Acquires token for the current confidential client, not for an end user. 

2421 

2422 Since MSAL Python 1.23, it will automatically look for token from cache, 

2423 and only send request to Identity Provider when cache misses. 

2424 

2425 :param list[str] scopes: (Required) 

2426 Scopes requested to access a protected API (a resource). 

2427 :param claims_challenge: 

2428 The claims_challenge parameter requests specific claims requested by the resource provider 

2429 in the form of a claims_challenge directive in the www-authenticate header to be 

2430 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2431 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2432 

2433 :return: A dict representing the json response from Microsoft Entra: 

2434 

2435 - A successful response would contain "access_token" key, 

2436 - an error response would contain "error" and usually "error_description". 

2437 """ 

2438 if kwargs.get("force_refresh"): 

2439 raise ValueError( # We choose to disallow force_refresh 

2440 "Historically, this method does not support force_refresh behavior. " 

2441 ) 

2442 return _clean_up(self._acquire_token_silent_with_error( 

2443 scopes, None, claims_challenge=claims_challenge, **kwargs)) 

2444 

2445 def _acquire_token_for_client( 

2446 self, 

2447 scopes, 

2448 refresh_reason, 

2449 claims_challenge=None, 

2450 **kwargs 

2451 ): 

2452 if self.authority.tenant.lower() in ["common", "organizations"]: 

2453 warnings.warn( 

2454 "Using /common or /organizations authority " 

2455 "in acquire_token_for_client() is unreliable. " 

2456 "Please use a specific tenant instead.", DeprecationWarning) 

2457 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

2458 telemetry_context = self._build_telemetry_context( 

2459 self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason) 

2460 client = self._regional_client or self.client 

2461 response = client.obtain_token_for_client( 

2462 scope=scopes, # This grant flow requires no scope decoration 

2463 headers=telemetry_context.generate_headers(), 

2464 data=dict( 

2465 kwargs.pop("data", {}), 

2466 claims=_merge_claims_challenge_and_capabilities( 

2467 self._client_capabilities, claims_challenge)), 

2468 **kwargs) 

2469 telemetry_context.update_telemetry(response) 

2470 return response 

2471 

2472 def remove_tokens_for_client(self): 

2473 """Remove all tokens that were previously acquired via 

2474 :func:`~acquire_token_for_client()` for the current client.""" 

2475 for env in [self.authority.instance] + self._get_authority_aliases( 

2476 self.authority.instance): 

2477 for at in list(self.token_cache.search( # Remove ATs from a snapshot 

2478 TokenCache.CredentialType.ACCESS_TOKEN, query={ 

2479 "client_id": self.client_id, 

2480 "environment": env, 

2481 "home_account_id": None, # These are mostly app-only tokens 

2482 })): 

2483 self.token_cache.remove_at(at) 

2484 # acquire_token_for_client() obtains no RTs, so we have no RT to remove 

2485 

2486 def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs): 

2487 """Acquires token using on-behalf-of (OBO) flow. 

2488 

2489 The current app is a middle-tier service which was called with a token 

2490 representing an end user. 

2491 The current app can use such token (a.k.a. a user assertion) to request 

2492 another token to access downstream web API, on behalf of that user. 

2493 See `detail docs here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow>`_ . 

2494 

2495 The current middle-tier app has no user interaction to obtain consent. 

2496 See how to gain consent upfront for your middle-tier app from this article. 

2497 https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow#gaining-consent-for-the-middle-tier-application 

2498 

2499 :param str user_assertion: The incoming token already received by this app 

2500 :param list[str] scopes: Scopes required by downstream API (a resource). 

2501 :param claims_challenge: 

2502 The claims_challenge parameter requests specific claims requested by the resource provider 

2503 in the form of a claims_challenge directive in the www-authenticate header to be 

2504 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2505 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2506 

2507 :return: A dict representing the json response from Microsoft Entra: 

2508 

2509 - A successful response would contain "access_token" key, 

2510 - an error response would contain "error" and usually "error_description". 

2511 """ 

2512 telemetry_context = self._build_telemetry_context( 

2513 self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID) 

2514 # The implementation is NOT based on Token Exchange (RFC 8693) 

2515 response = _clean_up(self.client.obtain_token_by_assertion( # bases on assertion RFC 7521 

2516 user_assertion, 

2517 self.client.GRANT_TYPE_JWT, # IDTs and AAD ATs are all JWTs 

2518 scope=self._decorate_scope(scopes), # Decoration is used for: 

2519 # 1. Explicitly requesting an RT, without relying on AAD default 

2520 # behavior, even though it currently still issues an RT. 

2521 # 2. Requesting an IDT (which would otherwise be unavailable) 

2522 # so that the calling app could use id_token_claims to implement 

2523 # their own cache mapping, which is likely needed in web apps. 

2524 data=dict( 

2525 kwargs.pop("data", {}), 

2526 requested_token_use="on_behalf_of", 

2527 claims=_merge_claims_challenge_and_capabilities( 

2528 self._client_capabilities, claims_challenge)), 

2529 headers=telemetry_context.generate_headers(), 

2530 # TBD: Expose a login_hint (or ccs_routing_hint) param for web app 

2531 **kwargs)) 

2532 if "access_token" in response: 

2533 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2534 telemetry_context.update_telemetry(response) 

2535 return response