Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/application.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

673 statements  

1import functools 

2import json 

3import time 

4import logging 

5import sys 

6import warnings 

7from threading import Lock 

8from typing import Optional # Needed in Python 3.7 & 3.8 

9from urllib.parse import urlparse 

10import os 

11 

12from .oauth2cli import Client, JwtAssertionCreator 

13from .oauth2cli.oidc import decode_part 

14from .authority import ( 

15 Authority, 

16 WORLD_WIDE, 

17 _get_instance_discovery_endpoint, 

18 _get_instance_discovery_host, 

19) 

20from .mex import send_request as mex_send_request 

21from .wstrust_request import send_request as wst_send_request 

22from .wstrust_response import * 

23from .token_cache import TokenCache, _get_username, _GRANT_TYPE_BROKER 

24import msal.telemetry 

25from .region import _detect_region 

26from .throttled_http_client import ThrottledHttpClient 

27from .cloudshell import _is_running_in_cloud_shell 

28from .sku import SKU, __version__ 

29from .oauth2cli.authcode import is_wsl 

30 

31 

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# String tag for the Cloud Shell authority type.
# NOTE(review): its consumers are outside this excerpt -- confirm usage there.
_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL"

34 

def _init_broker(enable_pii_log):  # Kept as a function so that it can be mocked
    """Import the broker module lazily and optionally enable its PII logging.

    :param enable_pii_log: When truthy, ``broker._enable_pii_log()`` is called
        right after the import.
    """
    from . import broker  # The import itself triggers Broker's initialization
    if not enable_pii_log:
        return
    broker._enable_pii_log()

39 

def extract_certs(public_cert_content):
    """Parse raw public certificate content and return a list of strings.

    Usage: ``headers = {"x5c": extract_certs(open("my_cert.pem").read())}``

    :param str public_cert_content: Text that is expected to hold one or more
        ``-----BEGIN CERTIFICATE-----`` ... ``-----END CERTIFICATE-----`` blocks.
    :return: The stripped payload of every certificate block found, in order.
        When no block markers are present, the whole stripped input is
        returned as a single-item list.
    :raises ValueError: When no certificate block is found and the input
        contains the text "PRIVATE KEY".
    """
    # Imported locally: this module has no explicit ``import re`` and would
    # otherwise depend on the name leaking in through a wildcard import.
    import re

    public_certificates = re.findall(
        r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----',
        public_cert_content, re.I)
    if public_certificates:
        return [cert.strip() for cert in public_certificates]
    # The public cert tags are not found in the input,
    # let's make best effort to exclude a private key pem file.
    if "PRIVATE KEY" in public_cert_content:
        raise ValueError(
            "We expect your public key but detect a private key instead")
    return [public_cert_content.strip()]

54 

55 

56def _merge_claims_challenge_and_capabilities(capabilities, claims_challenge): 

57 # Represent capabilities as {"access_token": {"xms_cc": {"values": capabilities}}} 

58 # and then merge/add it into incoming claims 

59 if not capabilities: 

60 return claims_challenge 

61 claims_dict = json.loads(claims_challenge) if claims_challenge else {} 

62 for key in ["access_token"]: # We could add "id_token" if we'd decide to 

63 claims_dict.setdefault(key, {}).update(xms_cc={"values": capabilities}) 

64 return json.dumps(claims_dict) 

65 

66 

67def _str2bytes(raw): 

68 # A conversion based on duck-typing rather than six.text_type 

69 try: 

70 return raw.encode(encoding="utf-8") 

71 except: 

72 return raw 

73 

def _extract_cert_and_thumbprints(cert):
    """Derive the thumbprints and the x5c value of a certificate object.

    Cert concepts: https://security.stackexchange.com/a/226758/125264

    :param cert: An object offering ``public_bytes(encoding=...)`` and
        ``fingerprint(algorithm)``.
        NOTE(review): presumably a ``cryptography`` x509 certificate -- confirm.
    :return: A tuple ``(sha256_thumbprint, sha1_thumbprint, x5c)`` where both
        thumbprints are hex strings and ``x5c`` is a one-item list holding the
        PEM body without its first and last lines.
    """
    from cryptography.hazmat.primitives import hashes, serialization

    # public_bytes() requires cryptography 1.0+
    pem_lines = cert.public_bytes(
        encoding=serialization.Encoding.PEM).decode().splitlines()
    # Drop the "--- header ---" and "--- footer ---" lines
    pem_body = '\n'.join(pem_lines[1:-1])

    # fingerprint() requires cryptography 0.7+
    # https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object
    thumbprint_sha256 = cert.fingerprint(hashes.SHA256()).hex()
    thumbprint_sha1 = cert.fingerprint(hashes.SHA1()).hex()  # CodeQL [SM02167] for legacy support such as ADFS
    return thumbprint_sha256, thumbprint_sha1, [pem_body]

89 

def _parse_pfx(pfx_path, passphrase_bytes):
    """Load the private key and the certificate details from a PFX file.

    Cert concepts: https://security.stackexchange.com/a/226758/125264

    :param pfx_path: Path of the PFX file; it is opened in binary mode.
    :param passphrase_bytes: Passed as-is to
        ``pkcs12.load_key_and_certificates()``.
    :return: A tuple
        ``(private_key, sha256_thumbprint, sha1_thumbprint, x5c)``.
    :raises ValueError: When the file lacks a private key or a certificate.
    """
    from cryptography.hazmat.primitives.serialization import pkcs12

    with open(pfx_path, 'rb') as pfx_file:
        pfx_bytes = pfx_file.read()
    # load_key_and_certificates() requires cryptography 2.5+
    # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates
    private_key, cert, _additional_certs = pkcs12.load_key_and_certificates(
        pfx_bytes, passphrase_bytes)
    if not private_key or not cert:
        raise ValueError("Your PFX file shall contain both private key and cert")
    thumbprints_and_x5c = _extract_cert_and_thumbprints(cert)
    return (private_key,) + tuple(thumbprints_and_x5c)

101 

102 

def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes):
    """Deserialize a PEM private key via ``cryptography``.

    :param private_key_pem_str: The PEM content; it goes through
        ``_str2bytes()`` before being parsed.
    :param passphrase_bytes: Passed as-is as the password argument.
    :return: Whatever ``load_pem_private_key()`` returns.
    """
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.serialization import load_pem_private_key

    pem_bytes = _str2bytes(private_key_pem_str)
    return load_pem_private_key(  # cryptography 0.6+
        pem_bytes,
        passphrase_bytes,
        backend=default_backend(),  # It was a required param until 2020
    )

111 

112 

113def _pii_less_home_account_id(home_account_id): 

114 parts = home_account_id.split(".") # It could contain one or two parts 

115 parts[0] = "********" 

116 return ".".join(parts) 

117 

118 

119def _clean_up(result): 

120 if isinstance(result, dict): 

121 if "_msalruntime_telemetry" in result or "_msal_python_telemetry" in result: 

122 result["msal_telemetry"] = json.dumps({ # Telemetry as an opaque string 

123 "msalruntime_telemetry": result.get("_msalruntime_telemetry"), 

124 "msal_python_telemetry": result.get("_msal_python_telemetry"), 

125 }, separators=(",", ":")) 

126 return_value = { 

127 k: result[k] for k in result 

128 if k != "refresh_in" # MSAL handled refresh_in, customers need not 

129 and not k.startswith('_') # Skim internal properties 

130 } 

131 if "refresh_in" in result: # To encourage proactive refresh 

132 return_value["refresh_on"] = int(time.time() + result["refresh_in"]) 

133 return return_value 

134 return result # It could be None 

135 

136 

137def _preferred_browser(): 

138 """Register Edge and return a name suitable for subsequent webbrowser.get(...) 

139 when appropriate. Otherwise return None. 

140 """ 

141 # On Linux, only Edge will provide device-based Conditional Access support 

142 if sys.platform != "linux": # On other platforms, we have no browser preference 

143 return None 

144 browser_path = "/usr/bin/microsoft-edge" # Use a full path owned by sys admin 

145 # Note: /usr/bin/microsoft-edge, /usr/bin/microsoft-edge-stable, etc. 

146 # are symlinks that point to the actual binaries which are found under 

147 # /opt/microsoft/msedge/msedge or /opt/microsoft/msedge-beta/msedge. 

148 # Either method can be used to detect an Edge installation. 

149 user_has_no_preference = "BROWSER" not in os.environ 

150 user_wont_mind_edge = "microsoft-edge" in os.environ.get("BROWSER", "") # Note: 

151 # BROWSER could contain "microsoft-edge" or "/path/to/microsoft-edge". 

152 # Python documentation (https://docs.python.org/3/library/webbrowser.html) 

153 # does not document the name being implicitly register, 

154 # so there is no public API to know whether the ENV VAR browser would work. 

155 # Therefore, we would not bother examine the env var browser's type. 

156 # We would just register our own Edge instance. 

157 if (user_has_no_preference or user_wont_mind_edge) and os.path.exists(browser_path): 

158 try: 

159 import webbrowser # Lazy import. Some distro may not have this. 

160 browser_name = "msal-edge" # Avoid popular name "microsoft-edge" 

161 # otherwise `BROWSER="microsoft-edge"; webbrowser.get("microsoft-edge")` 

162 # would return a GenericBrowser instance which won't work. 

163 try: 

164 registration_available = isinstance( 

165 webbrowser.get(browser_name), webbrowser.BackgroundBrowser) 

166 except webbrowser.Error: 

167 registration_available = False 

168 if not registration_available: 

169 logger.debug("Register %s with %s", browser_name, browser_path) 

170 # By registering our own browser instance with our own name, 

171 # rather than populating a process-wide BROWSER enn var, 

172 # this approach does not have side effect on non-MSAL code path. 

173 webbrowser.register( # Even double-register happens to work fine 

174 browser_name, None, webbrowser.BackgroundBrowser(browser_path)) 

175 return browser_name 

176 except ImportError: 

177 pass # We may still proceed 

178 return None 

179 

180def _is_ssh_cert_or_pop_request(token_type, auth_scheme) -> bool: 

181 return token_type == "ssh-cert" or token_type == "pop" or isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme) 

182 

class _ClientWithCcsRoutingInfo(Client):
    """A ``Client`` that adds ``X-AnchorMailbox`` routing info to its requests."""

    def initiate_auth_code_flow(self, **kwargs):
        """Start an auth code flow, always asking for ``client_info``.

        When a truthy ``login_hint`` is given, an ``X-AnchorMailbox`` entry of
        the form ``UPN:<login_hint>`` is added to the keyword arguments.
        """
        login_hint = kwargs.get("login_hint")
        if login_hint:  # eSTS could have utilized this as-is, but nope
            kwargs["X-AnchorMailbox"] = "UPN:%s" % login_hint
        return super().initiate_auth_code_flow(
            client_info=1,  # To be used as CCS Routing info
            **kwargs)

    def obtain_token_by_auth_code_flow(
            self, auth_code_flow, auth_response, **kwargs):
        """Redeem an auth code, routing by the ``client_info`` of the response.

        When the decoded ``client_info`` carries both "uid" and "utid",
        the header ``X-AnchorMailbox: Oid:<uid>@<utid>`` is sent along.

        Note: the obtain_token_by_browser() is also covered by this.
        """
        assert isinstance(auth_code_flow, dict) and isinstance(auth_response, dict)
        headers = kwargs.pop("headers", {})
        encoded_client_info = auth_response.get("client_info")
        if encoded_client_info:
            client_info = json.loads(decode_part(auth_response["client_info"]))
        else:
            client_info = {}
        if all(key in client_info for key in ("uid", "utid")):
            # Note: The value of X-AnchorMailbox is also case-insensitive
            headers["X-AnchorMailbox"] = "Oid:{uid}@{utid}".format(**client_info)
        return super().obtain_token_by_auth_code_flow(
            auth_code_flow, auth_response, headers=headers, **kwargs)

    def obtain_token_by_username_password(self, username, password, **kwargs):
        """Redeem username and password, sending ``X-AnchorMailbox: upn:<username>``."""
        headers = kwargs.pop("headers", {})
        headers["X-AnchorMailbox"] = "upn:{}".format(username)
        return super().obtain_token_by_username_password(
            username, password, headers=headers, **kwargs)

211 

212 

213def _msal_extension_check(): 

214 # Can't run this in module or class level otherwise you'll get circular import error 

215 try: 

216 from msal_extensions import __version__ as v 

217 major, minor, _ = v.split(".", maxsplit=3) 

218 if not (int(major) >= 1 and int(minor) >= 2): 

219 warnings.warn( 

220 "Please upgrade msal-extensions. " 

221 "Only msal-extensions 1.2+ can work with msal 1.30+") 

222 except ImportError: 

223 pass # The optional msal_extensions is not installed. Business as usual. 

224 except ValueError: 

225 logger.exception(f"msal_extensions version {v} not in major.minor.patch format") 

226 except: 

227 logger.exception( 

228 "Unable to import msal_extensions during an optional check. " 

229 "This exception can be safely ignored." 

230 ) 

231 

232 

233class ClientApplication(object): 

234 """You do not usually directly use this class. Use its subclasses instead: 

235 :class:`PublicClientApplication` and :class:`ConfidentialClientApplication`. 

236 """ 

237 ACQUIRE_TOKEN_SILENT_ID = "84" 

238 ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85" 

239 ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301" 

240 ACQUIRE_TOKEN_ON_BEHALF_OF_ID = "523" 

241 ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID = "622" 

242 ACQUIRE_TOKEN_FOR_CLIENT_ID = "730" 

243 ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832" 

244 ACQUIRE_TOKEN_INTERACTIVE = "169" 

245 GET_ACCOUNTS_ID = "902" 

246 REMOVE_ACCOUNT_ID = "903" 

247 

248 ATTEMPT_REGION_DISCOVERY = True # "TryAutoDetect" 

249 DISABLE_MSAL_FORCE_REGION = False # Used in azure_region to disable MSAL_FORCE_REGION behavior 

250 _TOKEN_SOURCE = "token_source" 

251 _TOKEN_SOURCE_IDP = "identity_provider" 

252 _TOKEN_SOURCE_CACHE = "cache" 

253 _TOKEN_SOURCE_BROKER = "broker" 

254 

255 _enable_broker = False 

256 _AUTH_SCHEME_UNSUPPORTED = ( 

257 "auth_scheme is currently only available from broker. " 

258 "You can enable broker by following these instructions. " 

259 "https://msal-python.readthedocs.io/en/latest/#publicclientapplication") 

260 

def __init__(
        self, client_id,
        client_credential=None, authority=None, validate_authority=True,
        token_cache=None,
        http_client=None,
        verify=True, proxies=None, timeout=None,
        client_claims=None, app_name=None, app_version=None,
        client_capabilities=None,
        azure_region=None,  # Note: We choose to add this param in this base class,
            # despite it is currently only needed by ConfidentialClientApplication.
            # This way, it holds the same positional param place for PCA,
            # when we would eventually want to add this feature to PCA in future.
        exclude_scopes=None,
        http_cache=None,
        instance_discovery=None,
        allow_broker=None,
        enable_pii_log=None,
        oidc_authority=None,
        ):
    """Create an instance of application.

    :param str client_id: Your app has a client_id after you register it on Microsoft Entra admin center.

    :param client_credential:
        For :class:`PublicClientApplication`, you use `None` here.

        For :class:`ConfidentialClientApplication`,
        it supports many different input formats for different scenarios.

        .. admonition:: Support using a client secret.

            Just feed in a string, such as ``"your client secret"``.

        .. admonition:: Support using a certificate in X.509 (.pem) format

            Deprecated because it uses SHA-1 thumbprint,
            unless you are still using ADFS which supports SHA-1 thumbprint only.
            Please use the .pfx option documented later in this page.

            Feed in a dict in this form::

                {
                    "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format",
                    "thumbprint": "An SHA-1 thumbprint such as A1B2C3D4E5F6... "
                                  "Changed in version 1.35.0, if thumbprint is absent "
                                  "and a public_certificate is present, MSAL will "
                                  "automatically calculate an SHA-256 thumbprint instead.",
                    "passphrase": "Needed if the private_key is encrypted (Added in version 1.6.0)",
                    "public_certificate": "...-----BEGIN CERTIFICATE-----...",  # Needed if you use Subject Name/Issuer auth. Added in version 0.5.0.
                }

            MSAL Python requires a "private_key" in PEM format.
            If your cert is in PKCS12 (.pfx) format,
            you can convert it to X.509 (.pem) format,
            by ``openssl pkcs12 -in file.pfx -out file.pem -nodes``.

            The thumbprint is available in your app's registration in Azure Portal.
            Alternatively, you can `calculate the thumbprint <https://github.com/Azure/azure-sdk-for-python/blob/07d10639d7e47f4852eaeb74aef5d569db499d6e/sdk/identity/azure-identity/azure/identity/_credentials/certificate.py#L94-L97>`_.

            ``public_certificate`` (optional) is public key certificate
            which will be sent through 'x5c' JWT header.
            This is useful when you use `Subject Name/Issuer Authentication
            <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_
            which is an approach to allow easier certificate rotation.
            Per `specs <https://tools.ietf.org/html/rfc7515#section-4.1.6>`_,
            "the certificate containing
            the public key corresponding to the key used to digitally sign the
            JWS MUST be the first certificate. This MAY be followed by
            additional certificates, with each subsequent certificate being the
            one used to certify the previous one."
            However, your certificate's issuer may use a different order.
            So, if your attempt ends up with an error AADSTS700027 -
            "The provided signature value did not match the expected signature value",
            you may try to use only the leaf cert (in PEM/str format) instead.

        .. admonition:: Supporting raw assertion obtained from elsewhere

            *Added in version 1.13.0*:
            It can also be a completely pre-signed assertion that you've assembled yourself.
            Simply pass a container containing only the key "client_assertion", like this::

                {
                    "client_assertion": "...a JWT with claims aud, exp, iss, jti, nbf, and sub..."
                }

        .. admonition:: Supporting reading client certificates from PFX files

            This usage will automatically use SHA-256 thumbprint of the certificate.

            *Added in version 1.29.0*:
            Feed in a dictionary containing the path to a PFX file::

                {
                    "private_key_pfx_path": "/path/to/your.pfx",  # Added in version 1.29.0
                    "public_certificate": True,  # Only needed if you use Subject Name/Issuer auth. Added in version 1.30.0
                    "passphrase": "Passphrase if the private_key is encrypted (Optional)",
                }

            The following command will generate a .pfx file from your .key and .pem file::

                openssl pkcs12 -export -out certificate.pfx -inkey privateKey.key -in certificate.pem

            `Subject Name/Issuer Auth
            <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_
            is an approach to allow easier certificate rotation.
            If your .pfx file contains both the private key and public cert,
            you can opt in for Subject Name/Issuer Auth by setting "public_certificate" to ``True``.

    :type client_credential: Union[dict, str, None]

    :param dict client_claims:
        *Added in version 0.5.0*:
        It is a dictionary of extra claims that would be signed
        by this :class:`ConfidentialClientApplication` 's private key.
        For example, you can use {"client_ip": "x.x.x.x"}.
        You may also override any of the following default claims::

            {
                "aud": the_token_endpoint,
                "iss": self.client_id,
                "sub": same_as_issuer,
                "exp": now + 10_min,
                "iat": now,
                "jti": a_random_uuid
            }

    :param str authority:
        A URL that identifies a token authority. It should be of the format
        ``https://login.microsoftonline.com/your_tenant``
        By default, we will use ``https://login.microsoftonline.com/common``

        *Changed in version 1.17*: you can also use predefined constant
        and a builder like this::

            from msal.authority import (
                AuthorityBuilder,
                AZURE_US_GOVERNMENT, AZURE_CHINA, AZURE_PUBLIC)
            my_authority = AuthorityBuilder(AZURE_PUBLIC, "contoso.onmicrosoft.com")
            # Now you get an equivalent of
            # "https://login.microsoftonline.com/contoso.onmicrosoft.com"

            # You can feed such an authority to msal's ClientApplication
            from msal import PublicClientApplication
            app = PublicClientApplication("my_client_id", authority=my_authority, ...)

    :param bool validate_authority: (optional) Turns authority validation
        on or off. This parameter defaults to True.
    :param TokenCache token_cache:
        Sets the token cache used by this ClientApplication instance.
        By default, an in-memory cache will be created and used.
    :param http_client: (optional)
        Your implementation of abstract class HttpClient <msal.oauth2cli.http.http_client>
        Defaults to a requests session instance.
        Since MSAL 1.11.0, the default session would be configured
        to attempt one retry on connection error.
        If you are providing your own http_client,
        it will be your http_client's duty to decide whether to perform retry.

    :param verify: (optional)
        It will be passed to the
        `verify parameter in the underlying requests library
        <http://docs.python-requests.org/en/v2.9.1/user/advanced/#ssl-cert-verification>`_
        This does not apply if you have chosen to pass your own Http client
    :param proxies: (optional)
        It will be passed to the
        `proxies parameter in the underlying requests library
        <http://docs.python-requests.org/en/v2.9.1/user/advanced/#proxies>`_
        This does not apply if you have chosen to pass your own Http client
    :param timeout: (optional)
        It will be passed to the
        `timeout parameter in the underlying requests library
        <http://docs.python-requests.org/en/v2.9.1/user/advanced/#timeouts>`_
        This does not apply if you have chosen to pass your own Http client
    :param app_name: (optional)
        You can provide your application name for Microsoft telemetry purposes.
        Default value is None, means it will not be passed to Microsoft.
    :param app_version: (optional)
        You can provide your application version for Microsoft telemetry purposes.
        Default value is None, means it will not be passed to Microsoft.
    :param list[str] client_capabilities: (optional)
        Allows configuration of one or more client capabilities, e.g. ["CP1"].

        Client capability is meant to inform the Microsoft identity platform
        (STS) what this client is capable of,
        so STS can decide to turn on certain features.
        For example, if client is capable to handle *claims challenge*,
        STS may issue
        `Continuous Access Evaluation (CAE) <https://learn.microsoft.com/entra/identity/conditional-access/concept-continuous-access-evaluation>`_
        access tokens to resources,
        knowing that when the resource emits a *claims challenge*
        the client will be able to handle those challenges.

        Implementation details:
        Client capability is implemented using "claims" parameter on the wire,
        for now.
        MSAL will combine them into
        `claims parameter <https://openid.net/specs/openid-connect-core-1_0-final.html#ClaimsParameter>`_
        which you will later provide via one of the acquire-token requests.

    :param str azure_region: (optional)
        Instructs MSAL to use the Entra regional token service. This legacy feature is only available to
        first-party applications. Only ``acquire_token_for_client()`` is supported.

        Supports 4 values:

        1. ``azure_region=None`` - This default value means no region is configured.
           MSAL will use the region defined in env var ``MSAL_FORCE_REGION``.
        2. ``azure_region="some_region"`` - meaning the specified region is used.
        3. ``azure_region=True`` - meaning
           MSAL will try to auto-detect the region. This is not recommended.
        4. ``azure_region=False`` - meaning MSAL will use no region.

        .. note::
            Region auto-discovery has been tested on VMs and on Azure Functions. It is unreliable.
            Applications using this option should configure a short timeout.

            For more details and for the values of the region string
            see https://learn.microsoft.com/entra/msal/dotnet/resources/region-discovery-troubleshooting

        New in version 1.12.0.

    :param list[str] exclude_scopes: (optional)
        Historically MSAL hardcodes `offline_access` scope,
        which would allow your app to have prolonged access to user's data.
        If that is unnecessary or undesirable for your app,
        now you can use this parameter to supply an exclusion list of scopes,
        such as ``exclude_scopes = ["offline_access"]``.

    :param dict http_cache:
        MSAL has long been caching tokens in the ``token_cache``.
        Recently, MSAL also introduced a concept of ``http_cache``,
        by automatically caching some finite amount of non-token http responses,
        so that *long-lived*
        ``PublicClientApplication`` and ``ConfidentialClientApplication``
        would be more performant and responsive in some situations.

        This ``http_cache`` parameter accepts any dict-like object.
        If not provided, MSAL will use an in-memory dict.

        If your app is a command-line app (CLI),
        you would want to persist your http_cache across different CLI runs.
        The persisted file's format may change due to, but not limited to,
        `unstable protocol <https://docs.python.org/3/library/pickle.html#data-stream-format>`_,
        so your implementation shall tolerate unexpected loading errors.
        The following recipe shows a way to do so::

            # Just add the following lines at the beginning of your CLI script
            import sys, atexit, pickle, logging
            http_cache_filename = sys.argv[0] + ".http_cache"
            try:
                with open(http_cache_filename, "rb") as f:
                    persisted_http_cache = pickle.load(f)  # Take a snapshot
            except (
                    FileNotFoundError,  # Or IOError in Python 2
                    pickle.UnpicklingError,  # A corrupted http cache file
                    AttributeError,  # Cache created by a different version of MSAL
                    ):
                persisted_http_cache = {}  # Recover by starting afresh
            except:  # Unexpected exceptions
                logging.exception("You may want to debug this")
                persisted_http_cache = {}  # Recover by starting afresh
            atexit.register(lambda: pickle.dump(
                # When exit, flush it back to the file.
                # It may occasionally overwrite another process's concurrent write,
                # but that is fine. Subsequent runs will reach eventual consistency.
                persisted_http_cache, open(http_cache_filename, "wb")))

            # And then you can implement your app as you normally would
            app = msal.PublicClientApplication(
                "your_client_id",
                ...,
                http_cache=persisted_http_cache,  # Utilize persisted_http_cache
                ...,
                #token_cache=...,  # You may combine the old token_cache trick
                    # Please refer to token_cache recipe at
                    # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
                )
            app.acquire_token_interactive(["your", "scope"], ...)

        Content inside ``http_cache`` are cheap to obtain.
        There is no need to share them among different apps.

        Content inside ``http_cache`` will contain no tokens nor
        Personally Identifiable Information (PII). Encryption is unnecessary.

        New in version 1.16.0.

    :param boolean instance_discovery:
        Historically, MSAL would connect to a central endpoint located at
        ``https://login.microsoftonline.com`` to acquire some metadata,
        especially when using an unfamiliar authority.
        This behavior is known as Instance Discovery.

        This parameter defaults to None, which enables the Instance Discovery.

        If you know some authorities which you allow MSAL to operate with as-is,
        without involving any Instance Discovery, the recommended pattern is::

            known_authorities = frozenset([  # Treat your known authorities as const
                "https://contoso.com/adfs", "https://login.azs/foo"])
            ...
            authority = "https://contoso.com/adfs"  # Assuming your app will use this
            app1 = PublicClientApplication(
                "client_id",
                authority=authority,
                # Conditionally disable Instance Discovery for known authorities
                instance_discovery=authority not in known_authorities,
                )

        If you do not know some authorities beforehand,
        yet still want MSAL to accept any authority that you will provide,
        you can use a ``False`` to unconditionally disable Instance Discovery.

        New in version 1.19.0.

    :param boolean allow_broker:
        Deprecated. Please use ``enable_broker_on_windows`` instead.

    :param boolean enable_pii_log:
        When enabled, logs may include PII (Personally Identifiable Information).
        This can be useful in troubleshooting broker behaviors.
        The default behavior is False.

        New in version 1.24.0.

    :param str oidc_authority:
        *Added in version 1.28.0*:
        It is a URL that identifies an OpenID Connect (OIDC) authority of
        the format ``https://contoso.com/tenant``.
        MSAL will append ".well-known/openid-configuration" to the authority
        and retrieve the OIDC metadata from there, to figure out the endpoints.

        Note: Broker will NOT be used for OIDC authority.

    :raises ValueError: When ``exclude_scopes`` is truthy but not a list,
        when it contains "openid", or when both ``authority`` and
        ``oidc_authority`` are provided. A ValueError raised while creating
        the ``Authority`` is re-raised as-is.
    """
    self.client_id = client_id
    self.client_credential = client_credential
    self.client_claims = client_claims
    self._client_capabilities = client_capabilities
    self._instance_discovery = instance_discovery

    # Validate exclude_scopes before storing it as an immutable set
    if exclude_scopes and not isinstance(exclude_scopes, list):
        raise ValueError(
            "Invalid exclude_scopes={}. It need to be a list of strings.".format(
            repr(exclude_scopes)))
    self._exclude_scopes = frozenset(exclude_scopes or [])
    if "openid" in self._exclude_scopes:
        raise ValueError(
            'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format(
            repr(exclude_scopes)))

    if http_client:
        self.http_client = http_client
    else:
        import requests  # Lazy load

        self.http_client = requests.Session()
        self.http_client.verify = verify
        self.http_client.proxies = proxies
        # Requests does not support a session-wide timeout,
        # but you can patch that (https://github.com/psf/requests/issues/3341):
        self.http_client.request = functools.partial(
            self.http_client.request, timeout=timeout)

        # Enable a minimal retry. Better than nothing.
        # https://github.com/psf/requests/blob/v2.25.1/requests/adapters.py#L94-L108
        a = requests.adapters.HTTPAdapter(max_retries=1)
        self.http_client.mount("http://", a)
        self.http_client.mount("https://", a)
    # Whichever client was chosen above gets wrapped by the throttling client
    self.http_client = ThrottledHttpClient(
        self.http_client,
        http_cache=http_cache,
        default_throttle_time=60
            # The default value 60 was recommended mainly for PCA at the end of
            # https://identitydivision.visualstudio.com/devex/_git/AuthLibrariesApiReview?version=GBdev&path=%2FService%20protection%2FIntial%20set%20of%20protection%20measures.md&_a=preview
            if isinstance(self, PublicClientApplication) else 5,
        )

    self.app_name = app_name
    self.app_version = app_version

    # Here the self.authority will not be the same type as authority in input
    if oidc_authority and authority:
        raise ValueError("You can not provide both authority and oidc_authority")
    if isinstance(authority, str) and urlparse(authority).path.startswith(
            "/dstsv2"):  # dSTS authority's path always starts with "/dstsv2"
        oidc_authority = authority  # So we treat it as if an oidc_authority
    try:
        authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE)
        self.authority = Authority(
            authority_to_use,
            self.http_client,
            validate_authority=validate_authority,
            instance_discovery=self._instance_discovery,
            oidc_authority_url=oidc_authority,
            )
    except ValueError:  # Those are explicit authority validation errors
        raise
    except Exception:  # The rest are typically connection errors
        if validate_authority and not oidc_authority and (
                azure_region  # Opted in to use region
                or (azure_region is None and os.getenv("MSAL_FORCE_REGION"))  # Will use region
                ):
            # Since caller opts in to use region, here we tolerate connection
            # errors happened during authority validation at non-region endpoint
            self.authority = Authority(
                authority_to_use,
                self.http_client,
                instance_discovery=False,
                )
        else:
            raise

    # _decide_broker() reads self.authority, so it runs after the block above
    self._decide_broker(allow_broker, enable_pii_log)
    self.token_cache = token_cache or TokenCache()
    self._region_configured = azure_region
    self._region_detected = None
    self.client, self._regional_client = self._build_client(
        client_credential, self.authority)
    self.authority_groups = {}
    self._telemetry_buffer = {}
    self._telemetry_lock = Lock()
    _msal_extension_check()

683 

684 

685 def _decide_broker(self, allow_broker, enable_pii_log): 

686 is_confidential_app = self.client_credential or isinstance( 

687 self, ConfidentialClientApplication) 

688 if is_confidential_app and allow_broker: 

689 raise ValueError("allow_broker=True is only supported in PublicClientApplication") 

690 # Historically, we chose to support ClientApplication("client_id", allow_broker=True) 

691 if allow_broker: 

692 warnings.warn( 

693 "allow_broker is deprecated. " 

694 "Please use PublicClientApplication(..., " 

695 "enable_broker_on_windows=True, " 

696 # No need to mention non-Windows platforms, because allow_broker is only for Windows 

697 "...)", 

698 DeprecationWarning) 

699 opted_in_for_broker = ( 

700 self._enable_broker # True means Opted-in from PCA 

701 or ( 

702 # When we started the broker project on Windows platform, 

703 # the allow_broker was meant to be cross-platform. Now we realize 

704 # that other platforms have different redirect_uri requirements, 

705 # so the old allow_broker is deprecated and will only for Windows. 

706 allow_broker and sys.platform == "win32") 

707 ) 

708 self._enable_broker = ( # This same variable will also store the state 

709 opted_in_for_broker 

710 and not is_confidential_app 

711 and not self.authority.is_adfs 

712 and not self.authority._is_b2c 

713 ) 

714 if self._enable_broker: 

715 try: 

716 _init_broker(enable_pii_log) 

717 except RuntimeError: 

718 self._enable_broker = False 

719 logger.warning( # It is common on Mac and Linux where broker is not built-in 

720 "Broker is unavailable on this platform. " 

721 "We will fallback to non-broker.") 

722 logger.debug("Broker enabled? %s", self._enable_broker) 

723 

def is_pop_supported(self):
    """Tell whether this client can obtain Proof-of-Possession Access Tokens.

    It needs the broker to be enabled, and the platform to be Windows or macOS.
    """
    broker_state = self._enable_broker
    if not broker_state:
        return broker_state  # A falsy broker state is returned as-is
    return sys.platform in ("win32", "darwin")

727 

728 def _decorate_scope( 

729 self, scopes, 

730 reserved_scope=frozenset(['openid', 'profile', 'offline_access'])): 

731 if not isinstance(scopes, (list, set, tuple)): 

732 raise ValueError("The input scopes should be a list, tuple, or set") 

733 scope_set = set(scopes) # Input scopes is typically a list. Copy it to a set. 

734 if scope_set & reserved_scope: 

735 # These scopes are reserved for the API to provide good experience. 

736 # We could make the developer pass these and then if they do they will 

737 # come back asking why they don't see refresh token or user information. 

738 raise ValueError( 

739 """You cannot use any scope value that is reserved. 

740Your input: {} 

741The reserved list: {}""".format(list(scope_set), list(reserved_scope))) 

742 raise ValueError( 

743 "You cannot use any scope value that is in this reserved list: {}".format( 

744 list(reserved_scope))) 

745 

746 # client_id can also be used as a scope in B2C 

747 decorated = scope_set | reserved_scope 

748 decorated -= self._exclude_scopes 

749 return list(decorated) 

750 

def _build_telemetry_context(
        self, api_id, correlation_id=None, refresh_reason=None):
    """Create a telemetry context for one API call.

    The context is given this app's telemetry buffer and telemetry lock.
    """
    optional_fields = {
        "correlation_id": correlation_id,
        "refresh_reason": refresh_reason,
    }
    return msal.telemetry._TelemetryContext(
        self._telemetry_buffer, self._telemetry_lock, api_id,
        **optional_fields)

756 

def _get_regional_authority(self, central_authority) -> Optional[Authority]:
    """Derive the regional counterpart of ``central_authority``, if any.

    :param central_authority:
        The already-validated authority whose instance and tenant are reused.
    :return:
        An Authority pointing to the regional host,
        or None when no region is to be used.
    """
    if self._region_configured is False:  # User opts out of ESTS-R
        return None  # Short circuit to completely bypass region detection
    if self._region_configured is None:  # User did not make an ESTS-R choice
        self._region_configured = os.getenv("MSAL_FORCE_REGION") or None

    if not self._region_detected:
        # The http client is only handed over when a region is configured
        probing_client = (
            None if self._region_configured is None else self.http_client)
        self._region_detected = _detect_region(probing_client)

    configured = self._region_configured
    detected = self._region_detected
    relies_on_discovery = configured == self.ATTEMPT_REGION_DISCOVERY
    if not relies_on_discovery and configured != detected:
        logger.warning('Region configured ({}) != region detected ({})'.format(
            repr(configured), repr(detected)))

    # An explicitly configured region wins over the detected one.
    # It will retain the None i.e. opted out.
    region_to_use = detected if relies_on_discovery else configured
    logger.debug('Region to be used: {}'.format(repr(region_to_use)))
    if not region_to_use:
        return None

    # The list came from point 3 of the algorithm section in this internal doc
    # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PinAuthToRegion/AAD%20SDK%20Proposal%20to%20Pin%20Auth%20to%20region.md&anchor=algorithm&_a=preview
    hosts_sharing_one_regional_domain = (
        "login.microsoftonline.com",
        "login.microsoft.com",
        "login.windows.net",
        "sts.windows.net",
    )
    if central_authority.instance in hosts_sharing_one_regional_domain:
        regional_host = "{}.login.microsoft.com".format(region_to_use)
    else:
        regional_host = "{}.{}".format(region_to_use, central_authority.instance)
    return Authority(  # The central_authority has already been validated
        "https://{}/{}".format(regional_host, central_authority.tenant),
        self.http_client,
        instance_discovery=False,
    )

790 

def _build_client(self, client_credential, authority, skip_regional_client=False):
    """Build the central client and, when applicable, the regional client.

    :param client_credential:
        When it is a dict, a JWT client assertion is used: either the
        ready-made ``client_assertion`` value, or one created from a private
        key given via ``private_key_pfx_path`` or ``private_key``.
        Any other value is placed into the request body as ``client_secret``.
    :param authority:
        The authority whose endpoints configure the central client.
        It is also the input from which the regional authority is derived.
    :param skip_regional_client:
        When true, no regional client is built.
    :return:
        A tuple of ``(central_client, regional_client)``.
        The ``regional_client`` is None unless a client_credential is given,
        ``skip_regional_client`` is false, and a regional authority is found.
    :raises ValueError:
        If the dict-style client_credential lacks a usable private key,
        or lacks the thumbprint information needed to sign the assertion.
    """
    client_assertion = None
    client_assertion_type = None
    # These headers are sent by both the central and the regional client
    default_headers = {
        "x-client-sku": SKU, "x-client-ver": __version__,
        "x-client-os": sys.platform,
        "x-ms-lib-capability": "retry-after, h429",
    }
    if self.app_name:
        default_headers['x-app-name'] = self.app_name
    if self.app_version:
        default_headers['x-app-ver'] = self.app_version
    default_body = {"client_info": 1}
    if isinstance(client_credential, dict):
        client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT
        # Use client_credential.get("...") rather than "..." in client_credential
        # so that we can ignore an empty string came from an empty ENV VAR.
        if client_credential.get("client_assertion"):
            # A ready-made assertion is used as-is
            client_assertion = client_credential['client_assertion']
        else:
            # Otherwise an assertion is created from a private key
            headers = {}
            sha1_thumbprint = sha256_thumbprint = None
            passphrase_bytes = _str2bytes(
                client_credential["passphrase"]
            ) if client_credential.get("passphrase") else None
            if client_credential.get("private_key_pfx_path"):
                private_key, sha256_thumbprint, sha1_thumbprint, x5c = _parse_pfx(
                    client_credential["private_key_pfx_path"],
                    passphrase_bytes)
                # Only an explicit public_certificate=True opts in to sending x5c
                if client_credential.get("public_certificate") is True and x5c:
                    headers["x5c"] = x5c
            elif client_credential.get("private_key"):  # PEM blob
                private_key = (  # handles both encrypted and unencrypted
                    _load_private_key_from_pem_str(
                        client_credential['private_key'], passphrase_bytes)
                    if passphrase_bytes
                    else client_credential['private_key']
                )

                # Determine thumbprints based on what's provided
                if client_credential.get("thumbprint"):
                    # User provided a thumbprint - use it as SHA-1 (legacy/manual approach)
                    sha1_thumbprint = client_credential["thumbprint"]
                    sha256_thumbprint = None
                elif isinstance(client_credential.get('public_certificate'), str):
                    # No thumbprint provided, but we have a certificate to calculate thumbprints
                    from cryptography import x509
                    cert = x509.load_pem_x509_certificate(
                        _str2bytes(client_credential['public_certificate']))
                    sha256_thumbprint, sha1_thumbprint, headers["x5c"] = (
                        _extract_cert_and_thumbprints(cert))
                else:
                    raise ValueError(
                        "You must provide either 'thumbprint' or 'public_certificate' "
                        "from which the thumbprint can be calculated.")
            else:
                raise ValueError(
                    "client_credential needs to follow this format "
                    "https://msal-python.readthedocs.io/en/latest/#msal.ClientApplication.params.client_credential")
            if ("x5c" not in headers  # So the .pfx file contains no certificate
                    and isinstance(client_credential.get('public_certificate'), str)
            ):  # Then we treat the public_certificate value as PEM content
                headers["x5c"] = extract_certs(client_credential['public_certificate'])
            # The SHA-256 thumbprint (with PS256) is used when it is available
            # and the authority is not ADFS
            if sha256_thumbprint and not authority.is_adfs:
                assertion_params = {
                    "algorithm": "PS256", "sha256_thumbprint": sha256_thumbprint,
                }
            else:  # Fall back
                if not sha1_thumbprint:
                    raise ValueError("You shall provide a thumbprint in SHA1.")
                assertion_params = {
                    "algorithm": "RS256", "sha1_thumbprint": sha1_thumbprint,
                }
            assertion = JwtAssertionCreator(
                private_key, headers=headers, **assertion_params)
            client_assertion = assertion.create_regenerative_assertion(
                audience=authority.token_endpoint, issuer=self.client_id,
                additional_claims=self.client_claims or {})
    else:
        # Any non-dict credential is sent as the client_secret
        default_body['client_secret'] = client_credential
    central_configuration = {
        "authorization_endpoint": authority.authorization_endpoint,
        "token_endpoint": authority.token_endpoint,
        "device_authorization_endpoint": authority.device_authorization_endpoint,
    }
    central_client = _ClientWithCcsRoutingInfo(
        central_configuration,
        self.client_id,
        http_client=self.http_client,
        default_headers=default_headers,
        default_body=default_body,
        client_assertion=client_assertion,
        client_assertion_type=client_assertion_type,
        # Tokens are cached with the central authority's instance as environment
        on_obtaining_tokens=lambda event: self.token_cache.add(dict(
            event, environment=authority.instance)),
        on_removing_rt=self.token_cache.remove_rt,
        on_updating_rt=self.token_cache.update_rt)

    regional_client = None
    if (client_credential  # Currently regional endpoint only serves some CCA flows
            and not skip_regional_client):
        regional_authority = self._get_regional_authority(authority)
        if regional_authority:
            regional_configuration = {
                "authorization_endpoint": regional_authority.authorization_endpoint,
                "token_endpoint": regional_authority.token_endpoint,
                "device_authorization_endpoint":
                    regional_authority.device_authorization_endpoint,
            }
            regional_client = _ClientWithCcsRoutingInfo(
                regional_configuration,
                self.client_id,
                http_client=self.http_client,
                default_headers=default_headers,
                default_body=default_body,
                client_assertion=client_assertion,
                client_assertion_type=client_assertion_type,
                # Note the environment here is still the central authority's instance,
                # not the regional one
                on_obtaining_tokens=lambda event: self.token_cache.add(dict(
                    event, environment=authority.instance)),
                on_removing_rt=self.token_cache.remove_rt,
                on_updating_rt=self.token_cache.update_rt)
    return central_client, regional_client

913 

def initiate_auth_code_flow(
        self,
        scopes,  # type: list[str]
        redirect_uri=None,
        state=None,  # Recommended by OAuth2 for CSRF protection
        prompt=None,
        login_hint=None,  # type: Optional[str]
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        max_age=None,
        response_mode=None,  # type: Optional[str]
        ):
    """Initiate an auth code flow.

    Later when the response reaches your redirect_uri,
    you can use :func:`~acquire_token_by_auth_code_flow()`
    to complete the authentication/authorization.

    :param list scopes:
        It is a list of case-sensitive strings.
    :param str redirect_uri:
        Optional. If not specified, server will use the pre-registered one.
    :param str state:
        An opaque value used by the client to
        maintain state between the request and callback.
        If absent, this library will automatically generate one internally.
    :param str prompt:
        By default, no prompt value will be sent, not even string ``"none"``.
        You will have to specify a value explicitly.
        Its valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.

    :param str login_hint:
        Optional. Identifier of the user. Generally a User Principal Name (UPN).
    :param domain_hint:
        Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
        If included, it will skip the email-based discovery process that user goes
        through on the sign-in page, leading to a slightly more streamlined user experience.
        More information on possible values available in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.

    :param int max_age:
        OPTIONAL. Maximum Authentication Age.
        Specifies the allowable elapsed time in seconds
        since the last time the End-User was actively authenticated.
        If the elapsed time is greater than this value,
        Microsoft identity platform will actively re-authenticate the End-User.

        MSAL Python will also automatically validate the auth_time in ID token.

        New in version 1.15.

    :param str response_mode:
        OPTIONAL. Specifies the method with which response parameters should be returned.
        The default value is equivalent to ``query``, which was still secure enough in MSAL Python
        (because MSAL Python does not transfer tokens via query parameter in the first place).
        For even better security, we recommend using the value ``form_post``.
        In "form_post" mode, response parameters
        will be encoded as HTML form values that are transmitted via the HTTP POST method and
        encoded in the body using the application/x-www-form-urlencoded format.
        Valid values can be either "form_post" for HTTP POST to callback URI or
        "query" (the default) for HTTP GET with parameters encoded in query string.
        More information on possible values
        `here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>`_
        and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>`_.

        .. note::
            You should configure your web framework to accept form_post responses instead of query responses.
            While this parameter still works, it will be removed in a future version.
            Using query-based response modes is less secure and should be avoided.

    :return:
        The auth code flow. It is a dict in this form::

            {
                "auth_uri": "https://...",  // Guide user to visit this
                "state": "...",  // You may choose to verify it by yourself,
                                 // or just let acquire_token_by_auth_code_flow()
                                 // do that for you.
                "...": "...",  // Everything else are reserved and internal
            }

        The caller is expected to:

        1. somehow store this content, typically inside the current session,
        2. guide the end user (i.e. resource owner) to visit that auth_uri,
        3. and then relay this dict and subsequent auth response to
           :func:`~acquire_token_by_auth_code_flow()`.
    """
    # Note to maintainers: Do not emit warning for the use of response_mode here,
    # because response_mode=form_post is still the recommended usage for MSAL Python 1.x.
    # App developers making the right call shall not be disturbed by unactionable warnings.
    endpoints = {
        "authorization_endpoint": self.authority.authorization_endpoint,
    }
    auth_code_client = _ClientWithCcsRoutingInfo(
        endpoints, self.client_id, http_client=self.http_client)
    request_params = dict(
        redirect_uri=redirect_uri,
        state=state,
        login_hint=login_hint,
        prompt=prompt,
        scope=self._decorate_scope(scopes),
        domain_hint=domain_hint,
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge),
        max_age=max_age,
        response_mode=response_mode,
    )
    flow = auth_code_client.initiate_auth_code_flow(**request_params)
    # Keep the claims challenge inside the flow dict
    flow["claims_challenge"] = claims_challenge
    return flow

1023 

def get_authorization_request_url(
        self,
        scopes,  # type: list[str]
        login_hint=None,  # type: Optional[str]
        state=None,  # Recommended by OAuth2 for CSRF protection
        redirect_uri=None,
        response_type="code",  # Could be "token" if you use Implicit Grant
        prompt=None,
        nonce=None,
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        **kwargs):
    """Constructs a URL for you to start an Authorization Code Grant.

    This method emits a DeprecationWarning which suggests
    :func:`~initiate_auth_code_flow()` as the replacement.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param str state: Recommended by OAuth2 for CSRF protection.
    :param str login_hint:
        Identifier of the user. Generally a User Principal Name (UPN).
    :param str redirect_uri:
        Address to return to upon receiving a response from the authority.
    :param str response_type:
        Default value is "code" for an OAuth2 Authorization Code grant.

        You could use other content such as "id_token" or "token",
        which would trigger an Implicit Grant, but that is
        `not recommended <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-implicit-grant-flow#is-the-implicit-grant-suitable-for-my-app>`_.

    :param str prompt:
        By default, no prompt value will be sent, not even string ``"none"``.
        You will have to specify a value explicitly.
        Its valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.
    :param nonce:
        A cryptographically random value used to mitigate replay attacks. See also
        `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
    :param domain_hint:
        Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
        If included, it will skip the email-based discovery process that user goes
        through on the sign-in page, leading to a slightly more streamlined user experience.
        More information on possible values available in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: The authorization url as a string.
    """
    authority = kwargs.pop("authority", None)  # Historically we support this
    if authority:
        warnings.warn(
            "We haven't decided if this method will accept authority parameter")
        # Multi-tenant app can use new authority on demand
        the_authority = Authority(
            authority,
            self.http_client,
            instance_discovery=self._instance_discovery,
        )
    else:
        # The previous implementation is, it will use self.authority by default.
        the_authority = self.authority

    endpoints = {"authorization_endpoint": the_authority.authorization_endpoint}
    url_builder = _ClientWithCcsRoutingInfo(
        endpoints, self.client_id, http_client=self.http_client)
    warnings.warn(
        "Change your get_authorization_request_url() to initiate_auth_code_flow()",
        DeprecationWarning)
    with warnings.catch_warnings(record=True):
        # Warnings raised while building the URL are recorded, not shown
        return url_builder.build_auth_request_uri(
            response_type=response_type,
            redirect_uri=redirect_uri,
            state=state,
            login_hint=login_hint,
            prompt=prompt,
            scope=self._decorate_scope(scopes),
            nonce=nonce,
            domain_hint=domain_hint,
            claims=_merge_claims_challenge_and_capabilities(
                self._client_capabilities, claims_challenge),
        )

1105 

def acquire_token_by_auth_code_flow(
        self, auth_code_flow, auth_response, scopes=None, **kwargs):
    """Validate the auth response being redirected back, and obtain tokens.

    It automatically provides nonce protection.

    :param dict auth_code_flow:
        The same dict returned by :func:`~initiate_auth_code_flow()`.
        Its "claims_challenge" item, if any, is popped out by this method.
    :param dict auth_response:
        A dict of the query string received from auth server.
    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).

        Most of the time, you can leave it empty.

        If you requested user consent for multiple resources, here you will
        need to provide a subset of what you required in
        :func:`~initiate_auth_code_flow()`.

        OAuth2 was designed mostly for singleton services,
        where tokens are always meant for the same resource and the only
        changes are in the scopes.
        In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
        You can ask authorization code for multiple resources,
        but when you redeem it, the token is for only one intended
        recipient, called audience.
        So the developer needs to specify a scope so that we can restrict the
        token to be issued for the corresponding audience.

    :return:
        * A dict containing "access_token" and/or "id_token", among others,
          depends on what scope was used.
          (See https://tools.ietf.org/html/rfc6749#section-5.1)
        * A dict containing "error", optionally "error_description", "error_uri".
          (It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_
          or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_)
        * Most client-side data error would result in ValueError exception.
          So the usage pattern could be without any protocol details::

            def authorize():  # A controller in a web app
                try:
                    result = msal_app.acquire_token_by_auth_code_flow(
                        session.get("flow", {}), request.args)
                    if "error" in result:
                        return render_template("error.html", result)
                    use(result)  # Token(s) are available in result and cache
                except ValueError:  # Usually caused by CSRF
                    pass  # Simply ignore them
                return redirect(url_for("index"))
    """
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
    requested_scope = self._decorate_scope(scopes) if scopes else None
    request_headers = telemetry_context.generate_headers()
    # "data" is popped out of kwargs, so that the remaining kwargs
    # can be forwarded without a duplicated "data" keyword
    request_data = dict(
        kwargs.pop("data", {}),
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities,
            auth_code_flow.pop("claims_challenge", None)))
    response = _clean_up(self.client.obtain_token_by_auth_code_flow(
        auth_code_flow,
        auth_response,
        scope=requested_scope,
        headers=request_headers,
        data=request_data,
        **kwargs))
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

1174 

def acquire_token_by_authorization_code(
        self,
        code,
        scopes,  # Syntactically required. STS accepts empty value though.
        redirect_uri=None,
            # REQUIRED, if the "redirect_uri" parameter was included in the
            # authorization request as described in Section 4.1.1, and their
            # values MUST be identical.
        nonce=None,
        claims_challenge=None,
        **kwargs):
    """The second half of the Authorization Code Grant.

    This method emits a DeprecationWarning which suggests
    :func:`~acquire_token_by_auth_code_flow()` as the replacement.

    :param code: The authorization code returned from Authorization Server.
    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).

        If you requested user consent for multiple resources, here you will
        typically want to provide a subset of what you required in AuthCode.

        OAuth2 was designed mostly for singleton services,
        where tokens are always meant for the same resource and the only
        changes are in the scopes.
        In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
        You can ask authorization code for multiple resources,
        but when you redeem it, the token is for only one intended
        recipient, called audience.
        So the developer needs to specify a scope so that we can restrict the
        token to be issued for the corresponding audience.

    :param nonce:
        If you provided a nonce when calling :func:`get_authorization_request_url`,
        same nonce should also be provided here, so that we'll validate it.
        An exception will be raised if the nonce in id token mismatches.

    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    # If scope is absent on the wire, STS will give you a token associated
    # to the FIRST scope sent during the authorization request.
    # So in theory, you can omit scope here when you were working with only
    # one scope. But, MSAL decorates your scope anyway, so they are never
    # really empty.
    assert isinstance(scopes, list), "Invalid parameter type"
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    warnings.warn(
        "Change your acquire_token_by_authorization_code() to "
        "acquire_token_by_auth_code_flow()",
        DeprecationWarning)
    with warnings.catch_warnings(record=True):
        # Warnings raised by the calls below are recorded, not shown
        telemetry_context = self._build_telemetry_context(
            self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
        requested_scope = self._decorate_scope(scopes)
        request_headers = telemetry_context.generate_headers()
        # "data" is popped out of kwargs, so that the remaining kwargs
        # can be forwarded without a duplicated "data" keyword
        request_data = dict(
            kwargs.pop("data", {}),
            claims=_merge_claims_challenge_and_capabilities(
                self._client_capabilities, claims_challenge))
        response = _clean_up(self.client.obtain_token_by_authorization_code(
            code,
            redirect_uri=redirect_uri,
            scope=requested_scope,
            headers=request_headers,
            data=request_data,
            nonce=nonce,
            **kwargs))
        if "access_token" in response:
            response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
        telemetry_context.update_telemetry(response)
        return response

1248 

def get_accounts(self, username=None):
    """Get a list of accounts which previously signed in, i.e. exists in cache.

    An account can later be used in :func:`~acquire_token_silent`
    to find its tokens.

    :param username:
        Filter accounts with this username only. Case insensitive.
        Accounts that have no username never match a filter.
    :return: A list of account objects.
        Each account is a dict. For now, we only document its "username" field.
        Your app can choose to display those information to end user,
        and allow user to choose one of his/her accounts to proceed.
    """
    accounts = self._find_msal_accounts(environment=self.authority.instance)
    if not accounts:  # Now try other aliases of this authority instance
        for alias in self._get_authority_aliases(self.authority.instance):
            accounts = self._find_msal_accounts(environment=alias)
            if accounts:
                break
    if username:
        # Federated account["username"] from AAD could contain mixed case
        lowercase_username = username.lower()
        accounts = [
            a for a in accounts
            # An account's username can be None or absent,
            # such an account is skipped rather than crashing the filter
            if (a.get("username") or "").lower() == lowercase_username]
        if not accounts:
            logger.debug(  # This would also happen when the cache is empty
                "get_accounts(username='%s') finds no account. "
                "If tokens were acquired without 'profile' scope, "
                "they would contain no username for filtering. "
                "Consider calling get_accounts(username=None) instead.",
                username)
    # Does not further filter by existing RTs here. It probably won't matter.
    # Because in most cases Accounts and RTs co-exist.
    # Even in the rare case when an RT is revoked and then removed,
    # acquire_token_silent() would then yield no result,
    # apps would fall back to other acquire methods. This is the standard pattern.
    return accounts

1286 

def _find_msal_accounts(self, environment):
    """Search the token cache for accounts in the given environment.

    Only accounts of the interested authority types are kept.

    :return:
        A list of dicts, one per distinct home_account_id.
        When several cached accounts share a home_account_id,
        the content of the last one wins.
    """
    wanted_authority_types = [
        TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS]
    if _is_running_in_cloud_shell():
        wanted_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL)

    copied_fields = (
        # These are minimal amount of non-tenant-specific account info
        "home_account_id",
        "environment",
        "username",
        "account_source",

        # The following fields for backward compatibility, for now
        "authority_type",
        "local_account_id",  # Tenant-specific
        "realm",  # Tenant-specific
    )
    accounts_by_home_id = {}  # Grouped by home tenant's id
    cached_accounts = self.token_cache.search(
        TokenCache.CredentialType.ACCOUNT,
        query={"environment": environment})
    for cached in cached_accounts:
        if cached["authority_type"] not in wanted_authority_types:
            continue
        accounts_by_home_id[cached.get("home_account_id")] = {
            field: cached.get(field) for field in copied_fields}
    return list(accounts_by_home_id.values())

1311 

def _get_instance_metadata(self, instance):  # This exists so it can be mocked in unit test
    """Fetch the "metadata" part of the instance discovery response.

    :raises: Whatever ``raise_for_status()`` of the http response raises.
    """
    instance_discovery_host = _get_instance_discovery_host(instance)
    discovery_endpoint = _get_instance_discovery_endpoint(instance)
    query = {
        'api-version': '1.1',
        'authorization_endpoint': (
            "https://{}/common/oauth2/authorize".format(instance_discovery_host)
        ),
    }
    resp = self.http_client.get(
        discovery_endpoint,
        params=query,
        headers={'Accept': 'application/json'})
    resp.raise_for_status()
    payload = json.loads(resp.text)
    return payload['metadata']

1325 

1326 def _get_authority_aliases(self, instance): 

1327 if self._instance_discovery is False: 

1328 return [] 

1329 if self.authority._is_known_to_developer: 

1330 # Then it is an ADFS/B2C/known_authority_hosts situation 

1331 # which may not reach the central endpoint, so we skip it. 

1332 return [] 

1333 if instance not in self.authority_groups: 

1334 self.authority_groups[instance] = [ 

1335 set(group['aliases']) for group in self._get_instance_metadata(instance)] 

1336 for group in self.authority_groups[instance]: 

1337 if instance in group: 

1338 return [alias for alias in group if alias != instance] 

1339 return [] 

1340 

def remove_account(self, account):
    """Sign me out and forget me from token cache.

    When broker is enabled, a silent broker sign-out is attempted first.
    An error returned by it is only logged.
    """
    broker_enabled = self._enable_broker
    if broker_enabled:
        from .broker import _signout_silently
        signout_error = _signout_silently(
            self.client_id, account["local_account_id"])
        if signout_error:
            logger.debug("_signout_silently() returns error: %s", signout_error)
    # Broker sign-out has been attempted, even if the _forget_me() below throws.
    self._forget_me(account)

1350 

def _sign_out(self, home_account):
    """Remove all relevant RTs and ATs of the given account from token cache.

    :param dict home_account:
        Its "environment" and "home_account_id" decide which tokens to remove.
    """
    owned_by_home_account = {  # realm-independent
        "environment": home_account["environment"],
        "home_account_id": home_account["home_account_id"],
    }
    app_metadata = self._get_app_metadata(home_account["environment"])

    def belongs_to_this_app_or_its_family(rt):
        # Do RT's app ownership check as a precaution, in case family apps
        # and 3rd-party apps share same token cache, although they should not.
        if rt["client_id"] == self.client_id:
            return True
        # Now let's settle family business
        return (
            app_metadata.get("family_id")
            and rt.get("family_id") == app_metadata["family_id"])

    # Remove RTs/FRTs, and they are realm-independent.
    # They are collected into a static list (rather than a dynamic generator),
    # to avoid changing self.token_cache while it is being iterated
    removable_rts = [
        rt for rt in self.token_cache.search(
            TokenCache.CredentialType.REFRESH_TOKEN, query=owned_by_home_account)
        if belongs_to_this_app_or_its_family(rt)]
    for rt in removable_rts:
        self.token_cache.remove_rt(rt)

    # Remove ATs from a static list too, for the same reason.
    # They are searched regardless of realm,
    # b/c we've removed realm-independent RTs anyway
    removable_ats = list(self.token_cache.search(
        TokenCache.CredentialType.ACCESS_TOKEN, query=owned_by_home_account))
    for at in removable_ats:
        # To avoid the complexity of locating sibling family app's AT,
        # we skip AT's app ownership check.
        # It means ATs for other apps will also be removed, it is OK because:
        # * non-family apps are not supposed to share token cache to begin with;
        # * Even if it happens, we keep other app's RT already, so SSO still works
        self.token_cache.remove_at(at)

1380 

def _forget_me(self, home_account):
    """Sign out, and then also remove all relevant ID tokens and accounts.

    :param dict home_account:
        Its ``environment`` and ``home_account_id`` identify
        whose cache entries are to be removed.
    """
    self._sign_out(home_account)
    owner_query = dict(  # Realm-independent
        environment=home_account["environment"],
        home_account_id=home_account["home_account_id"],
    )
    cache = self.token_cache
    # Each search result is turned into a static list before removal,
    # to avoid changing the token cache while it is being iterated.
    stale_id_tokens = list(cache.search(
        TokenCache.CredentialType.ID_TOKEN, query=owner_query))
    for id_token in stale_id_tokens:
        cache.remove_idt(id_token)
    stale_accounts = list(cache.search(
        TokenCache.CredentialType.ACCOUNT, query=owner_query))
    for account_entry in stale_accounts:
        cache.remove_account(account_entry)

1397 

def _acquire_token_by_cloud_shell(self, scopes, data=None):
    """Obtain a token via Cloud Shell, and save a successful one into cache.

    :param scopes: The scopes to request.
    :param data: Optional extra data, passed on to the token request
        and recorded (as ``{}`` when absent) in the cache event.
    :return: The response from ``_obtain_token()``.
        A response containing an "access_token" is tagged with token source.
    """
    from .cloudshell import _obtain_token  # Lazy load
    response = _obtain_token(
        self.http_client, scopes, client_id=self.client_id, data=data)
    succeeded = "error" not in response
    if succeeded:
        granted_scopes = (
            response["scope"].split() if "scope" in response else scopes)
        self.token_cache.add({
            "client_id": self.client_id,
            "scope": granted_scopes,
            "token_endpoint": self.authority.token_endpoint,
            "response": response,
            "data": data or {},
            "authority_type": _AUTHORITY_TYPE_CLOUDSHELL,
        })
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
    return response

1414 

def acquire_token_silent(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Acquire an access token for given account, without user interaction.

    It accepts the same parameters as
    :func:`~acquire_token_silent_with_error`.
    The two methods differ only in what they return:
    this one folds "nothing in cache" and "token refresh failed"
    into a single return value, ``None``.
    If your app does not need the exact token refresh error,
    this method is the easier and recommended one.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key,
          if cache lookup succeeded.
        - None when cache lookup does not yield a token.
    """
    if not account:
        # A backward-compatible NO-OP to drop the account=None usage
        return None
    outcome = _clean_up(self._acquire_token_silent_with_error(
        scopes, account, authority=authority, force_refresh=force_refresh,
        claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs))
    if outcome and "error" not in outcome:
        return outcome
    return None

1445 

def acquire_token_silent_with_error(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Acquire an access token for given account, without user interaction.

    The token comes either from a valid access token found in cache,
    or from a valid refresh token found in cache,
    which is then automatically redeemed for a new access token.

    Unlike :func:`~acquire_token_silent`, this method tells an empty cache
    apart from a token refresh error.
    Use it when your app cares about the exact token refresh error;
    otherwise :func:`~acquire_token_silent` is recommended.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param account: (Required)
        One of the account object returned by :func:`~get_accounts`.
        Starting from MSAL Python 1.23,
        a ``None`` input will become a NO-OP and always return ``None``.
    :param force_refresh:
        If True, it will skip Access Token look-up,
        and try to find a Refresh Token to obtain a new Access Token.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.
    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key,
          if cache lookup succeeded.
        - None when there is simply no token in the cache.
        - A dict containing an "error" key, when token refresh failed.
    """
    if account:
        return _clean_up(self._acquire_token_silent_with_error(
            scopes, account, authority=authority, force_refresh=force_refresh,
            claims_challenge=claims_challenge, auth_scheme=auth_scheme,
            **kwargs))
    # A backward-compatible NO-OP to drop the account=None usage
    return None

1498 

1499 def _acquire_token_silent_with_error( 

1500 self, 

1501 scopes, # type: List[str] 

1502 account, # type: Optional[Account] 

1503 authority=None, # See get_authorization_request_url() 

1504 force_refresh=False, # type: Optional[boolean] 

1505 claims_challenge=None, 

1506 auth_scheme=None, 

1507 **kwargs): 

1508 assert isinstance(scopes, list), "Invalid parameter type" 

1509 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1510 correlation_id = msal.telemetry._get_new_correlation_id() 

1511 if authority: 

1512 warnings.warn("We haven't decided how/if this method will accept authority parameter") 

1513 # the_authority = Authority( 

1514 # authority, 

1515 # self.http_client, 

1516 # instance_discovery=self._instance_discovery, 

1517 # ) if authority else self.authority 

1518 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( 

1519 scopes, account, self.authority, force_refresh=force_refresh, 

1520 claims_challenge=claims_challenge, 

1521 correlation_id=correlation_id, 

1522 auth_scheme=auth_scheme, 

1523 **kwargs) 

1524 if result and "error" not in result: 

1525 return result 

1526 final_result = result 

1527 for alias in self._get_authority_aliases(self.authority.instance): 

1528 if not list(self.token_cache.search( # Need a list to test emptiness 

1529 self.token_cache.CredentialType.REFRESH_TOKEN, 

1530 # target=scopes, # MUST NOT filter by scopes, because: 

1531 # 1. AAD RTs are scope-independent; 

1532 # 2. therefore target is optional per schema; 

1533 query={"environment": alias})): 

1534 # Skip heavy weight logic when RT for this alias doesn't exist 

1535 continue 

1536 the_authority = Authority( 

1537 "https://" + alias + "/" + self.authority.tenant, 

1538 self.http_client, 

1539 instance_discovery=False, 

1540 ) 

1541 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( 

1542 scopes, account, the_authority, force_refresh=force_refresh, 

1543 claims_challenge=claims_challenge, 

1544 correlation_id=correlation_id, 

1545 auth_scheme=auth_scheme, 

1546 **kwargs) 

1547 if result: 

1548 if "error" not in result: 

1549 return result 

1550 final_result = result 

1551 if final_result and final_result.get("suberror"): 

1552 final_result["classification"] = { # Suppress these suberrors, per #57 

1553 "bad_token": "", 

1554 "token_expired": "", 

1555 "protection_policy_required": "", 

1556 "client_mismatch": "", 

1557 "device_authentication_failed": "", 

1558 }.get(final_result["suberror"], final_result["suberror"]) 

1559 return final_result 

1560 

def _acquire_token_silent_from_cache_and_possibly_refresh_it(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority,  # This can be different than self.authority
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        correlation_id=None,
        http_exceptions=None,
        auth_scheme=None,
        **kwargs):
    """Look for an access token (AT) in cache, and obtain a new one if needed.

    This internal method has two calling patterns:
    it accepts a non-empty account to find token for a user,
    and accepts account=None to find a token for the current app.

    :param scopes: The scopes that the token is for.
    :param account: An account dict, or None for the current app.
    :param authority:
        Its ``instance`` and ``tenant`` are used in the AT cache query.
        This can be different than ``self.authority``.
    :param force_refresh: When truthy, the AT cache look-up is skipped.
    :param claims_challenge: When truthy, the AT cache look-up is skipped too.
    :param correlation_id:
        Passed on to the broker call or to the refresh token based flow.
    :param http_exceptions:
        A tuple of exception types to be treated as HTTP failures.
        It defaults to ``(requests.exceptions.RequestException,)``.
    :param auth_scheme:
        When truthy, the AT cache look-up is skipped.
        It is passed on to the broker call; the non-broker paths reject it.
    :return:
        - The AT found in cache, shaped like a real response,
          when it has at least 5 minutes left and is not past its "refresh_on".
        - Otherwise the outcome of the Cloud Shell, broker, refresh token
          or client credential attempt.
        - An aging AT found in cache serves as a fallback,
          when the refresh attempt failed or hit an HTTP exception.
    :raises ValueError:
        When ``auth_scheme`` is used for a Cloud Shell account,
        or when no broker call returned an outcome for it.
    """
    access_token_from_cache = None
    if not (force_refresh or claims_challenge or auth_scheme):  # Then attempt AT cache
        query={
            "client_id": self.client_id,
            "environment": authority.instance,
            "realm": authority.tenant,
            "home_account_id": (account or {}).get("home_account_id"),
            }
        key_id = kwargs.get("data", {}).get("key_id")
        if key_id:  # Some token types (SSH-certs, POP) are bound to a key
            query["key_id"] = key_id
        now = time.time()
        refresh_reason = msal.telemetry.AT_ABSENT  # Until a cache entry says otherwise
        for entry in self.token_cache.search(  # A generator allows us to
                # break early in cache-hit without finding a full list
                self.token_cache.CredentialType.ACCESS_TOKEN,
                target=scopes,
                query=query,
                ):  # This loop is about token search, not about token deletion.
            # Note that search() holds a lock during this loop;
            # that is fine because this loop is fast
            expires_in = int(entry["expires_on"]) - now
            if expires_in < 5*60:  # Then consider it expired
                refresh_reason = msal.telemetry.AT_EXPIRED
                continue  # Removal is not necessary, it will be overwritten
            logger.debug("Cache hit an AT")
            access_token_from_cache = {  # Mimic a real response
                "access_token": entry["secret"],
                "token_type": entry.get("token_type", "Bearer"),
                "expires_in": int(expires_in),  # OAuth2 specs defines it as int
                self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
                }
            if "refresh_on" in entry:
                access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
                if int(entry["refresh_on"]) < now:  # aging
                    refresh_reason = msal.telemetry.AT_AGING
                    break  # With a fallback in hand, we break here to go refresh
            self._build_telemetry_context(-1).hit_an_access_token()
            return access_token_from_cache  # It is still good as new
    else:
        # The AT cache was deliberately bypassed
        refresh_reason = msal.telemetry.FORCE_REFRESH  # TODO: It could also mean claims_challenge
    assert refresh_reason, "It should have been established at this point"
    if not http_exceptions:  # It can be a tuple of exceptions
        # The exact HTTP exceptions are transportation-layer dependent
        from requests.exceptions import RequestException  # Lazy load
        http_exceptions = (RequestException,)
    try:
        data = kwargs.get("data", {})
        if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL:
            if auth_scheme:
                raise ValueError("auth_scheme is not supported in Cloud Shell")
            return self._acquire_token_by_cloud_shell(scopes, data=data)

        is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(data.get("token_type"), auth_scheme)

        # Broker is attempted only for a user account, and, on platforms other
        # than win32 and darwin, only when this is not an SSH-cert or POP request.
        if self._enable_broker and account and account.get("account_source") in (
                _GRANT_TYPE_BROKER,  # Broker successfully established this account previously.
                None,  # Unknown data from older MSAL. Broker might still work.
                ) and (sys.platform in ("win32", "darwin") or not is_ssh_cert_or_pop_request):
            from .broker import _acquire_token_silently
            # NOTE(review): the broker request is built from self.authority,
            # not from the authority parameter of this method - confirm intent.
            response = _acquire_token_silently(
                "https://{}/{}".format(self.authority.instance, self.authority.tenant),
                self.client_id,
                account["local_account_id"],
                scopes,
                claims=_merge_claims_challenge_and_capabilities(
                    self._client_capabilities, claims_challenge),
                correlation_id=correlation_id,
                auth_scheme=auth_scheme,
                **data)
            if response:  # Broker provides a decisive outcome
                account_was_established_by_broker = account.get(
                    "account_source") == _GRANT_TYPE_BROKER
                broker_attempt_succeeded_just_now = "error" not in response
                # A broker error for an account of unknown source is not final;
                # in that case we fall through to the non-broker flows below.
                if account_was_established_by_broker or broker_attempt_succeeded_just_now:
                    return self._process_broker_response(response, scopes, data)

        if auth_scheme:  # The non-broker flows below do not accept auth_scheme
            raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
        if account:
            result = self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
                authority, self._decorate_scope(scopes), account,
                refresh_reason=refresh_reason, claims_challenge=claims_challenge,
                correlation_id=correlation_id,
                **kwargs)
        else:  # The caller is acquire_token_for_client()
            result = self._acquire_token_for_client(
                scopes, refresh_reason, claims_challenge=claims_challenge,
                **kwargs)
        if result and "access_token" in result:
            result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
        # A failed or empty result is returned as-is only when there is
        # no AT from cache to fall back to.
        if (result and "error" not in result) or (not access_token_from_cache):
            return result
    except http_exceptions:
        # Typically network error. Potential AAD outage?
        if not access_token_from_cache:  # It means there is no fall back option
            raise  # We choose to bubble up the exception
    return access_token_from_cache

1673 

def _process_broker_response(self, response, scopes, data):
    """Save a successful broker response into token cache, then clean it up.

    :param dict response: The response from broker.
    :param scopes: The requested scopes,
        used when the response contains no "scope".
    :param data: The data to be recorded in the cache event.
    :return: The outcome of ``_clean_up(response)``.
    """
    if "error" in response:  # Nothing to cache for a failed attempt
        return _clean_up(response)
    granted_scopes = (
        response["scope"].split() if "scope" in response else scopes)
    self.token_cache.add({
        "client_id": self.client_id,
        "scope": granted_scopes,
        "token_endpoint": self.authority.token_endpoint,
        "response": response,
        "data": data,
        "_account_id": response["_account_id"],
        # Be consistent with non-broker flows
        "environment": self.authority.instance,
        # A pseudo grant type for TokenCache to mark account_source as broker
        "grant_type": _GRANT_TYPE_BROKER,
    })
    response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
    return _clean_up(response)

1688 

1689 def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family( 

1690 self, authority, scopes, account, **kwargs): 

1691 query = { 

1692 "environment": authority.instance, 

1693 "home_account_id": (account or {}).get("home_account_id"), 

1694 # "realm": authority.tenant, # AAD RTs are tenant-independent 

1695 } 

1696 app_metadata = self._get_app_metadata(authority.instance) 

1697 if not app_metadata: # Meaning this app is now used for the first time. 

1698 # When/if we have a way to directly detect current app's family, 

1699 # we'll rewrite this block, to support multiple families. 

1700 # For now, we try existing RTs (*). If it works, we are in that family. 

1701 # (*) RTs of a different app/family are not supposed to be 

1702 # shared with or accessible by us in the first place. 

1703 at = self._acquire_token_silent_by_finding_specific_refresh_token( 

1704 authority, scopes, 

1705 dict(query, family_id="1"), # A hack, we have only 1 family for now 

1706 rt_remover=lambda rt_item: None, # NO-OP b/c RTs are likely not mine 

1707 break_condition=lambda response: # Break loop when app not in family 

1708 # Based on an AAD-only behavior mentioned in internal doc here 

1709 # https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595 

1710 "client_mismatch" in response.get("error_additional_info", []), 

1711 **kwargs) 

1712 if at and "error" not in at: 

1713 return at 

1714 last_resp = None 

1715 if app_metadata.get("family_id"): # Meaning this app belongs to this family 

1716 last_resp = at = self._acquire_token_silent_by_finding_specific_refresh_token( 

1717 authority, scopes, dict(query, family_id=app_metadata["family_id"]), 

1718 **kwargs) 

1719 if at and "error" not in at: 

1720 return at 

1721 # Either this app is an orphan, so we will naturally use its own RT; 

1722 # or all attempts above have failed, so we fall back to non-foci behavior. 

1723 return self._acquire_token_silent_by_finding_specific_refresh_token( 

1724 authority, scopes, dict(query, client_id=self.client_id), 

1725 **kwargs) or last_resp 

1726 

1727 def _get_app_metadata(self, environment): 

1728 return self.token_cache._get_app_metadata( 

1729 environment=environment, client_id=self.client_id, default={}) 

1730 

def _acquire_token_silent_by_finding_specific_refresh_token(
        self, authority, scopes, query,
        rt_remover=None, break_condition=lambda response: False,
        refresh_reason=None, correlation_id=None, claims_challenge=None,
        **kwargs):
    """Redeem the refresh tokens (RTs) matching ``query``, newest first.

    :param authority: The authority to build the client for;
        its ``instance`` is recorded as the environment of new tokens.
    :param scopes: The scopes to request.
    :param dict query: The criteria to search RTs in token cache.
        Its ``home_account_id`` (if any) is also sent as routing info.
    :param rt_remover: Accepted for backward compatibility;
        this implementation does not use it, and RT removal is disabled.
    :param break_condition: A callable accepting an error response.
        When it returns true, the remaining RTs will not be attempted.
    :param refresh_reason: Recorded in telemetry.
    :param correlation_id: Recorded in telemetry.
    :param claims_challenge: Merged with client capabilities into "claims".
    :param kwargs: Its optional ``data`` dict is sent with every attempt;
        the rest are passed on to ``obtain_token_by_refresh_token()``.
    :return:
        - None, when no RT matches the query.
        - The first response containing no "error" key.
        - Otherwise the latest error response.
    """
    matches = list(self.token_cache.search(  # We want a list to test emptiness
        self.token_cache.CredentialType.REFRESH_TOKEN,
        # target=scopes,  # AAD RTs are scope-independent
        query=query))
    logger.debug("Found %d RTs matching %s", len(matches), {
        k: _pii_less_home_account_id(v) if k == "home_account_id" and v else v
        for k, v in query.items()
        })

    response = None  # A distinguishable value to mean cache is empty
    if not matches:  # Then exit early to avoid expensive operations
        return response
    client, _ = self._build_client(
        # Potentially expensive if building regional client
        self.client_credential, authority, skip_regional_client=True)
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_SILENT_ID,
        correlation_id=correlation_id, refresh_reason=refresh_reason)
    # The request data is built only once, before the loop. Popping "data"
    # out of kwargs inside the loop would send caller's data with the first
    # RT only, and silently drop it from every subsequent attempt.
    request_data = dict(
        kwargs.pop("data", {}),
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge))
    for entry in sorted(  # Since unfit RTs would not be aggressively removed,
            # we start from newer RTs which are more likely fit.
            matches,
            key=lambda e: int(e.get("last_modification_time", "0")),
            reverse=True):
        logger.debug("Cache attempts an RT")
        headers = telemetry_context.generate_headers()
        if query.get("home_account_id"):  # Then use it as CCS Routing info
            headers["X-AnchorMailbox"] = "Oid:{}".format(  # case-insensitive value
                query["home_account_id"].replace(".", "@"))
        response = client.obtain_token_by_refresh_token(
            entry, rt_getter=lambda token_item: token_item["secret"],
            on_removing_rt=lambda rt_item: None,  # Disable RT removal,
                # because an invalid_grant could be caused by new MFA policy,
                # the RT could still be useful for other MFA-less scope or tenant
            on_obtaining_tokens=lambda event: self.token_cache.add(dict(
                event,
                environment=authority.instance,
                skip_account_creation=True,  # To honor a concurrent remove_account()
                )),
            scope=scopes,
            headers=headers,
            data=dict(request_data),  # A fresh copy for each attempt
            **kwargs)
        telemetry_context.update_telemetry(response)
        if "error" not in response:
            return response
        logger.debug(
            "Refresh failed. %s: %s",
            response.get("error"), response.get("error_description"))
        if break_condition(response):
            break
    return response  # Returns the latest error (if any), or just None

1791 

1792 def _validate_ssh_cert_input_data(self, data): 

1793 if data.get("token_type") == "ssh-cert": 

1794 if not data.get("req_cnf"): 

1795 raise ValueError( 

1796 "When requesting an SSH certificate, " 

1797 "you must include a string parameter named 'req_cnf' " 

1798 "containing the public key in JWK format " 

1799 "(https://tools.ietf.org/html/rfc7517).") 

1800 if not data.get("key_id"): 

1801 raise ValueError( 

1802 "When requesting an SSH certificate, " 

1803 "you must include a string parameter named 'key_id' " 

1804 "which identifies the key in the 'req_cnf' argument.") 

1805 

def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
    """Acquire token(s) based on a refresh token (RT) obtained from elsewhere.

    This method is meant for migration only: you have old RTs from elsewhere,
    and you want to bring them into MSAL.
    The new tokens obtained by this call are automatically stored into MSAL.

    If you are already using MSAL, you do NOT need this method.
    MSAL maintains RT automatically inside its token cache,
    and an access token can be retrieved
    when you call :func:`~acquire_token_silent`.

    :param str refresh_token: The old refresh token, as a string.

    :param list scopes:
        The scopes associate with this old RT.
        Each scope needs to be in the Microsoft identity platform (v2) format.
        See `Scopes not resources <https://docs.microsoft.com/en-us/azure/active-directory/develop/migrate-python-adal-msal#scopes-not-resources>`_.

    :return:
        * A dict contains "error" and some other keys, when error happened.
        * A dict contains no "error" key means migration was successful.
    """
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN,
        refresh_reason=msal.telemetry.FORCE_REFRESH)
    raw_response = self.client.obtain_token_by_refresh_token(
        refresh_token,
        scope=self._decorate_scope(scopes),
        headers=telemetry_context.generate_headers(),
        rt_getter=lambda rt: rt,  # The input is already the RT string itself
        on_updating_rt=False,
        on_removing_rt=lambda rt_item: None,  # No OP
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

1845 

def acquire_token_by_username_password(
        self, username, password, scopes, claims_challenge=None,
        # Note: We shouldn't need to surface enable_msa_passthrough,
        # because this ROPC won't work with MSA account anyway.
        auth_scheme=None,
        **kwargs):
    """Gets a token for a given resource via user credentials.

    See this page for constraints of Username Password Flow.
    https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication

    :param str username: Typically a UPN in the form of an email address.
    :param str password: The password.
    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".

    :raises ValueError:
        When ``auth_scheme`` is provided but the broker flow is not in use.

    [Deprecated] This API is deprecated for public client flows and will be
    removed in a future release. Use a more secure flow instead.
    Migration guide: https://aka.ms/msal-ropc-migration

    """
    is_confidential_app = self.client_credential or isinstance(
        self, ConfidentialClientApplication)
    if not is_confidential_app:
        # The message is built from single-line literals, so that
        # neither a line break nor source code indentation leaks into it.
        warnings.warn(
            "This API has been deprecated for public client flows, "
            "please use a more secure flow. "
            "See https://aka.ms/msal-ropc-migration for migration guidance",
            DeprecationWarning,
            stacklevel=2,  # So that the warning points to the caller's code
            )
    claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)
    if self._enable_broker and sys.platform in ("win32", "darwin"):
        from .broker import _signin_silently  # Lazy load
        response = _signin_silently(
            "https://{}/{}".format(self.authority.instance, self.authority.tenant),
            self.client_id,
            scopes,  # Decorated scopes won't work due to offline_access
            MSALRuntime_Username=username,
            MSALRuntime_Password=password,
            validateAuthority="no" if (
                self.authority._is_known_to_developer
                or self._instance_discovery is False) else None,
            claims=claims,
            auth_scheme=auth_scheme,
            )
        return self._process_broker_response(response, scopes, kwargs.get("data", {}))

    if auth_scheme:  # Only the broker flow above accepts an auth_scheme
        raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
    scopes = self._decorate_scope(scopes)
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
    headers = telemetry_context.generate_headers()
    data = dict(kwargs.pop("data", {}), claims=claims)
    response = None
    if not self.authority.is_adfs:
        user_realm_result = self.authority.user_realm_discovery(
            username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
        if user_realm_result.get("account_type") == "Federated":
            response = _clean_up(self._acquire_token_by_username_password_federated(
                user_realm_result, username, password, scopes=scopes,
                data=data,
                headers=headers, **kwargs))
    if response is None:  # Either ADFS or not federated
        response = _clean_up(self.client.obtain_token_by_username_password(
            username, password, scope=scopes,
            headers=headers,
            data=data,
            **kwargs))
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

1932 

def _acquire_token_by_username_password_federated(
        self, user_realm_result, username, password, scopes=None, **kwargs):
    """Obtain a token for a federated user, via WS-Trust and a SAML assertion.

    :param dict user_realm_result: The outcome of user realm discovery.
        Its ``federation_metadata_url``, ``cloud_audience_urn`` and
        ``federation_active_auth_url`` are used when present.
    :param str username: The username sent in the WS-Trust request.
    :param str password: The password sent in the WS-Trust request.
    :param scopes: The scopes to request with the SAML assertion.
    :param kwargs: Passed on to ``obtain_token_by_assertion()``.
    :return: The response of ``obtain_token_by_assertion()``.
    :raises ValueError: When MEX yields no WS-Trust endpoint.
    :raises RuntimeError: When the WS-Trust response contains no token,
        or contains a token of unknown type.
    """
    wstrust_endpoint = {}
    if user_realm_result.get("federation_metadata_url"):
        wstrust_endpoint = mex_send_request(
            user_realm_result["federation_metadata_url"],
            self.http_client)
        if wstrust_endpoint is None:
            raise ValueError("Unable to find wstrust endpoint from MEX. "
                "This typically happens when attempting MSA accounts. "
                "More details available here. "
                "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication")
    logger.debug("wstrust_endpoint = %s", wstrust_endpoint)
    wstrust_result = wst_send_request(
        username, password,
        user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"),
        wstrust_endpoint.get("address",
            # Fallback to an AAD supplied endpoint
            user_realm_result.get("federation_active_auth_url")),
        wstrust_endpoint.get("action"), self.http_client)
    if not ("token" in wstrust_result and "type" in wstrust_result):
        raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result)
    GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
    grant_type = {  # Map the type of the SAML token to an OAuth2 grant type
        SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1,
        SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2,
        WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1,
        WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2
        }.get(wstrust_result.get("type"))
    if not grant_type:
        # The token type is formatted into the message here. Passing it as a
        # second argument of the exception would render the error as a tuple.
        raise RuntimeError(
            "RSTR returned unknown token type: {}".format(
                wstrust_result.get("type")))
    self.client.grant_assertion_encoders.setdefault(  # Register a non-standard type
        grant_type, self.client.encode_saml_assertion)
    return self.client.obtain_token_by_assertion(
        wstrust_result["token"], grant_type, scope=scopes,
        on_obtaining_tokens=lambda event: self.token_cache.add(dict(
            event,
            environment=self.authority.instance,
            username=username,  # Useful in case IDT contains no such info
            )),
        **kwargs)

1975 

1976 

1977class PublicClientApplication(ClientApplication): # browser app or mobile app 

1978 

1979 DEVICE_FLOW_CORRELATION_ID = "_correlation_id" 

1980 CONSOLE_WINDOW_HANDLE = object() 

1981 

1982 def __init__( 

1983 self, client_id, client_credential=None, 

1984 *, 

1985 enable_broker_on_windows=None, 

1986 enable_broker_on_mac=None, 

1987 enable_broker_on_linux=None, 

1988 enable_broker_on_wsl=None, 

1989 **kwargs): 

1990 """Same as :func:`ClientApplication.__init__`, 

1991 except that ``client_credential`` parameter shall remain ``None``. 

1992 

1993 .. note:: 

1994 

1995 **What is a broker, and why use it?** 

1996 

1997 A broker is a component installed on your device. 

1998 Broker implicitly gives your device an identity. By using a broker, 

1999 your device becomes a factor that can satisfy MFA (Multi-factor authentication). 

2000 This factor would become mandatory 

2001 if a tenant's admin enables a corresponding Conditional Access (CA) policy. 

2002 The broker's presence allows Microsoft identity platform 

2003 to have higher confidence that the tokens are being issued to your device, 

2004 and that is more secure. 

2005 

2006 An additional benefit of broker is, 

2007 it runs as a long-lived process with your device's OS, 

2008 and maintains its own cache, 

2009 so that your broker-enabled apps (even a CLI) 

2010 could automatically SSO from a previously established signed-in session. 

2011 

2012 **How to opt in to use broker?** 

2013 

2014 1. You can set any combination of the following opt-in parameters to true: 

2015 

2016 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2017 | Opt-in flag | If app will run on | App has registered this as a Desktop platform redirect URI in Azure Portal | 

2018 +==========================+===================================+====================================================================================+ 

2019 | enable_broker_on_windows | Windows 10+ | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id | 

2020 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2021 | enable_broker_on_wsl | WSL | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id | 

2022 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2023 | enable_broker_on_mac | Mac with Company Portal installed | msauth.com.msauth.unsignedapp://auth | 

2024 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2025 | enable_broker_on_linux | Linux with Intune installed | ``https://login.microsoftonline.com/common/oauth2/nativeclient`` (MUST be enabled) | 

2026 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

2027 

2028 2. Install broker dependency, 

2029 e.g. ``pip install msal[broker]>=1.33,<2``. 

2030 

2031 3. Test with ``acquire_token_interactive()`` and ``acquire_token_silent()``. 

2032 

2033 **The fallback behaviors of MSAL Python's broker support** 

2034 

2035 MSAL will either error out, or silently fallback to non-broker flows. 

2036 

2037 1. MSAL will ignore the `enable_broker_...` and bypass broker 

2038 on those auth flows that are known to be NOT supported by broker. 

2039 This includes ADFS, B2C, etc.. 

2040 For other "could-use-broker" scenarios, please see below. 

2041 2. MSAL errors out when app developer opted-in to use broker 

2042 but a direct dependency "mid-tier" package is not installed. 

2043 Error message guides app developer to declare the correct dependency 

2044 ``msal[broker]``. 

2045 We error out here because the error is actionable to app developers. 

2046 3. MSAL silently "deactivates" the broker and fallback to non-broker, 

2047 when opted-in, dependency installed yet failed to initialize. 

2048 We anticipate this would happen on a device whose OS is too old 

2049 or the underlying broker component is somehow unavailable. 

2050 There is not much an app developer or the end user can do here. 

2051 Eventually, the conditional access policy shall 

2052 force the user to switch to a different device. 

2053 4. MSAL errors out when broker is opted in, installed, initialized, 

2054 but subsequent token request(s) failed. 

2055 

2056 :param boolean enable_broker_on_windows: 

2057 This setting is only effective if your app is running on Windows 10+. 

2058 This parameter defaults to None, which means MSAL will not utilize a broker. 

2059 

2060 New in MSAL Python 1.25.0. 

2061 

2062 :param boolean enable_broker_on_mac: 

2063 This setting is only effective if your app is running on Mac. 

2064 This parameter defaults to None, which means MSAL will not utilize a broker. 

2065 

2066 New in MSAL Python 1.31.0. 

2067 

2068 :param boolean enable_broker_on_linux: 

2069 This setting is only effective if your app is running on Linux, including WSL. 

2070 This parameter defaults to None, which means MSAL will not utilize a broker. 

2071 

2072 New in MSAL Python 1.33.0. 

2073 

2074 :param boolean enable_broker_on_wsl: 

2075 This setting is only effective if your app is running on WSL. 

2076 This parameter defaults to None, which means MSAL will not utilize a broker. 

2077 

2078 New in MSAL Python 1.33.0. 

2079 """ 

2080 if client_credential is not None: 

2081 raise ValueError("Public Client should not possess credentials") 

2082 

2083 self._enable_broker = bool( 

2084 enable_broker_on_windows and sys.platform == "win32" 

2085 or enable_broker_on_mac and sys.platform == "darwin" 

2086 or enable_broker_on_linux and sys.platform == "linux" 

2087 or enable_broker_on_wsl and is_wsl() 

2088 ) 

2089 

2090 super(PublicClientApplication, self).__init__( 

2091 client_id, client_credential=None, **kwargs) 

2092 

def acquire_token_interactive(
        self,
        scopes,  # type: list[str]
        prompt=None,
        login_hint=None,  # type: Optional[str]
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        timeout=None,
        port=None,
        extra_scopes_to_consent=None,
        max_age=None,
        parent_window_handle=None,
        on_before_launching_ui=None,
        auth_scheme=None,
        **kwargs):
    """Acquire token interactively i.e. via a local browser.

    Prerequisite: In Azure Portal, configure the Redirect URI of your
    "Mobile and Desktop application" as ``http://localhost``.
    If you opt in to use broker during ``PublicClientApplication`` creation,
    your app also needs this Redirect URI:
    ``ms-appx-web://Microsoft.AAD.BrokerPlugin/YOUR_CLIENT_ID``

    :param list scopes:
        It is a list of case-sensitive strings.
    :param str prompt:
        By default, no prompt value will be sent, not even string ``"none"``.
        You will have to specify a value explicitly.
        Its valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.
    :param str login_hint:
        Optional. Identifier of the user. Generally a User Principal Name (UPN).
    :param domain_hint:
        Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
        If included, it will skip the email-based discovery process that user goes
        through on the sign-in page, leading to a slightly more streamlined user experience.
        More information on possible values available in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.

    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :param int timeout:
        This method will block the current thread.
        This parameter specifies the timeout value in seconds.
        Default value ``None`` means wait indefinitely.

    :param int port:
        The port to be used to listen to an incoming auth response.
        By default we will use a system-allocated port.
        (The rest of the redirect_uri is hard coded as ``http://localhost``.)

    :param list extra_scopes_to_consent:
        "Extra scopes to consent" is a concept only available in Microsoft Entra.
        It refers to other resources you might want to prompt to consent for,
        in the same interaction, but for which you won't get back a
        token for in this particular operation.
        It is ignored (with a warning) when the request is served by broker.

    :param int max_age:
        OPTIONAL. Maximum Authentication Age.
        Specifies the allowable elapsed time in seconds
        since the last time the End-User was actively authenticated.
        If the elapsed time is greater than this value,
        Microsoft identity platform will actively re-authenticate the End-User.

        MSAL Python will also automatically validate the auth_time in ID token.

        New in version 1.15.

    :param int parent_window_handle:
        OPTIONAL.

        * If your app does not opt in to use broker,
          you do not need to provide a ``parent_window_handle`` here.

        * If your app opts in to use broker,
          ``parent_window_handle`` is required.

          - If your app is a GUI app running on Windows or Mac system,
            you are required to also provide its window handle,
            so that the sign-in window will pop up on top of your window.
          - If your app is a console app running on Windows or Mac system,
            you can use a placeholder
            ``PublicClientApplication.CONSOLE_WINDOW_HANDLE``.

        Most Python scripts are console apps.

        New in version 1.20.0.

    :param function on_before_launching_ui:
        A callback with the form of
        ``lambda ui="xyz", **kwargs: print("A {} will be launched".format(ui))``,
        where ``ui`` will be either "browser" or "broker".
        You can use it to inform your end user to expect a pop-up window.

        New in version 1.20.0.

    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key.
        - A dict containing an "error" key, when token refresh failed.
    """
    data = kwargs.pop("data", {})
    # "enable_msa_passthrough" is kept as a hidden param, for now.
    # It MUST be removed from kwargs, because the remaining kwargs are
    # forwarded to the browser flow below.
    # OPTIONAL. MSA-Passthrough is a legacy configuration,
    # needed by a small amount of Microsoft first-party apps,
    # which would login MSA accounts via ".../organizations" authority.
    # If your app belongs to this category, AND you are enabling broker,
    # you would want to enable this flag. Default value is False.
    # More background of MSA-PT is available from this internal docs:
    # https://microsoft.sharepoint.com/:w:/t/Identity-DevEx/EatIUauX3c9Ctw1l7AQ6iM8B5CeBZxc58eoQCE0IuZ0VFw?e=tgc3jP&CID=39c853be-76ea-79d7-ee73-f1b2706ede05
    msa_passthrough_opt_in = kwargs.pop("enable_msa_passthrough", False)
    # Work around a known issue as of PyMsalRuntime 0.8
    enable_msa_passthrough = (
        msa_passthrough_opt_in and data.get("token_type") != "ssh-cert")
    self._validate_ssh_cert_input_data(data)
    is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(
        data.get("token_type"), auth_scheme)

    if not on_before_launching_ui:
        def on_before_launching_ui(**_unused):
            # Default callback: accept any keyword arguments and do nothing
            return None

    if _is_running_in_cloud_shell() and prompt == "none":
        # Note: _acquire_token_by_cloud_shell() is always silent,
        # so we would not fire on_before_launching_ui()
        return self._acquire_token_by_cloud_shell(scopes, data=data)

    claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)

    # On platforms other than Windows and Mac, the broker path is only taken
    # when the request is neither an SSH-cert nor a POP request.
    broker_can_serve_request = (
        sys.platform in ("win32", "darwin")
        or not is_ssh_cert_or_pop_request)
    if self._enable_broker and broker_can_serve_request:
        if parent_window_handle is None:
            raise ValueError(
                "parent_window_handle is required when you opted into using broker. "
                "You need to provide the window handle of your GUI application, "
                "or use msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE "
                "when and only when your application is a console app.")
        if extra_scopes_to_consent:
            logger.warning(
                "Ignoring parameter extra_scopes_to_consent, "
                "which is not supported by broker")
        broker_response = self._acquire_token_interactive_via_broker(
            scopes,
            parent_window_handle,
            enable_msa_passthrough,
            claims,
            data,
            on_before_launching_ui,
            auth_scheme,
            prompt=prompt,
            login_hint=login_hint,
            max_age=max_age,
        )
        return self._process_broker_response(broker_response, scopes, data)

    # From here on we are in the browser flow, which supports no auth_scheme
    if (isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme)
            and sys.platform == "linux"):
        raise ValueError("POP is not supported on Linux")
    if auth_scheme:
        raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)

    on_before_launching_ui(ui="browser")
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_INTERACTIVE)
    # Hardcode the host, for now. AAD portal rejects 127.0.0.1 anyway
    redirect_uri = "http://localhost:{port}".format(port=port or 0)
    response = _clean_up(self.client.obtain_token_by_browser(
        scope=self._decorate_scope(scopes) if scopes else None,
        extra_scope_to_consent=extra_scopes_to_consent,
        redirect_uri=redirect_uri,
        prompt=prompt,
        login_hint=login_hint,
        max_age=max_age,
        timeout=timeout,
        auth_params={
            "claims": claims,
            "domain_hint": domain_hint,
        },
        data=dict(data, claims=claims),
        headers=telemetry_context.generate_headers(),
        browser_name=_preferred_browser(),
        **kwargs))
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

2282 

def _acquire_token_interactive_via_broker(
        self,
        scopes,  # type: list[str]
        parent_window_handle,  # type: int
        enable_msa_passthrough,  # type: boolean
        claims,  # type: str
        data,  # type: dict
        on_before_launching_ui,  # type: callable
        auth_scheme,  # type: object
        prompt=None,
        login_hint=None,  # type: Optional[str]
        max_age=None,
        **kwargs):
    """Obtain a token from the broker, trying silent calls before showing UI.

    The steps, in order:

    1. When ``login_hint`` is given and ``prompt`` is not "select_account",
       look up cached accounts by that username. If exactly one matches,
       call the broker's ``_acquire_token_silently()`` for it and return
       its response when that response is non-empty and has no "error".
    2. When ``prompt`` is "none" or falsy, call the broker's
       ``_signin_silently()``.

       * With ``prompt == "none"`` its response is returned as-is, unless
         the token belongs to a different user than ``login_hint``,
         in which case a ``{"error": "broker_error", ...}`` dict is returned.
       * With a falsy ``prompt`` its response is returned, unless it is for
         the wrong account or it carries an "error" whose "_broker_status"
         is AccountUnusable or InteractionRequired.
    3. Otherwise call ``on_before_launching_ui(ui="broker")`` and return the
       result of the broker's ``_signin_interactively()``.

    :param list[str] scopes: Scopes passed through to the broker calls.
    :param parent_window_handle:
        Passed to ``_signin_interactively()``; the placeholder
        ``self.CONSOLE_WINDOW_HANDLE`` is translated into ``None``.
    :param enable_msa_passthrough: Forwarded to the broker as ``enable_msa_pt``.
    :param str claims: Forwarded to every broker call.
    :param dict data: Expanded as extra keyword arguments of every broker call.
    :param on_before_launching_ui:
        Callback invoked with ``ui="broker"`` right before the interactive call.
    :param auth_scheme: Forwarded to every broker call.
    :param prompt: Selects which of the steps above are attempted.
    :param login_hint: Username used for the account lookup in step 1,
        and forwarded to ``_signin_interactively()``.
    :param max_age: Forwarded to the sign-in calls.
    :return: The response of whichever broker call concluded the flow,
        or the "broker_error" dict described above.
    """
    # Imported lazily, so the broker module is only loaded on this code path
    from .broker import _signin_interactively, _signin_silently, _acquire_token_silently
    if "welcome_template" in kwargs:
        logger.debug(kwargs["welcome_template"])  # Experimental
    authority = "https://{}/{}".format(
        self.authority.instance, self.authority.tenant)
    # "no" when the authority is known to the developer
    # or instance discovery was explicitly turned off; otherwise None.
    validate_authority = "no" if (
        self.authority._is_known_to_developer
        or self._instance_discovery is False) else None
    # Calls different broker methods to mimic the OIDC behaviors
    if login_hint and prompt != "select_account":  # OIDC prompts when the user did not sign in
        accounts = self.get_accounts(username=login_hint)
        if len(accounts) == 1:  # Unambiguously proceed with this account
            logger.debug("Calling broker._acquire_token_silently()")
            response = _acquire_token_silently(  # When it works, it bypasses prompt
                authority,
                self.client_id,
                accounts[0]["local_account_id"],
                scopes,
                claims=claims,
                auth_scheme=auth_scheme,
                **data)
            if response and "error" not in response:
                return response
            # Otherwise fall through to the sign-in attempts below
    # login_hint undecisive or not exists
    if prompt == "none" or not prompt:  # Must/Can attempt _signin_silently()
        logger.debug("Calling broker._signin_silently()")
        response = _signin_silently(  # Unlike OIDC, it doesn't honor login_hint
            authority, self.client_id, scopes,
            validateAuthority=validate_authority,
            claims=claims,
            max_age=max_age,
            enable_msa_pt=enable_msa_passthrough,
            auth_scheme=auth_scheme,
            **data)
        # True when a token came back but its preferred_username claim
        # differs from the requested login_hint.
        is_wrong_account = bool(
            # _signin_silently() only gets tokens for default account,
            # but this seems to have been fixed in PyMsalRuntime 0.11.2
            "access_token" in response and login_hint
            and login_hint != response.get(
                "id_token_claims", {}).get("preferred_username"))
        wrong_account_error_message = (
            'prompt="none" will not work for login_hint="non-default-user"')
        if is_wrong_account:
            logger.debug(wrong_account_error_message)
        if prompt == "none":
            # prompt="none" forbids UI, so this is the final answer either way
            return response if not is_wrong_account else {
                "error": "broker_error",
                "error_description": wrong_account_error_message,
            }
        else:
            assert bool(prompt) is False
            # Imported lazily; only this branch needs the status constants
            from pymsalruntime import Response_Status
            recoverable_errors = frozenset([
                Response_Status.Status_AccountUnusable,
                Response_Status.Status_InteractionRequired,
            ])
            # Note: "and" binds tighter than "or", so this reads as
            # is_wrong_account or (has error and status is recoverable)
            if is_wrong_account or "error" in response and response.get(
                    "_broker_status") in recoverable_errors:
                pass  # It will fall back to the _signin_interactively()
            else:
                return response

    logger.debug("Falls back to broker._signin_interactively()")
    on_before_launching_ui(ui="broker")
    return _signin_interactively(
        authority, self.client_id, scopes,
        None if parent_window_handle is self.CONSOLE_WINDOW_HANDLE
            else parent_window_handle,
        validateAuthority=validate_authority,
        login_hint=login_hint,
        prompt=prompt,
        claims=claims,
        max_age=max_age,
        enable_msa_pt=enable_msa_passthrough,
        auth_scheme=auth_scheme,
        **data)

2372 

def initiate_device_flow(self, scopes=None, *, claims_challenge=None, **kwargs):
    """Start a Device Flow, to be completed by :func:`~acquire_token_by_device_flow`.

    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        Optional. It is merged with this app's client capabilities
        and sent as the "claims" value of the request.
    :return: A dict representing a newly created Device Flow object.

        - A successful response would contain "user_code" key, among others
        - an error response would contain some other readable key/value pairs.
    """
    correlation_id = msal.telemetry._get_new_correlation_id()
    requested_scope = self._decorate_scope(scopes or [])
    request_headers = {msal.telemetry.CLIENT_REQUEST_ID: correlation_id}
    merged_claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)
    device_flow = self.client.initiate_device_flow(
        scope=requested_scope,
        headers=request_headers,
        data={"claims": merged_claims},
        **kwargs)
    # Remember the correlation id, so that the follow-up polling can reuse it
    device_flow[self.DEVICE_FLOW_CORRELATION_ID] = correlation_id
    return device_flow

2393 

def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
    """Redeem a device flow object for a token, with customizable polling effect.

    :param dict flow:
        A dict previously generated by :func:`~initiate_device_flow`.
        By default, this method's polling effect will block current thread.
        You can abort the polling loop at any time,
        by changing the value of the flow's "expires_at" key to 0.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID,
        correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID))
    request_data = dict(
        kwargs.pop("data", {}),
        # 2018-10-4 Hack: during transition period,
        # service seemingly need both device_code and code parameter.
        code=flow["device_code"],
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge),
    )
    raw_response = self.client.obtain_token_by_device_flow(
        flow,
        data=request_data,
        headers=telemetry_context.generate_headers(),
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        # A token obtained here comes from the identity provider
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

2432 

2433 

2434class ConfidentialClientApplication(ClientApplication): # server-side web app 

2435 """Same as :func:`ClientApplication.__init__`, 

2436 except that ``allow_broker`` parameter shall remain ``None``. 

2437 """ 

2438 

def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
    """Acquire a token for this confidential client itself, not for an end user.

    Since MSAL Python 1.23, it will automatically look for token from cache,
    and only send request to Identity Provider when cache misses.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".

    :raises ValueError: When a truthy ``force_refresh`` keyword argument is given.
    """
    force_refresh_requested = kwargs.get("force_refresh")
    if force_refresh_requested:
        # We choose to disallow force_refresh
        raise ValueError(
            "Historically, this method does not support force_refresh behavior. "
        )
    # Passing None as the account makes this an app-only request
    result = self._acquire_token_silent_with_error(
        scopes, None, claims_challenge=claims_challenge, **kwargs)
    return _clean_up(result)

2464 

def _acquire_token_for_client(
        self,
        scopes,
        refresh_reason,
        claims_challenge=None,
        **kwargs
):
    """Send a client-credential token request and return its raw response.

    A ``DeprecationWarning`` is emitted when the authority's tenant is
    "common" or "organizations". The optional ``data`` keyword argument is
    validated as SSH-cert input, then merged with the claims and sent along.

    :param scopes: Sent as-is; this grant flow requires no scope decoration.
    :param refresh_reason: Recorded into the telemetry context.
    :param claims_challenge:
        Optional. Merged with this app's client capabilities
        into the "claims" value of the request.
    :return: The response of the underlying ``obtain_token_for_client()`` call.
    """
    tenant = self.authority.tenant.lower()
    if tenant in ("common", "organizations"):
        warnings.warn(
            "Using /common or /organizations authority "
            "in acquire_token_for_client() is unreliable. "
            "Please use a specific tenant instead.", DeprecationWarning)
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason)
    # Use the regional client when there is one
    token_client = self._regional_client or self.client
    request_headers = telemetry_context.generate_headers()
    # "data" must be popped, so that it is not passed twice via **kwargs
    request_data = dict(
        kwargs.pop("data", {}),
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge))
    response = token_client.obtain_token_for_client(
        scope=scopes,  # This grant flow requires no scope decoration
        headers=request_headers,
        data=request_data,
        **kwargs)
    telemetry_context.update_telemetry(response)
    return response

2491 

def remove_tokens_for_client(self):
    """Remove all tokens that were previously acquired via
    :func:`~acquire_token_for_client()` for the current client."""
    instance = self.authority.instance
    environments = [instance] + self._get_authority_aliases(instance)
    for environment in environments:
        criteria = {
            "client_id": self.client_id,
            "environment": environment,
            "home_account_id": None,  # These are mostly app-only tokens
        }
        # Remove ATs from a snapshot, not from the live search result
        matching_access_tokens = list(self.token_cache.search(
            TokenCache.CredentialType.ACCESS_TOKEN, query=criteria))
        for access_token in matching_access_tokens:
            self.token_cache.remove_at(access_token)
    # acquire_token_for_client() obtains no RTs, so we have no RT to remove

2505 

def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs):
    """Acquire a token via the on-behalf-of (OBO) flow.

    The current app is a middle-tier service which was called with a token
    representing an end user.
    The current app can use such token (a.k.a. a user assertion) to request
    another token to access downstream web API, on behalf of that user.
    See `detail docs here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow>`_ .

    The current middle-tier app has no user interaction to obtain consent.
    See how to gain consent upfront for your middle-tier app from this article.
    https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow#gaining-consent-for-the-middle-tier-application

    :param str user_assertion: The incoming token already received by this app
    :param list[str] scopes: Scopes required by downstream API (a resource).
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID)
    # Scope decoration is used for:
    # 1. Explicitly requesting an RT, without relying on AAD default
    #    behavior, even though it currently still issues an RT.
    # 2. Requesting an IDT (which would otherwise be unavailable)
    #    so that the calling app could use id_token_claims to implement
    #    their own cache mapping, which is likely needed in web apps.
    decorated_scope = self._decorate_scope(scopes)
    request_data = dict(
        kwargs.pop("data", {}),
        requested_token_use="on_behalf_of",
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge))
    request_headers = telemetry_context.generate_headers()
    # The implementation is NOT based on Token Exchange (RFC 8693);
    # it bases on assertion RFC 7521
    raw_response = self.client.obtain_token_by_assertion(
        user_assertion,
        self.client.GRANT_TYPE_JWT,  # IDTs and AAD ATs are all JWTs
        scope=decorated_scope,
        data=request_data,
        headers=request_headers,
        # TBD: Expose a login_hint (or ccs_routing_hint) param for web app
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        # A token obtained here comes from the identity provider
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response