Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/application.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

661 statements  

1import functools 

2import json 

3import time 

4import logging 

5import sys 

6import warnings 

7from threading import Lock 

8from typing import Optional # Needed in Python 3.7 & 3.8 

9from urllib.parse import urlparse 

10import os 

11 

12from .oauth2cli import Client, JwtAssertionCreator 

13from .oauth2cli.oidc import decode_part 

14from .authority import Authority, WORLD_WIDE 

15from .mex import send_request as mex_send_request 

16from .wstrust_request import send_request as wst_send_request 

17from .wstrust_response import * 

18from .token_cache import TokenCache, _get_username, _GRANT_TYPE_BROKER 

19import msal.telemetry 

20from .region import _detect_region 

21from .throttled_http_client import ThrottledHttpClient 

22from .cloudshell import _is_running_in_cloud_shell 

23from .sku import SKU, __version__ 

24from .oauth2cli.authcode import is_wsl 

25 

26 

27logger = logging.getLogger(__name__) 

28_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL" 

29 

30def _init_broker(enable_pii_log): # Make it a function to allow mocking 

31 from . import broker # Trigger Broker's initialization, lazily 

32 if enable_pii_log: 

33 broker._enable_pii_log() 

34 

35def extract_certs(public_cert_content): 

36 # Parses raw public certificate file contents and returns a list of strings 

37 # Usage: headers = {"x5c": extract_certs(open("my_cert.pem").read())} 

38 public_certificates = re.findall( 

39 r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----', 

40 public_cert_content, re.I) 

41 if public_certificates: 

42 return [cert.strip() for cert in public_certificates] 

43 # The public cert tags are not found in the input, 

44 # let's make best effort to exclude a private key pem file. 

45 if "PRIVATE KEY" in public_cert_content: 

46 raise ValueError( 

47 "We expect your public key but detect a private key instead") 

48 return [public_cert_content.strip()] 

49 

50 

51def _merge_claims_challenge_and_capabilities(capabilities, claims_challenge): 

52 # Represent capabilities as {"access_token": {"xms_cc": {"values": capabilities}}} 

53 # and then merge/add it into incoming claims 

54 if not capabilities: 

55 return claims_challenge 

56 claims_dict = json.loads(claims_challenge) if claims_challenge else {} 

57 for key in ["access_token"]: # We could add "id_token" if we'd decide to 

58 claims_dict.setdefault(key, {}).update(xms_cc={"values": capabilities}) 

59 return json.dumps(claims_dict) 

60 

61 

62def _str2bytes(raw): 

63 # A conversion based on duck-typing rather than six.text_type 

64 try: 

65 return raw.encode(encoding="utf-8") 

66 except: 

67 return raw 

68 

69 

70def _parse_pfx(pfx_path, passphrase_bytes): 

71 # Cert concepts https://security.stackexchange.com/a/226758/125264 

72 from cryptography.hazmat.primitives import hashes, serialization 

73 from cryptography.hazmat.primitives.serialization import pkcs12 

74 with open(pfx_path, 'rb') as f: 

75 private_key, cert, _ = pkcs12.load_key_and_certificates( # cryptography 2.5+ 

76 # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates 

77 f.read(), passphrase_bytes) 

78 if not (private_key and cert): 

79 raise ValueError("Your PFX file shall contain both private key and cert") 

80 cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM).decode() # cryptography 1.0+ 

81 x5c = [ 

82 '\n'.join(cert_pem.splitlines()[1:-1]) # Strip the "--- header ---" and "--- footer ---" 

83 ] 

84 sha256_thumbprint = cert.fingerprint(hashes.SHA256()).hex() # cryptography 0.7+ 

85 sha1_thumbprint = cert.fingerprint(hashes.SHA1()).hex() # cryptography 0.7+ 

86 # https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object 

87 return private_key, sha256_thumbprint, sha1_thumbprint, x5c 

88 

89 

90def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes): 

91 from cryptography.hazmat.primitives import serialization 

92 from cryptography.hazmat.backends import default_backend 

93 return serialization.load_pem_private_key( # cryptography 0.6+ 

94 _str2bytes(private_key_pem_str), 

95 passphrase_bytes, 

96 backend=default_backend(), # It was a required param until 2020 

97 ) 

98 

99 

100def _pii_less_home_account_id(home_account_id): 

101 parts = home_account_id.split(".") # It could contain one or two parts 

102 parts[0] = "********" 

103 return ".".join(parts) 

104 

105 

106def _clean_up(result): 

107 if isinstance(result, dict): 

108 if "_msalruntime_telemetry" in result or "_msal_python_telemetry" in result: 

109 result["msal_telemetry"] = json.dumps({ # Telemetry as an opaque string 

110 "msalruntime_telemetry": result.get("_msalruntime_telemetry"), 

111 "msal_python_telemetry": result.get("_msal_python_telemetry"), 

112 }, separators=(",", ":")) 

113 return_value = { 

114 k: result[k] for k in result 

115 if k != "refresh_in" # MSAL handled refresh_in, customers need not 

116 and not k.startswith('_') # Skim internal properties 

117 } 

118 if "refresh_in" in result: # To encourage proactive refresh 

119 return_value["refresh_on"] = int(time.time() + result["refresh_in"]) 

120 return return_value 

121 return result # It could be None 

122 

123 

124def _preferred_browser(): 

125 """Register Edge and return a name suitable for subsequent webbrowser.get(...) 

126 when appropriate. Otherwise return None. 

127 """ 

128 # On Linux, only Edge will provide device-based Conditional Access support 

129 if sys.platform != "linux": # On other platforms, we have no browser preference 

130 return None 

131 browser_path = "/usr/bin/microsoft-edge" # Use a full path owned by sys admin 

132 # Note: /usr/bin/microsoft-edge, /usr/bin/microsoft-edge-stable, etc. 

133 # are symlinks that point to the actual binaries which are found under 

134 # /opt/microsoft/msedge/msedge or /opt/microsoft/msedge-beta/msedge. 

135 # Either method can be used to detect an Edge installation. 

136 user_has_no_preference = "BROWSER" not in os.environ 

137 user_wont_mind_edge = "microsoft-edge" in os.environ.get("BROWSER", "") # Note: 

138 # BROWSER could contain "microsoft-edge" or "/path/to/microsoft-edge". 

139 # Python documentation (https://docs.python.org/3/library/webbrowser.html) 

140 # does not document the name being implicitly register, 

141 # so there is no public API to know whether the ENV VAR browser would work. 

142 # Therefore, we would not bother examine the env var browser's type. 

143 # We would just register our own Edge instance. 

144 if (user_has_no_preference or user_wont_mind_edge) and os.path.exists(browser_path): 

145 try: 

146 import webbrowser # Lazy import. Some distro may not have this. 

147 browser_name = "msal-edge" # Avoid popular name "microsoft-edge" 

148 # otherwise `BROWSER="microsoft-edge"; webbrowser.get("microsoft-edge")` 

149 # would return a GenericBrowser instance which won't work. 

150 try: 

151 registration_available = isinstance( 

152 webbrowser.get(browser_name), webbrowser.BackgroundBrowser) 

153 except webbrowser.Error: 

154 registration_available = False 

155 if not registration_available: 

156 logger.debug("Register %s with %s", browser_name, browser_path) 

157 # By registering our own browser instance with our own name, 

158 # rather than populating a process-wide BROWSER enn var, 

159 # this approach does not have side effect on non-MSAL code path. 

160 webbrowser.register( # Even double-register happens to work fine 

161 browser_name, None, webbrowser.BackgroundBrowser(browser_path)) 

162 return browser_name 

163 except ImportError: 

164 pass # We may still proceed 

165 return None 

166 

167def _is_ssh_cert_or_pop_request(token_type, auth_scheme) -> bool: 

168 return token_type == "ssh-cert" or token_type == "pop" or isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme) 

169 

170class _ClientWithCcsRoutingInfo(Client): 

171 

172 def initiate_auth_code_flow(self, **kwargs): 

173 if kwargs.get("login_hint"): # eSTS could have utilized this as-is, but nope 

174 kwargs["X-AnchorMailbox"] = "UPN:%s" % kwargs["login_hint"] 

175 return super(_ClientWithCcsRoutingInfo, self).initiate_auth_code_flow( 

176 client_info=1, # To be used as CSS Routing info 

177 **kwargs) 

178 

179 def obtain_token_by_auth_code_flow( 

180 self, auth_code_flow, auth_response, **kwargs): 

181 # Note: the obtain_token_by_browser() is also covered by this 

182 assert isinstance(auth_code_flow, dict) and isinstance(auth_response, dict) 

183 headers = kwargs.pop("headers", {}) 

184 client_info = json.loads( 

185 decode_part(auth_response["client_info"]) 

186 ) if auth_response.get("client_info") else {} 

187 if "uid" in client_info and "utid" in client_info: 

188 # Note: The value of X-AnchorMailbox is also case-insensitive 

189 headers["X-AnchorMailbox"] = "Oid:{uid}@{utid}".format(**client_info) 

190 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_auth_code_flow( 

191 auth_code_flow, auth_response, headers=headers, **kwargs) 

192 

193 def obtain_token_by_username_password(self, username, password, **kwargs): 

194 headers = kwargs.pop("headers", {}) 

195 headers["X-AnchorMailbox"] = "upn:{}".format(username) 

196 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_username_password( 

197 username, password, headers=headers, **kwargs) 

198 

199 

200def _msal_extension_check(): 

201 # Can't run this in module or class level otherwise you'll get circular import error 

202 try: 

203 from msal_extensions import __version__ as v 

204 major, minor, _ = v.split(".", maxsplit=3) 

205 if not (int(major) >= 1 and int(minor) >= 2): 

206 warnings.warn( 

207 "Please upgrade msal-extensions. " 

208 "Only msal-extensions 1.2+ can work with msal 1.30+") 

209 except ImportError: 

210 pass # The optional msal_extensions is not installed. Business as usual. 

211 except ValueError: 

212 logger.exception(f"msal_extensions version {v} not in major.minor.patch format") 

213 except: 

214 logger.exception( 

215 "Unable to import msal_extensions during an optional check. " 

216 "This exception can be safely ignored." 

217 ) 

218 

219 

220class ClientApplication(object): 

221 """You do not usually directly use this class. Use its subclasses instead: 

222 :class:`PublicClientApplication` and :class:`ConfidentialClientApplication`. 

223 """ 

224 ACQUIRE_TOKEN_SILENT_ID = "84" 

225 ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85" 

226 ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301" 

227 ACQUIRE_TOKEN_ON_BEHALF_OF_ID = "523" 

228 ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID = "622" 

229 ACQUIRE_TOKEN_FOR_CLIENT_ID = "730" 

230 ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832" 

231 ACQUIRE_TOKEN_INTERACTIVE = "169" 

232 GET_ACCOUNTS_ID = "902" 

233 REMOVE_ACCOUNT_ID = "903" 

234 

235 ATTEMPT_REGION_DISCOVERY = True # "TryAutoDetect" 

236 DISABLE_MSAL_FORCE_REGION = False # Used in azure_region to disable MSAL_FORCE_REGION behavior 

237 _TOKEN_SOURCE = "token_source" 

238 _TOKEN_SOURCE_IDP = "identity_provider" 

239 _TOKEN_SOURCE_CACHE = "cache" 

240 _TOKEN_SOURCE_BROKER = "broker" 

241 

242 _enable_broker = False 

243 _AUTH_SCHEME_UNSUPPORTED = ( 

244 "auth_scheme is currently only available from broker. " 

245 "You can enable broker by following these instructions. " 

246 "https://msal-python.readthedocs.io/en/latest/#publicclientapplication") 

247 

248 def __init__( 

249 self, client_id, 

250 client_credential=None, authority=None, validate_authority=True, 

251 token_cache=None, 

252 http_client=None, 

253 verify=True, proxies=None, timeout=None, 

254 client_claims=None, app_name=None, app_version=None, 

255 client_capabilities=None, 

256 azure_region=None, # Note: We choose to add this param in this base class, 

257 # despite it is currently only needed by ConfidentialClientApplication. 

258 # This way, it holds the same positional param place for PCA, 

259 # when we would eventually want to add this feature to PCA in future. 

260 exclude_scopes=None, 

261 http_cache=None, 

262 instance_discovery=None, 

263 allow_broker=None, 

264 enable_pii_log=None, 

265 oidc_authority=None, 

266 ): 

267 """Create an instance of application. 

268 

269 :param str client_id: Your app has a client_id after you register it on Microsoft Entra admin center. 

270 

271 :param client_credential: 

272 For :class:`PublicClientApplication`, you use `None` here. 

273 

274 For :class:`ConfidentialClientApplication`, 

275 it supports many different input formats for different scenarios. 

276 

277 .. admonition:: Support using a client secret. 

278 

279 Just feed in a string, such as ``"your client secret"``. 

280 

281 .. admonition:: Support using a certificate in X.509 (.pem) format 

282 

283 Feed in a dict in this form:: 

284 

285 { 

286 "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format", 

287 "thumbprint": "A1B2C3D4E5F6...", 

288 "passphrase": "Passphrase if the private_key is encrypted (Optional. Added in version 1.6.0)", 

289 } 

290 

291 MSAL Python requires a "private_key" in PEM format. 

292 If your cert is in PKCS12 (.pfx) format, 

293 you can convert it to X.509 (.pem) format, 

294 by ``openssl pkcs12 -in file.pfx -out file.pem -nodes``. 

295 

296 The thumbprint is available in your app's registration in Azure Portal. 

297 Alternatively, you can `calculate the thumbprint <https://github.com/Azure/azure-sdk-for-python/blob/07d10639d7e47f4852eaeb74aef5d569db499d6e/sdk/identity/azure-identity/azure/identity/_credentials/certificate.py#L94-L97>`_. 

298 

299 .. admonition:: Support Subject Name/Issuer Auth with a cert in .pem 

300 

301 `Subject Name/Issuer Auth 

302 <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_ 

303 is an approach to allow easier certificate rotation. 

304 

305 *Added in version 0.5.0*:: 

306 

307 { 

308 "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format", 

309 "thumbprint": "A1B2C3D4E5F6...", 

310 "public_certificate": "...-----BEGIN CERTIFICATE-----...", 

311 "passphrase": "Passphrase if the private_key is encrypted (Optional. Added in version 1.6.0)", 

312 } 

313 

314 ``public_certificate`` (optional) is public key certificate 

315 which will be sent through 'x5c' JWT header only for 

316 subject name and issuer authentication to support cert auto rolls. 

317 

318 Per `specs <https://tools.ietf.org/html/rfc7515#section-4.1.6>`_, 

319 "the certificate containing 

320 the public key corresponding to the key used to digitally sign the 

321 JWS MUST be the first certificate. This MAY be followed by 

322 additional certificates, with each subsequent certificate being the 

323 one used to certify the previous one." 

324 However, your certificate's issuer may use a different order. 

325 So, if your attempt ends up with an error AADSTS700027 - 

326 "The provided signature value did not match the expected signature value", 

327 you may try use only the leaf cert (in PEM/str format) instead. 

328 

329 .. admonition:: Supporting raw assertion obtained from elsewhere 

330 

331 *Added in version 1.13.0*: 

332 It can also be a completely pre-signed assertion that you've assembled yourself. 

333 Simply pass a container containing only the key "client_assertion", like this:: 

334 

335 { 

336 "client_assertion": "...a JWT with claims aud, exp, iss, jti, nbf, and sub..." 

337 } 

338 

339 .. admonition:: Supporting reading client certificates from PFX files 

340 

341 *Added in version 1.29.0*: 

342 Feed in a dictionary containing the path to a PFX file:: 

343 

344 { 

345 "private_key_pfx_path": "/path/to/your.pfx", 

346 "passphrase": "Passphrase if the private_key is encrypted (Optional)", 

347 } 

348 

349 The following command will generate a .pfx file from your .key and .pem file:: 

350 

351 openssl pkcs12 -export -out certificate.pfx -inkey privateKey.key -in certificate.pem 

352 

353 .. admonition:: Support Subject Name/Issuer Auth with a cert in .pfx 

354 

355 *Added in version 1.30.0*: 

356 If your .pfx file contains both the private key and public cert, 

357 you can opt in for Subject Name/Issuer Auth like this:: 

358 

359 { 

360 "private_key_pfx_path": "/path/to/your.pfx", 

361 "public_certificate": True, 

362 "passphrase": "Passphrase if the private_key is encrypted (Optional)", 

363 } 

364 

365 :type client_credential: Union[dict, str, None] 

366 

367 :param dict client_claims: 

368 *Added in version 0.5.0*: 

369 It is a dictionary of extra claims that would be signed by 

370 by this :class:`ConfidentialClientApplication` 's private key. 

371 For example, you can use {"client_ip": "x.x.x.x"}. 

372 You may also override any of the following default claims:: 

373 

374 { 

375 "aud": the_token_endpoint, 

376 "iss": self.client_id, 

377 "sub": same_as_issuer, 

378 "exp": now + 10_min, 

379 "iat": now, 

380 "jti": a_random_uuid 

381 } 

382 

383 :param str authority: 

384 A URL that identifies a token authority. It should be of the format 

385 ``https://login.microsoftonline.com/your_tenant`` 

386 By default, we will use ``https://login.microsoftonline.com/common`` 

387 

388 *Changed in version 1.17*: you can also use predefined constant 

389 and a builder like this:: 

390 

391 from msal.authority import ( 

392 AuthorityBuilder, 

393 AZURE_US_GOVERNMENT, AZURE_CHINA, AZURE_PUBLIC) 

394 my_authority = AuthorityBuilder(AZURE_PUBLIC, "contoso.onmicrosoft.com") 

395 # Now you get an equivalent of 

396 # "https://login.microsoftonline.com/contoso.onmicrosoft.com" 

397 

398 # You can feed such an authority to msal's ClientApplication 

399 from msal import PublicClientApplication 

400 app = PublicClientApplication("my_client_id", authority=my_authority, ...) 

401 

402 :param bool validate_authority: (optional) Turns authority validation 

403 on or off. This parameter default to true. 

404 :param TokenCache token_cache: 

405 Sets the token cache used by this ClientApplication instance. 

406 By default, an in-memory cache will be created and used. 

407 :param http_client: (optional) 

408 Your implementation of abstract class HttpClient <msal.oauth2cli.http.http_client> 

409 Defaults to a requests session instance. 

410 Since MSAL 1.11.0, the default session would be configured 

411 to attempt one retry on connection error. 

412 If you are providing your own http_client, 

413 it will be your http_client's duty to decide whether to perform retry. 

414 

415 :param verify: (optional) 

416 It will be passed to the 

417 `verify parameter in the underlying requests library 

418 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#ssl-cert-verification>`_ 

419 This does not apply if you have chosen to pass your own Http client 

420 :param proxies: (optional) 

421 It will be passed to the 

422 `proxies parameter in the underlying requests library 

423 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#proxies>`_ 

424 This does not apply if you have chosen to pass your own Http client 

425 :param timeout: (optional) 

426 It will be passed to the 

427 `timeout parameter in the underlying requests library 

428 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#timeouts>`_ 

429 This does not apply if you have chosen to pass your own Http client 

430 :param app_name: (optional) 

431 You can provide your application name for Microsoft telemetry purposes. 

432 Default value is None, means it will not be passed to Microsoft. 

433 :param app_version: (optional) 

434 You can provide your application version for Microsoft telemetry purposes. 

435 Default value is None, means it will not be passed to Microsoft. 

436 :param list[str] client_capabilities: (optional) 

437 Allows configuration of one or more client capabilities, e.g. ["CP1"]. 

438 

439 Client capability is meant to inform the Microsoft identity platform 

440 (STS) what this client is capable for, 

441 so STS can decide to turn on certain features. 

442 For example, if client is capable to handle *claims challenge*, 

443 STS may issue 

444 `Continuous Access Evaluation (CAE) <https://learn.microsoft.com/entra/identity/conditional-access/concept-continuous-access-evaluation>`_ 

445 access tokens to resources, 

446 knowing that when the resource emits a *claims challenge* 

447 the client will be able to handle those challenges. 

448 

449 Implementation details: 

450 Client capability is implemented using "claims" parameter on the wire, 

451 for now. 

452 MSAL will combine them into 

453 `claims parameter <https://openid.net/specs/openid-connect-core-1_0-final.html#ClaimsParameter>`_ 

454 which you will later provide via one of the acquire-token request. 

455 

456 :param str azure_region: (optional) 

457 Instructs MSAL to use the Entra regional token service. This legacy feature is only available to 

458 first-party applications. Only ``acquire_token_for_client()`` is supported. 

459 

460 Supports 4 values: 

461 

462 1. ``azure_region=None`` - This default value means no region is configured. 

463 MSAL will use the region defined in env var ``MSAL_FORCE_REGION``. 

464 2. ``azure_region="some_region"`` - meaning the specified region is used. 

465 3. ``azure_region=True`` - meaning 

466 MSAL will try to auto-detect the region. This is not recommended. 

467 4. ``azure_region=False`` - meaning MSAL will use no region. 

468 

469 .. note:: 

470 Region auto-discovery has been tested on VMs and on Azure Functions. It is unreliable. 

471 Applications using this option should configure a short timeout. 

472 

473 For more details and for the values of the region string 

474 see https://learn.microsoft.com/entra/msal/dotnet/resources/region-discovery-troubleshooting 

475 

476 New in version 1.12.0. 

477 

478 :param list[str] exclude_scopes: (optional) 

479 Historically MSAL hardcodes `offline_access` scope, 

480 which would allow your app to have prolonged access to user's data. 

481 If that is unnecessary or undesirable for your app, 

482 now you can use this parameter to supply an exclusion list of scopes, 

483 such as ``exclude_scopes = ["offline_access"]``. 

484 

485 :param dict http_cache: 

486 MSAL has long been caching tokens in the ``token_cache``. 

487 Recently, MSAL also introduced a concept of ``http_cache``, 

488 by automatically caching some finite amount of non-token http responses, 

489 so that *long-lived* 

490 ``PublicClientApplication`` and ``ConfidentialClientApplication`` 

491 would be more performant and responsive in some situations. 

492 

493 This ``http_cache`` parameter accepts any dict-like object. 

494 If not provided, MSAL will use an in-memory dict. 

495 

496 If your app is a command-line app (CLI), 

497 you would want to persist your http_cache across different CLI runs. 

498 The persisted file's format may change due to, but not limited to, 

499 `unstable protocol <https://docs.python.org/3/library/pickle.html#data-stream-format>`_, 

500 so your implementation shall tolerate unexpected loading errors. 

501 The following recipe shows a way to do so:: 

502 

503 # Just add the following lines at the beginning of your CLI script 

504 import sys, atexit, pickle, logging 

505 http_cache_filename = sys.argv[0] + ".http_cache" 

506 try: 

507 with open(http_cache_filename, "rb") as f: 

508 persisted_http_cache = pickle.load(f) # Take a snapshot 

509 except ( 

510 FileNotFoundError, # Or IOError in Python 2 

511 pickle.UnpicklingError, # A corrupted http cache file 

512 AttributeError, # Cache created by a different version of MSAL 

513 ): 

514 persisted_http_cache = {} # Recover by starting afresh 

515 except: # Unexpected exceptions 

516 logging.exception("You may want to debug this") 

517 persisted_http_cache = {} # Recover by starting afresh 

518 atexit.register(lambda: pickle.dump( 

519 # When exit, flush it back to the file. 

520 # It may occasionally overwrite another process's concurrent write, 

521 # but that is fine. Subsequent runs will reach eventual consistency. 

522 persisted_http_cache, open(http_cache_file, "wb"))) 

523 

524 # And then you can implement your app as you normally would 

525 app = msal.PublicClientApplication( 

526 "your_client_id", 

527 ..., 

528 http_cache=persisted_http_cache, # Utilize persisted_http_cache 

529 ..., 

530 #token_cache=..., # You may combine the old token_cache trick 

531 # Please refer to token_cache recipe at 

532 # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache 

533 ) 

534 app.acquire_token_interactive(["your", "scope"], ...) 

535 

536 Content inside ``http_cache`` are cheap to obtain. 

537 There is no need to share them among different apps. 

538 

539 Content inside ``http_cache`` will contain no tokens nor 

540 Personally Identifiable Information (PII). Encryption is unnecessary. 

541 

542 New in version 1.16.0. 

543 

544 :param boolean instance_discovery: 

545 Historically, MSAL would connect to a central endpoint located at 

546 ``https://login.microsoftonline.com`` to acquire some metadata, 

547 especially when using an unfamiliar authority. 

548 This behavior is known as Instance Discovery. 

549 

550 This parameter defaults to None, which enables the Instance Discovery. 

551 

552 If you know some authorities which you allow MSAL to operate with as-is, 

553 without involving any Instance Discovery, the recommended pattern is:: 

554 

555 known_authorities = frozenset([ # Treat your known authorities as const 

556 "https://contoso.com/adfs", "https://login.azs/foo"]) 

557 ... 

558 authority = "https://contoso.com/adfs" # Assuming your app will use this 

559 app1 = PublicClientApplication( 

560 "client_id", 

561 authority=authority, 

562 # Conditionally disable Instance Discovery for known authorities 

563 instance_discovery=authority not in known_authorities, 

564 ) 

565 

566 If you do not know some authorities beforehand, 

567 yet still want MSAL to accept any authority that you will provide, 

568 you can use a ``False`` to unconditionally disable Instance Discovery. 

569 

570 New in version 1.19.0. 

571 

572 :param boolean allow_broker: 

573 Deprecated. Please use ``enable_broker_on_windows`` instead. 

574 

575 :param boolean enable_pii_log: 

576 When enabled, logs may include PII (Personal Identifiable Information). 

577 This can be useful in troubleshooting broker behaviors. 

578 The default behavior is False. 

579 

580 New in version 1.24.0. 

581 

582 :param str oidc_authority: 

583 *Added in version 1.28.0*: 

584 It is a URL that identifies an OpenID Connect (OIDC) authority of 

585 the format ``https://contoso.com/tenant``. 

586 MSAL will append ".well-known/openid-configuration" to the authority 

587 and retrieve the OIDC metadata from there, to figure out the endpoints. 

588 

589 Note: Broker will NOT be used for OIDC authority. 

590 """ 

591 self.client_id = client_id 

592 self.client_credential = client_credential 

593 self.client_claims = client_claims 

594 self._client_capabilities = client_capabilities 

595 self._instance_discovery = instance_discovery 

596 

597 if exclude_scopes and not isinstance(exclude_scopes, list): 

598 raise ValueError( 

599 "Invalid exclude_scopes={}. It need to be a list of strings.".format( 

600 repr(exclude_scopes))) 

601 self._exclude_scopes = frozenset(exclude_scopes or []) 

602 if "openid" in self._exclude_scopes: 

603 raise ValueError( 

604 'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format( 

605 repr(exclude_scopes))) 

606 

607 if http_client: 

608 self.http_client = http_client 

609 else: 

610 import requests # Lazy load 

611 

612 self.http_client = requests.Session() 

613 self.http_client.verify = verify 

614 self.http_client.proxies = proxies 

615 # Requests, does not support session - wide timeout 

616 # But you can patch that (https://github.com/psf/requests/issues/3341): 

617 self.http_client.request = functools.partial( 

618 self.http_client.request, timeout=timeout) 

619 

620 # Enable a minimal retry. Better than nothing. 

621 # https://github.com/psf/requests/blob/v2.25.1/requests/adapters.py#L94-L108 

622 a = requests.adapters.HTTPAdapter(max_retries=1) 

623 self.http_client.mount("http://", a) 

624 self.http_client.mount("https://", a) 

625 self.http_client = ThrottledHttpClient( 

626 self.http_client, 

627 http_cache=http_cache, 

628 default_throttle_time=60 

629 # The default value 60 was recommended mainly for PCA at the end of 

630 # https://identitydivision.visualstudio.com/devex/_git/AuthLibrariesApiReview?version=GBdev&path=%2FService%20protection%2FIntial%20set%20of%20protection%20measures.md&_a=preview 

631 if isinstance(self, PublicClientApplication) else 5, 

632 ) 

633 

634 self.app_name = app_name 

635 self.app_version = app_version 

636 

637 # Here the self.authority will not be the same type as authority in input 

638 if oidc_authority and authority: 

639 raise ValueError("You can not provide both authority and oidc_authority") 

640 if isinstance(authority, str) and urlparse(authority).path.startswith( 

641 "/dstsv2"): # dSTS authority's path always starts with "/dstsv2" 

642 oidc_authority = authority # So we treat it as if an oidc_authority 

643 try: 

644 authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE) 

645 self.authority = Authority( 

646 authority_to_use, 

647 self.http_client, 

648 validate_authority=validate_authority, 

649 instance_discovery=self._instance_discovery, 

650 oidc_authority_url=oidc_authority, 

651 ) 

652 except ValueError: # Those are explicit authority validation errors 

653 raise 

654 except Exception: # The rest are typically connection errors 

655 if validate_authority and not oidc_authority and ( 

656 azure_region # Opted in to use region 

657 or (azure_region is None and os.getenv("MSAL_FORCE_REGION")) # Will use region 

658 ): 

659 # Since caller opts in to use region, here we tolerate connection 

660 # errors happened during authority validation at non-region endpoint 

661 self.authority = Authority( 

662 authority_to_use, 

663 self.http_client, 

664 instance_discovery=False, 

665 ) 

666 else: 

667 raise 

668 

669 self._decide_broker(allow_broker, enable_pii_log) 

670 self.token_cache = token_cache or TokenCache() 

671 self._region_configured = azure_region 

672 self._region_detected = None 

673 self.client, self._regional_client = self._build_client( 

674 client_credential, self.authority) 

675 self.authority_groups = None 

676 self._telemetry_buffer = {} 

677 self._telemetry_lock = Lock() 

678 _msal_extension_check() 

679 

680 

681 def _decide_broker(self, allow_broker, enable_pii_log): 

682 is_confidential_app = self.client_credential or isinstance( 

683 self, ConfidentialClientApplication) 

684 if is_confidential_app and allow_broker: 

685 raise ValueError("allow_broker=True is only supported in PublicClientApplication") 

686 # Historically, we chose to support ClientApplication("client_id", allow_broker=True) 

687 if allow_broker: 

688 warnings.warn( 

689 "allow_broker is deprecated. " 

690 "Please use PublicClientApplication(..., " 

691 "enable_broker_on_windows=True, " 

692 # No need to mention non-Windows platforms, because allow_broker is only for Windows 

693 "...)", 

694 DeprecationWarning) 

695 opted_in_for_broker = ( 

696 self._enable_broker # True means Opted-in from PCA 

697 or ( 

698 # When we started the broker project on Windows platform, 

699 # the allow_broker was meant to be cross-platform. Now we realize 

700 # that other platforms have different redirect_uri requirements, 

701 # so the old allow_broker is deprecated and will only for Windows. 

702 allow_broker and sys.platform == "win32") 

703 ) 

704 self._enable_broker = ( # This same variable will also store the state 

705 opted_in_for_broker 

706 and not is_confidential_app 

707 and not self.authority.is_adfs 

708 and not self.authority._is_b2c 

709 ) 

710 if self._enable_broker: 

711 try: 

712 _init_broker(enable_pii_log) 

713 except RuntimeError: 

714 self._enable_broker = False 

715 logger.warning( # It is common on Mac and Linux where broker is not built-in 

716 "Broker is unavailable on this platform. " 

717 "We will fallback to non-broker.") 

718 logger.debug("Broker enabled? %s", self._enable_broker) 

719 

720 def is_pop_supported(self): 

721 """Returns True if this client supports Proof-of-Possession Access Token.""" 

722 return self._enable_broker and sys.platform in ("win32", "darwin") 

723 

724 def _decorate_scope( 

725 self, scopes, 

726 reserved_scope=frozenset(['openid', 'profile', 'offline_access'])): 

727 if not isinstance(scopes, (list, set, tuple)): 

728 raise ValueError("The input scopes should be a list, tuple, or set") 

729 scope_set = set(scopes) # Input scopes is typically a list. Copy it to a set. 

730 if scope_set & reserved_scope: 

731 # These scopes are reserved for the API to provide good experience. 

732 # We could make the developer pass these and then if they do they will 

733 # come back asking why they don't see refresh token or user information. 

734 raise ValueError( 

735 """You cannot use any scope value that is reserved. 

736Your input: {} 

737The reserved list: {}""".format(list(scope_set), list(reserved_scope))) 

738 raise ValueError( 

739 "You cannot use any scope value that is in this reserved list: {}".format( 

740 list(reserved_scope))) 

741 

742 # client_id can also be used as a scope in B2C 

743 decorated = scope_set | reserved_scope 

744 decorated -= self._exclude_scopes 

745 return list(decorated) 

746 

747 def _build_telemetry_context( 

748 self, api_id, correlation_id=None, refresh_reason=None): 

749 return msal.telemetry._TelemetryContext( 

750 self._telemetry_buffer, self._telemetry_lock, api_id, 

751 correlation_id=correlation_id, refresh_reason=refresh_reason) 

752 

753 def _get_regional_authority(self, central_authority) -> Optional[Authority]: 

754 if self._region_configured is False: # User opts out of ESTS-R 

755 return None # Short circuit to completely bypass region detection 

756 if self._region_configured is None: # User did not make an ESTS-R choice 

757 self._region_configured = os.getenv("MSAL_FORCE_REGION") or None 

758 self._region_detected = self._region_detected or _detect_region( 

759 self.http_client if self._region_configured is not None else None) 

760 if (self._region_configured != self.ATTEMPT_REGION_DISCOVERY 

761 and self._region_configured != self._region_detected): 

762 logger.warning('Region configured ({}) != region detected ({})'.format( 

763 repr(self._region_configured), repr(self._region_detected))) 

764 region_to_use = ( 

765 self._region_detected 

766 if self._region_configured == self.ATTEMPT_REGION_DISCOVERY 

767 else self._region_configured) # It will retain the None i.e. opted out 

768 logger.debug('Region to be used: {}'.format(repr(region_to_use))) 

769 if region_to_use: 

770 regional_host = ("{}.login.microsoft.com".format(region_to_use) 

771 if central_authority.instance in ( 

772 # The list came from point 3 of the algorithm section in this internal doc 

773 # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PinAuthToRegion/AAD%20SDK%20Proposal%20to%20Pin%20Auth%20to%20region.md&anchor=algorithm&_a=preview 

774 "login.microsoftonline.com", 

775 "login.microsoft.com", 

776 "login.windows.net", 

777 "sts.windows.net", 

778 ) 

779 else "{}.{}".format(region_to_use, central_authority.instance)) 

780 return Authority( # The central_authority has already been validated 

781 "https://{}/{}".format(regional_host, central_authority.tenant), 

782 self.http_client, 

783 instance_discovery=False, 

784 ) 

785 return None 

786 

787 def _build_client(self, client_credential, authority, skip_regional_client=False): 

788 client_assertion = None 

789 client_assertion_type = None 

790 default_headers = { 

791 "x-client-sku": SKU, "x-client-ver": __version__, 

792 "x-client-os": sys.platform, 

793 "x-ms-lib-capability": "retry-after, h429", 

794 } 

795 if self.app_name: 

796 default_headers['x-app-name'] = self.app_name 

797 if self.app_version: 

798 default_headers['x-app-ver'] = self.app_version 

799 default_body = {"client_info": 1} 

800 if isinstance(client_credential, dict): 

801 client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT 

802 # Use client_credential.get("...") rather than "..." in client_credential 

803 # so that we can ignore an empty string came from an empty ENV VAR. 

804 if client_credential.get("client_assertion"): 

805 client_assertion = client_credential['client_assertion'] 

806 else: 

807 headers = {} 

808 sha1_thumbprint = sha256_thumbprint = None 

809 passphrase_bytes = _str2bytes( 

810 client_credential["passphrase"] 

811 ) if client_credential.get("passphrase") else None 

812 if client_credential.get("private_key_pfx_path"): 

813 private_key, sha256_thumbprint, sha1_thumbprint, x5c = _parse_pfx( 

814 client_credential["private_key_pfx_path"], 

815 passphrase_bytes) 

816 if client_credential.get("public_certificate") is True and x5c: 

817 headers["x5c"] = x5c 

818 elif ( 

819 client_credential.get("private_key") # PEM blob 

820 and client_credential.get("thumbprint")): 

821 sha1_thumbprint = client_credential["thumbprint"] 

822 if passphrase_bytes: 

823 private_key = _load_private_key_from_pem_str( 

824 client_credential['private_key'], passphrase_bytes) 

825 else: # PEM without passphrase 

826 private_key = client_credential['private_key'] 

827 else: 

828 raise ValueError( 

829 "client_credential needs to follow this format " 

830 "https://msal-python.readthedocs.io/en/latest/#msal.ClientApplication.params.client_credential") 

831 if ("x5c" not in headers # So the .pfx file contains no certificate 

832 and isinstance(client_credential.get('public_certificate'), str) 

833 ): # Then we treat the public_certificate value as PEM content 

834 headers["x5c"] = extract_certs(client_credential['public_certificate']) 

835 if sha256_thumbprint and not authority.is_adfs: 

836 assertion_params = { 

837 "algorithm": "PS256", "sha256_thumbprint": sha256_thumbprint, 

838 } 

839 else: # Fall back 

840 if not sha1_thumbprint: 

841 raise ValueError("You shall provide a thumbprint in SHA1.") 

842 assertion_params = { 

843 "algorithm": "RS256", "sha1_thumbprint": sha1_thumbprint, 

844 } 

845 assertion = JwtAssertionCreator( 

846 private_key, headers=headers, **assertion_params) 

847 client_assertion = assertion.create_regenerative_assertion( 

848 audience=authority.token_endpoint, issuer=self.client_id, 

849 additional_claims=self.client_claims or {}) 

850 else: 

851 default_body['client_secret'] = client_credential 

852 central_configuration = { 

853 "authorization_endpoint": authority.authorization_endpoint, 

854 "token_endpoint": authority.token_endpoint, 

855 "device_authorization_endpoint": authority.device_authorization_endpoint, 

856 } 

857 central_client = _ClientWithCcsRoutingInfo( 

858 central_configuration, 

859 self.client_id, 

860 http_client=self.http_client, 

861 default_headers=default_headers, 

862 default_body=default_body, 

863 client_assertion=client_assertion, 

864 client_assertion_type=client_assertion_type, 

865 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

866 event, environment=authority.instance)), 

867 on_removing_rt=self.token_cache.remove_rt, 

868 on_updating_rt=self.token_cache.update_rt) 

869 

870 regional_client = None 

871 if (client_credential # Currently regional endpoint only serves some CCA flows 

872 and not skip_regional_client): 

873 regional_authority = self._get_regional_authority(authority) 

874 if regional_authority: 

875 regional_configuration = { 

876 "authorization_endpoint": regional_authority.authorization_endpoint, 

877 "token_endpoint": regional_authority.token_endpoint, 

878 "device_authorization_endpoint": 

879 regional_authority.device_authorization_endpoint, 

880 } 

881 regional_client = _ClientWithCcsRoutingInfo( 

882 regional_configuration, 

883 self.client_id, 

884 http_client=self.http_client, 

885 default_headers=default_headers, 

886 default_body=default_body, 

887 client_assertion=client_assertion, 

888 client_assertion_type=client_assertion_type, 

889 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

890 event, environment=authority.instance)), 

891 on_removing_rt=self.token_cache.remove_rt, 

892 on_updating_rt=self.token_cache.update_rt) 

893 return central_client, regional_client 

894 

895 def initiate_auth_code_flow( 

896 self, 

897 scopes, # type: list[str] 

898 redirect_uri=None, 

899 state=None, # Recommended by OAuth2 for CSRF protection 

900 prompt=None, 

901 login_hint=None, # type: Optional[str] 

902 domain_hint=None, # type: Optional[str] 

903 claims_challenge=None, 

904 max_age=None, 

905 response_mode=None, # type: Optional[str] 

906 ): 

907 """Initiate an auth code flow. 

908 

909 Later when the response reaches your redirect_uri, 

910 you can use :func:`~acquire_token_by_auth_code_flow()` 

911 to complete the authentication/authorization. 

912 

913 :param list scopes: 

914 It is a list of case-sensitive strings. 

915 :param str redirect_uri: 

916 Optional. If not specified, server will use the pre-registered one. 

917 :param str state: 

918 An opaque value used by the client to 

919 maintain state between the request and callback. 

920 If absent, this library will automatically generate one internally. 

921 :param str prompt: 

922 By default, no prompt value will be sent, not even string ``"none"``. 

923 You will have to specify a value explicitly. 

924 Its valid values are the constants defined in 

925 :class:`Prompt <msal.Prompt>`. 

926 

927 :param str login_hint: 

928 Optional. Identifier of the user. Generally a User Principal Name (UPN). 

929 :param domain_hint: 

930 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com". 

931 If included, it will skip the email-based discovery process that user goes 

932 through on the sign-in page, leading to a slightly more streamlined user experience. 

933 More information on possible values available in 

934 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and 

935 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_. 

936 

937 :param int max_age: 

938 OPTIONAL. Maximum Authentication Age. 

939 Specifies the allowable elapsed time in seconds 

940 since the last time the End-User was actively authenticated. 

941 If the elapsed time is greater than this value, 

942 Microsoft identity platform will actively re-authenticate the End-User. 

943 

944 MSAL Python will also automatically validate the auth_time in ID token. 

945 

946 New in version 1.15. 

947 

948 :param str response_mode: 

949 OPTIONAL. Specifies the method with which response parameters should be returned. 

950 The default value is equivalent to ``query``, which is still secure enough in MSAL Python 

951 (because MSAL Python does not transfer tokens via query parameter in the first place). 

952 For even better security, we recommend using the value ``form_post``. 

953 In "form_post" mode, response parameters 

954 will be encoded as HTML form values that are transmitted via the HTTP POST method and 

955 encoded in the body using the application/x-www-form-urlencoded format. 

956 Valid values can be either "form_post" for HTTP POST to callback URI or 

957 "query" (the default) for HTTP GET with parameters encoded in query string. 

958 More information on possible values 

959 `here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>` 

960 and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>` 

961 

962 :return: 

963 The auth code flow. It is a dict in this form:: 

964 

965 { 

966 "auth_uri": "https://...", // Guide user to visit this 

967 "state": "...", // You may choose to verify it by yourself, 

968 // or just let acquire_token_by_auth_code_flow() 

969 // do that for you. 

970 "...": "...", // Everything else are reserved and internal 

971 } 

972 

973 The caller is expected to: 

974 

975 1. somehow store this content, typically inside the current session, 

976 2. guide the end user (i.e. resource owner) to visit that auth_uri, 

977 3. and then relay this dict and subsequent auth response to 

978 :func:`~acquire_token_by_auth_code_flow()`. 

979 """ 

980 client = _ClientWithCcsRoutingInfo( 

981 {"authorization_endpoint": self.authority.authorization_endpoint}, 

982 self.client_id, 

983 http_client=self.http_client) 

984 flow = client.initiate_auth_code_flow( 

985 redirect_uri=redirect_uri, state=state, login_hint=login_hint, 

986 prompt=prompt, 

987 scope=self._decorate_scope(scopes), 

988 domain_hint=domain_hint, 

989 claims=_merge_claims_challenge_and_capabilities( 

990 self._client_capabilities, claims_challenge), 

991 max_age=max_age, 

992 response_mode=response_mode, 

993 ) 

994 flow["claims_challenge"] = claims_challenge 

995 return flow 

996 

997 def get_authorization_request_url( 

998 self, 

999 scopes, # type: list[str] 

1000 login_hint=None, # type: Optional[str] 

1001 state=None, # Recommended by OAuth2 for CSRF protection 

1002 redirect_uri=None, 

1003 response_type="code", # Could be "token" if you use Implicit Grant 

1004 prompt=None, 

1005 nonce=None, 

1006 domain_hint=None, # type: Optional[str] 

1007 claims_challenge=None, 

1008 **kwargs): 

1009 """Constructs a URL for you to start a Authorization Code Grant. 

1010 

1011 :param list[str] scopes: (Required) 

1012 Scopes requested to access a protected API (a resource). 

1013 :param str state: Recommended by OAuth2 for CSRF protection. 

1014 :param str login_hint: 

1015 Identifier of the user. Generally a User Principal Name (UPN). 

1016 :param str redirect_uri: 

1017 Address to return to upon receiving a response from the authority. 

1018 :param str response_type: 

1019 Default value is "code" for an OAuth2 Authorization Code grant. 

1020 

1021 You could use other content such as "id_token" or "token", 

1022 which would trigger an Implicit Grant, but that is 

1023 `not recommended <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-implicit-grant-flow#is-the-implicit-grant-suitable-for-my-app>`_. 

1024 

1025 :param str prompt: 

1026 By default, no prompt value will be sent, not even string ``"none"``. 

1027 You will have to specify a value explicitly. 

1028 Its valid values are the constants defined in 

1029 :class:`Prompt <msal.Prompt>`. 

1030 :param nonce: 

1031 A cryptographically random value used to mitigate replay attacks. See also 

1032 `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_. 

1033 :param domain_hint: 

1034 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com". 

1035 If included, it will skip the email-based discovery process that user goes 

1036 through on the sign-in page, leading to a slightly more streamlined user experience. 

1037 More information on possible values available in 

1038 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and 

1039 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_. 

1040 :param claims_challenge: 

1041 The claims_challenge parameter requests specific claims requested by the resource provider 

1042 in the form of a claims_challenge directive in the www-authenticate header to be 

1043 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1044 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1045 

1046 :return: The authorization url as a string. 

1047 """ 

1048 authority = kwargs.pop("authority", None) # Historically we support this 

1049 if authority: 

1050 warnings.warn( 

1051 "We haven't decided if this method will accept authority parameter") 

1052 # The previous implementation is, it will use self.authority by default. 

1053 # Multi-tenant app can use new authority on demand 

1054 the_authority = Authority( 

1055 authority, 

1056 self.http_client, 

1057 instance_discovery=self._instance_discovery, 

1058 ) if authority else self.authority 

1059 

1060 client = _ClientWithCcsRoutingInfo( 

1061 {"authorization_endpoint": the_authority.authorization_endpoint}, 

1062 self.client_id, 

1063 http_client=self.http_client) 

1064 warnings.warn( 

1065 "Change your get_authorization_request_url() " 

1066 "to initiate_auth_code_flow()", DeprecationWarning) 

1067 with warnings.catch_warnings(record=True): 

1068 return client.build_auth_request_uri( 

1069 response_type=response_type, 

1070 redirect_uri=redirect_uri, state=state, login_hint=login_hint, 

1071 prompt=prompt, 

1072 scope=self._decorate_scope(scopes), 

1073 nonce=nonce, 

1074 domain_hint=domain_hint, 

1075 claims=_merge_claims_challenge_and_capabilities( 

1076 self._client_capabilities, claims_challenge), 

1077 ) 

1078 

1079 def acquire_token_by_auth_code_flow( 

1080 self, auth_code_flow, auth_response, scopes=None, **kwargs): 

1081 """Validate the auth response being redirected back, and obtain tokens. 

1082 

1083 It automatically provides nonce protection. 

1084 

1085 :param dict auth_code_flow: 

1086 The same dict returned by :func:`~initiate_auth_code_flow()`. 

1087 :param dict auth_response: 

1088 A dict of the query string received from auth server. 

1089 :param list[str] scopes: 

1090 Scopes requested to access a protected API (a resource). 

1091 

1092 Most of the time, you can leave it empty. 

1093 

1094 If you requested user consent for multiple resources, here you will 

1095 need to provide a subset of what you required in 

1096 :func:`~initiate_auth_code_flow()`. 

1097 

1098 OAuth2 was designed mostly for singleton services, 

1099 where tokens are always meant for the same resource and the only 

1100 changes are in the scopes. 

1101 In Microsoft Entra, tokens can be issued for multiple 3rd party resources. 

1102 You can ask authorization code for multiple resources, 

1103 but when you redeem it, the token is for only one intended 

1104 recipient, called audience. 

1105 So the developer need to specify a scope so that we can restrict the 

1106 token to be issued for the corresponding audience. 

1107 

1108 :return: 

1109 * A dict containing "access_token" and/or "id_token", among others, 

1110 depends on what scope was used. 

1111 (See https://tools.ietf.org/html/rfc6749#section-5.1) 

1112 * A dict containing "error", optionally "error_description", "error_uri". 

1113 (It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_ 

1114 or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_) 

1115 * Most client-side data error would result in ValueError exception. 

1116 So the usage pattern could be without any protocol details:: 

1117 

1118 def authorize(): # A controller in a web app 

1119 try: 

1120 result = msal_app.acquire_token_by_auth_code_flow( 

1121 session.get("flow", {}), request.args) 

1122 if "error" in result: 

1123 return render_template("error.html", result) 

1124 use(result) # Token(s) are available in result and cache 

1125 except ValueError: # Usually caused by CSRF 

1126 pass # Simply ignore them 

1127 return redirect(url_for("index")) 

1128 """ 

1129 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1130 telemetry_context = self._build_telemetry_context( 

1131 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID) 

1132 response = _clean_up(self.client.obtain_token_by_auth_code_flow( 

1133 auth_code_flow, 

1134 auth_response, 

1135 scope=self._decorate_scope(scopes) if scopes else None, 

1136 headers=telemetry_context.generate_headers(), 

1137 data=dict( 

1138 kwargs.pop("data", {}), 

1139 claims=_merge_claims_challenge_and_capabilities( 

1140 self._client_capabilities, 

1141 auth_code_flow.pop("claims_challenge", None))), 

1142 **kwargs)) 

1143 if "access_token" in response: 

1144 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1145 telemetry_context.update_telemetry(response) 

1146 return response 

1147 

1148 def acquire_token_by_authorization_code( 

1149 self, 

1150 code, 

1151 scopes, # Syntactically required. STS accepts empty value though. 

1152 redirect_uri=None, 

1153 # REQUIRED, if the "redirect_uri" parameter was included in the 

1154 # authorization request as described in Section 4.1.1, and their 

1155 # values MUST be identical. 

1156 nonce=None, 

1157 claims_challenge=None, 

1158 **kwargs): 

1159 """The second half of the Authorization Code Grant. 

1160 

1161 :param code: The authorization code returned from Authorization Server. 

1162 :param list[str] scopes: (Required) 

1163 Scopes requested to access a protected API (a resource). 

1164 

1165 If you requested user consent for multiple resources, here you will 

1166 typically want to provide a subset of what you required in AuthCode. 

1167 

1168 OAuth2 was designed mostly for singleton services, 

1169 where tokens are always meant for the same resource and the only 

1170 changes are in the scopes. 

1171 In Microsoft Entra, tokens can be issued for multiple 3rd party resources. 

1172 You can ask authorization code for multiple resources, 

1173 but when you redeem it, the token is for only one intended 

1174 recipient, called audience. 

1175 So the developer need to specify a scope so that we can restrict the 

1176 token to be issued for the corresponding audience. 

1177 

1178 :param nonce: 

1179 If you provided a nonce when calling :func:`get_authorization_request_url`, 

1180 same nonce should also be provided here, so that we'll validate it. 

1181 An exception will be raised if the nonce in id token mismatches. 

1182 

1183 :param claims_challenge: 

1184 The claims_challenge parameter requests specific claims requested by the resource provider 

1185 in the form of a claims_challenge directive in the www-authenticate header to be 

1186 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1187 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1188 

1189 :return: A dict representing the json response from Microsoft Entra: 

1190 

1191 - A successful response would contain "access_token" key, 

1192 - an error response would contain "error" and usually "error_description". 

1193 """ 

1194 # If scope is absent on the wire, STS will give you a token associated 

1195 # to the FIRST scope sent during the authorization request. 

1196 # So in theory, you can omit scope here when you were working with only 

1197 # one scope. But, MSAL decorates your scope anyway, so they are never 

1198 # really empty. 

1199 assert isinstance(scopes, list), "Invalid parameter type" 

1200 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1201 warnings.warn( 

1202 "Change your acquire_token_by_authorization_code() " 

1203 "to acquire_token_by_auth_code_flow()", DeprecationWarning) 

1204 with warnings.catch_warnings(record=True): 

1205 telemetry_context = self._build_telemetry_context( 

1206 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID) 

1207 response = _clean_up(self.client.obtain_token_by_authorization_code( 

1208 code, redirect_uri=redirect_uri, 

1209 scope=self._decorate_scope(scopes), 

1210 headers=telemetry_context.generate_headers(), 

1211 data=dict( 

1212 kwargs.pop("data", {}), 

1213 claims=_merge_claims_challenge_and_capabilities( 

1214 self._client_capabilities, claims_challenge)), 

1215 nonce=nonce, 

1216 **kwargs)) 

1217 if "access_token" in response: 

1218 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1219 telemetry_context.update_telemetry(response) 

1220 return response 

1221 

1222 def get_accounts(self, username=None): 

1223 """Get a list of accounts which previously signed in, i.e. exists in cache. 

1224 

1225 An account can later be used in :func:`~acquire_token_silent` 

1226 to find its tokens. 

1227 

1228 :param username: 

1229 Filter accounts with this username only. Case insensitive. 

1230 :return: A list of account objects. 

1231 Each account is a dict. For now, we only document its "username" field. 

1232 Your app can choose to display those information to end user, 

1233 and allow user to choose one of his/her accounts to proceed. 

1234 """ 

1235 accounts = self._find_msal_accounts(environment=self.authority.instance) 

1236 if not accounts: # Now try other aliases of this authority instance 

1237 for alias in self._get_authority_aliases(self.authority.instance): 

1238 accounts = self._find_msal_accounts(environment=alias) 

1239 if accounts: 

1240 break 

1241 if username: 

1242 # Federated account["username"] from AAD could contain mixed case 

1243 lowercase_username = username.lower() 

1244 accounts = [a for a in accounts 

1245 if a["username"].lower() == lowercase_username] 

1246 if not accounts: 

1247 logger.debug(( # This would also happen when the cache is empty 

1248 "get_accounts(username='{}') finds no account. " 

1249 "If tokens were acquired without 'profile' scope, " 

1250 "they would contain no username for filtering. " 

1251 "Consider calling get_accounts(username=None) instead." 

1252 ).format(username)) 

1253 # Does not further filter by existing RTs here. It probably won't matter. 

1254 # Because in most cases Accounts and RTs co-exist. 

1255 # Even in the rare case when an RT is revoked and then removed, 

1256 # acquire_token_silent() would then yield no result, 

1257 # apps would fall back to other acquire methods. This is the standard pattern. 

1258 return accounts 

1259 

1260 def _find_msal_accounts(self, environment): 

1261 interested_authority_types = [ 

1262 TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS] 

1263 if _is_running_in_cloud_shell(): 

1264 interested_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL) 

1265 grouped_accounts = { 

1266 a.get("home_account_id"): # Grouped by home tenant's id 

1267 { # These are minimal amount of non-tenant-specific account info 

1268 "home_account_id": a.get("home_account_id"), 

1269 "environment": a.get("environment"), 

1270 "username": a.get("username"), 

1271 "account_source": a.get("account_source"), 

1272 

1273 # The following fields for backward compatibility, for now 

1274 "authority_type": a.get("authority_type"), 

1275 "local_account_id": a.get("local_account_id"), # Tenant-specific 

1276 "realm": a.get("realm"), # Tenant-specific 

1277 } 

1278 for a in self.token_cache.search( 

1279 TokenCache.CredentialType.ACCOUNT, 

1280 query={"environment": environment}) 

1281 if a["authority_type"] in interested_authority_types 

1282 } 

1283 return list(grouped_accounts.values()) 

1284 

1285 def _get_instance_metadata(self): # This exists so it can be mocked in unit test 

1286 resp = self.http_client.get( 

1287 "https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize", # TBD: We may extend this to use self._instance_discovery endpoint 

1288 headers={'Accept': 'application/json'}) 

1289 resp.raise_for_status() 

1290 return json.loads(resp.text)['metadata'] 

1291 

1292 def _get_authority_aliases(self, instance): 

1293 if self._instance_discovery is False: 

1294 return [] 

1295 if self.authority._is_known_to_developer: 

1296 # Then it is an ADFS/B2C/known_authority_hosts situation 

1297 # which may not reach the central endpoint, so we skip it. 

1298 return [] 

1299 if not self.authority_groups: 

1300 self.authority_groups = [ 

1301 set(group['aliases']) for group in self._get_instance_metadata()] 

1302 for group in self.authority_groups: 

1303 if instance in group: 

1304 return [alias for alias in group if alias != instance] 

1305 return [] 

1306 

1307 def remove_account(self, account): 

1308 """Sign me out and forget me from token cache""" 

1309 if self._enable_broker: 

1310 from .broker import _signout_silently 

1311 error = _signout_silently(self.client_id, account["local_account_id"]) 

1312 if error: 

1313 logger.debug("_signout_silently() returns error: %s", error) 

1314 # Broker sign-out has been attempted, even if the _forget_me() below throws. 

1315 self._forget_me(account) 

1316 

1317 def _sign_out(self, home_account): 

1318 # Remove all relevant RTs and ATs from token cache 

1319 owned_by_home_account = { 

1320 "environment": home_account["environment"], 

1321 "home_account_id": home_account["home_account_id"],} # realm-independent 

1322 app_metadata = self._get_app_metadata(home_account["environment"]) 

1323 # Remove RTs/FRTs, and they are realm-independent 

1324 for rt in [ # Remove RTs from a static list (rather than from a dynamic generator), 

1325 # to avoid changing self.token_cache while it is being iterated 

1326 rt for rt in self.token_cache.search( 

1327 TokenCache.CredentialType.REFRESH_TOKEN, query=owned_by_home_account) 

1328 # Do RT's app ownership check as a precaution, in case family apps 

1329 # and 3rd-party apps share same token cache, although they should not. 

1330 if rt["client_id"] == self.client_id or ( 

1331 app_metadata.get("family_id") # Now let's settle family business 

1332 and rt.get("family_id") == app_metadata["family_id"]) 

1333 ]: 

1334 self.token_cache.remove_rt(rt) 

1335 for at in list(self.token_cache.search( # Remove ATs from a static list, 

1336 # to avoid changing self.token_cache while it is being iterated 

1337 TokenCache.CredentialType.ACCESS_TOKEN, query=owned_by_home_account, 

1338 # Regardless of realm, b/c we've removed realm-independent RTs anyway 

1339 )): 

1340 # To avoid the complexity of locating sibling family app's AT, 

1341 # we skip AT's app ownership check. 

1342 # It means ATs for other apps will also be removed, it is OK because: 

1343 # * non-family apps are not supposed to share token cache to begin with; 

1344 # * Even if it happens, we keep other app's RT already, so SSO still works 

1345 self.token_cache.remove_at(at) 

1346 

1347 def _forget_me(self, home_account): 

1348 # It implies signout, and then also remove all relevant accounts and IDTs 

1349 self._sign_out(home_account) 

1350 owned_by_home_account = { 

1351 "environment": home_account["environment"], 

1352 "home_account_id": home_account["home_account_id"],} # realm-independent 

1353 for idt in list(self.token_cache.search( # Remove IDTs from a static list, 

1354 # to avoid changing self.token_cache while it is being iterated 

1355 TokenCache.CredentialType.ID_TOKEN, query=owned_by_home_account, # regardless of realm 

1356 )): 

1357 self.token_cache.remove_idt(idt) 

1358 for a in list(self.token_cache.search( # Remove Accounts from a static list, 

1359 # to avoid changing self.token_cache while it is being iterated 

1360 TokenCache.CredentialType.ACCOUNT, query=owned_by_home_account, # regardless of realm 

1361 )): 

1362 self.token_cache.remove_account(a) 

1363 

1364 def _acquire_token_by_cloud_shell(self, scopes, data=None): 

1365 from .cloudshell import _obtain_token 

1366 response = _obtain_token( 

1367 self.http_client, scopes, client_id=self.client_id, data=data) 

1368 if "error" not in response: 

1369 self.token_cache.add(dict( 

1370 client_id=self.client_id, 

1371 scope=response["scope"].split() if "scope" in response else scopes, 

1372 token_endpoint=self.authority.token_endpoint, 

1373 response=response, 

1374 data=data or {}, 

1375 authority_type=_AUTHORITY_TYPE_CLOUDSHELL, 

1376 )) 

1377 if "access_token" in response: 

1378 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER 

1379 return response 

1380 

1381 def acquire_token_silent( 

1382 self, 

1383 scopes, # type: List[str] 

1384 account, # type: Optional[Account] 

1385 authority=None, # See get_authorization_request_url() 

1386 force_refresh=False, # type: Optional[boolean] 

1387 claims_challenge=None, 

1388 auth_scheme=None, 

1389 **kwargs): 

1390 """Acquire an access token for given account, without user interaction. 

1391 

1392 It has same parameters as the :func:`~acquire_token_silent_with_error`. 

1393 The difference is the behavior of the return value. 

1394 This method will combine the cache empty and refresh error 

1395 into one return value, `None`. 

1396 If your app does not care about the exact token refresh error during 

1397 token cache look-up, then this method is easier and recommended. 

1398 

1399 :return: 

1400 - A dict containing no "error" key, 

1401 and typically contains an "access_token" key, 

1402 if cache lookup succeeded. 

1403 - None when cache lookup does not yield a token. 

1404 """ 

1405 if not account: 

1406 return None # A backward-compatible NO-OP to drop the account=None usage 

1407 result = _clean_up(self._acquire_token_silent_with_error( 

1408 scopes, account, authority=authority, force_refresh=force_refresh, 

1409 claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs)) 

1410 return result if result and "error" not in result else None 

1411 

1412 def acquire_token_silent_with_error( 

1413 self, 

1414 scopes, # type: List[str] 

1415 account, # type: Optional[Account] 

1416 authority=None, # See get_authorization_request_url() 

1417 force_refresh=False, # type: Optional[boolean] 

1418 claims_challenge=None, 

1419 auth_scheme=None, 

1420 **kwargs): 

1421 """Acquire an access token for given account, without user interaction. 

1422 

1423 It is done either by finding a valid access token from cache, 

1424 or by finding a valid refresh token from cache and then automatically 

1425 use it to redeem a new access token. 

1426 

1427 This method will differentiate cache empty from token refresh error. 

1428 If your app cares the exact token refresh error during 

1429 token cache look-up, then this method is suitable. 

1430 Otherwise, the other method :func:`~acquire_token_silent` is recommended. 

1431 

1432 :param list[str] scopes: (Required) 

1433 Scopes requested to access a protected API (a resource). 

1434 :param account: (Required) 

1435 One of the account object returned by :func:`~get_accounts`. 

1436 Starting from MSAL Python 1.23, 

1437 a ``None`` input will become a NO-OP and always return ``None``. 

1438 :param force_refresh: 

1439 If True, it will skip Access Token look-up, 

1440 and try to find a Refresh Token to obtain a new Access Token. 

1441 :param claims_challenge: 

1442 The claims_challenge parameter requests specific claims requested by the resource provider 

1443 in the form of a claims_challenge directive in the www-authenticate header to be 

1444 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1445 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1446 :param object auth_scheme: 

1447 You can provide an ``msal.auth_scheme.PopAuthScheme`` object 

1448 so that MSAL will get a Proof-of-Possession (POP) token for you. 

1449 

1450 New in version 1.26.0. 

1451 

1452 :return: 

1453 - A dict containing no "error" key, 

1454 and typically contains an "access_token" key, 

1455 if cache lookup succeeded. 

1456 - None when there is simply no token in the cache. 

1457 - A dict containing an "error" key, when token refresh failed. 

1458 """ 

1459 if not account: 

1460 return None # A backward-compatible NO-OP to drop the account=None usage 

1461 return _clean_up(self._acquire_token_silent_with_error( 

1462 scopes, account, authority=authority, force_refresh=force_refresh, 

1463 claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs)) 

1464 

1465 def _acquire_token_silent_with_error( 

1466 self, 

1467 scopes, # type: List[str] 

1468 account, # type: Optional[Account] 

1469 authority=None, # See get_authorization_request_url() 

1470 force_refresh=False, # type: Optional[boolean] 

1471 claims_challenge=None, 

1472 auth_scheme=None, 

1473 **kwargs): 

1474 assert isinstance(scopes, list), "Invalid parameter type" 

1475 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1476 correlation_id = msal.telemetry._get_new_correlation_id() 

1477 if authority: 

1478 warnings.warn("We haven't decided how/if this method will accept authority parameter") 

1479 # the_authority = Authority( 

1480 # authority, 

1481 # self.http_client, 

1482 # instance_discovery=self._instance_discovery, 

1483 # ) if authority else self.authority 

1484 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( 

1485 scopes, account, self.authority, force_refresh=force_refresh, 

1486 claims_challenge=claims_challenge, 

1487 correlation_id=correlation_id, 

1488 auth_scheme=auth_scheme, 

1489 **kwargs) 

1490 if result and "error" not in result: 

1491 return result 

1492 final_result = result 

1493 for alias in self._get_authority_aliases(self.authority.instance): 

1494 if not list(self.token_cache.search( # Need a list to test emptiness 

1495 self.token_cache.CredentialType.REFRESH_TOKEN, 

1496 # target=scopes, # MUST NOT filter by scopes, because: 

1497 # 1. AAD RTs are scope-independent; 

1498 # 2. therefore target is optional per schema; 

1499 query={"environment": alias})): 

1500 # Skip heavy weight logic when RT for this alias doesn't exist 

1501 continue 

1502 the_authority = Authority( 

1503 "https://" + alias + "/" + self.authority.tenant, 

1504 self.http_client, 

1505 instance_discovery=False, 

1506 ) 

1507 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( 

1508 scopes, account, the_authority, force_refresh=force_refresh, 

1509 claims_challenge=claims_challenge, 

1510 correlation_id=correlation_id, 

1511 auth_scheme=auth_scheme, 

1512 **kwargs) 

1513 if result: 

1514 if "error" not in result: 

1515 return result 

1516 final_result = result 

1517 if final_result and final_result.get("suberror"): 

1518 final_result["classification"] = { # Suppress these suberrors, per #57 

1519 "bad_token": "", 

1520 "token_expired": "", 

1521 "protection_policy_required": "", 

1522 "client_mismatch": "", 

1523 "device_authentication_failed": "", 

1524 }.get(final_result["suberror"], final_result["suberror"]) 

1525 return final_result 

1526 

1527 def _acquire_token_silent_from_cache_and_possibly_refresh_it( 

1528 self, 

1529 scopes, # type: List[str] 

1530 account, # type: Optional[Account] 

1531 authority, # This can be different than self.authority 

1532 force_refresh=False, # type: Optional[boolean] 

1533 claims_challenge=None, 

1534 correlation_id=None, 

1535 http_exceptions=None, 

1536 auth_scheme=None, 

1537 **kwargs): 

1538 # This internal method has two calling patterns: 

1539 # it accepts a non-empty account to find token for a user, 

1540 # and accepts account=None to find a token for the current app. 

1541 access_token_from_cache = None 

1542 if not (force_refresh or claims_challenge or auth_scheme): # Then attempt AT cache 

1543 query={ 

1544 "client_id": self.client_id, 

1545 "environment": authority.instance, 

1546 "realm": authority.tenant, 

1547 "home_account_id": (account or {}).get("home_account_id"), 

1548 } 

1549 key_id = kwargs.get("data", {}).get("key_id") 

1550 if key_id: # Some token types (SSH-certs, POP) are bound to a key 

1551 query["key_id"] = key_id 

1552 now = time.time() 

1553 refresh_reason = msal.telemetry.AT_ABSENT 

1554 for entry in self.token_cache.search( # A generator allows us to 

1555 # break early in cache-hit without finding a full list 

1556 self.token_cache.CredentialType.ACCESS_TOKEN, 

1557 target=scopes, 

1558 query=query, 

1559 ): # This loop is about token search, not about token deletion. 

1560 # Note that search() holds a lock during this loop; 

1561 # that is fine because this loop is fast 

1562 expires_in = int(entry["expires_on"]) - now 

1563 if expires_in < 5*60: # Then consider it expired 

1564 refresh_reason = msal.telemetry.AT_EXPIRED 

1565 continue # Removal is not necessary, it will be overwritten 

1566 logger.debug("Cache hit an AT") 

1567 access_token_from_cache = { # Mimic a real response 

1568 "access_token": entry["secret"], 

1569 "token_type": entry.get("token_type", "Bearer"), 

1570 "expires_in": int(expires_in), # OAuth2 specs defines it as int 

1571 self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE, 

1572 } 

1573 if "refresh_on" in entry: 

1574 access_token_from_cache["refresh_on"] = int(entry["refresh_on"]) 

1575 if int(entry["refresh_on"]) < now: # aging 

1576 refresh_reason = msal.telemetry.AT_AGING 

1577 break # With a fallback in hand, we break here to go refresh 

1578 self._build_telemetry_context(-1).hit_an_access_token() 

1579 return access_token_from_cache # It is still good as new 

1580 else: 

1581 refresh_reason = msal.telemetry.FORCE_REFRESH # TODO: It could also mean claims_challenge 

1582 assert refresh_reason, "It should have been established at this point" 

1583 if not http_exceptions: # It can be a tuple of exceptions 

1584 # The exact HTTP exceptions are transportation-layer dependent 

1585 from requests.exceptions import RequestException # Lazy load 

1586 http_exceptions = (RequestException,) 

1587 try: 

1588 data = kwargs.get("data", {}) 

1589 if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL: 

1590 if auth_scheme: 

1591 raise ValueError("auth_scheme is not supported in Cloud Shell") 

1592 return self._acquire_token_by_cloud_shell(scopes, data=data) 

1593 

1594 is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(data.get("token_type"), auth_scheme) 

1595 

1596 if self._enable_broker and account and account.get("account_source") in ( 

1597 _GRANT_TYPE_BROKER, # Broker successfully established this account previously. 

1598 None, # Unknown data from older MSAL. Broker might still work. 

1599 ) and (sys.platform in ("win32", "darwin") or not is_ssh_cert_or_pop_request): 

1600 from .broker import _acquire_token_silently 

1601 response = _acquire_token_silently( 

1602 "https://{}/{}".format(self.authority.instance, self.authority.tenant), 

1603 self.client_id, 

1604 account["local_account_id"], 

1605 scopes, 

1606 claims=_merge_claims_challenge_and_capabilities( 

1607 self._client_capabilities, claims_challenge), 

1608 correlation_id=correlation_id, 

1609 auth_scheme=auth_scheme, 

1610 **data) 

1611 if response: # Broker provides a decisive outcome 

1612 account_was_established_by_broker = account.get( 

1613 "account_source") == _GRANT_TYPE_BROKER 

1614 broker_attempt_succeeded_just_now = "error" not in response 

1615 if account_was_established_by_broker or broker_attempt_succeeded_just_now: 

1616 return self._process_broker_response(response, scopes, data) 

1617 

1618 if auth_scheme: 

1619 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED) 

1620 if account: 

1621 result = self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family( 

1622 authority, self._decorate_scope(scopes), account, 

1623 refresh_reason=refresh_reason, claims_challenge=claims_challenge, 

1624 correlation_id=correlation_id, 

1625 **kwargs) 

1626 else: # The caller is acquire_token_for_client() 

1627 result = self._acquire_token_for_client( 

1628 scopes, refresh_reason, claims_challenge=claims_challenge, 

1629 **kwargs) 

1630 if result and "access_token" in result: 

1631 result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1632 if (result and "error" not in result) or (not access_token_from_cache): 

1633 return result 

1634 except http_exceptions: 

1635 # Typically network error. Potential AAD outage? 

1636 if not access_token_from_cache: # It means there is no fall back option 

1637 raise # We choose to bubble up the exception 

1638 return access_token_from_cache 

1639 

1640 def _process_broker_response(self, response, scopes, data): 

1641 if "error" not in response: 

1642 self.token_cache.add(dict( 

1643 client_id=self.client_id, 

1644 scope=response["scope"].split() if "scope" in response else scopes, 

1645 token_endpoint=self.authority.token_endpoint, 

1646 response=response, 

1647 data=data, 

1648 _account_id=response["_account_id"], 

1649 environment=self.authority.instance, # Be consistent with non-broker flows 

1650 grant_type=_GRANT_TYPE_BROKER, # A pseudo grant type for TokenCache to mark account_source as broker 

1651 )) 

1652 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER 

1653 return _clean_up(response) 

1654 

1655 def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family( 

1656 self, authority, scopes, account, **kwargs): 

1657 query = { 

1658 "environment": authority.instance, 

1659 "home_account_id": (account or {}).get("home_account_id"), 

1660 # "realm": authority.tenant, # AAD RTs are tenant-independent 

1661 } 

1662 app_metadata = self._get_app_metadata(authority.instance) 

1663 if not app_metadata: # Meaning this app is now used for the first time. 

1664 # When/if we have a way to directly detect current app's family, 

1665 # we'll rewrite this block, to support multiple families. 

1666 # For now, we try existing RTs (*). If it works, we are in that family. 

1667 # (*) RTs of a different app/family are not supposed to be 

1668 # shared with or accessible by us in the first place. 

1669 at = self._acquire_token_silent_by_finding_specific_refresh_token( 

1670 authority, scopes, 

1671 dict(query, family_id="1"), # A hack, we have only 1 family for now 

1672 rt_remover=lambda rt_item: None, # NO-OP b/c RTs are likely not mine 

1673 break_condition=lambda response: # Break loop when app not in family 

1674 # Based on an AAD-only behavior mentioned in internal doc here 

1675 # https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595 

1676 "client_mismatch" in response.get("error_additional_info", []), 

1677 **kwargs) 

1678 if at and "error" not in at: 

1679 return at 

1680 last_resp = None 

1681 if app_metadata.get("family_id"): # Meaning this app belongs to this family 

1682 last_resp = at = self._acquire_token_silent_by_finding_specific_refresh_token( 

1683 authority, scopes, dict(query, family_id=app_metadata["family_id"]), 

1684 **kwargs) 

1685 if at and "error" not in at: 

1686 return at 

1687 # Either this app is an orphan, so we will naturally use its own RT; 

1688 # or all attempts above have failed, so we fall back to non-foci behavior. 

1689 return self._acquire_token_silent_by_finding_specific_refresh_token( 

1690 authority, scopes, dict(query, client_id=self.client_id), 

1691 **kwargs) or last_resp 

1692 

1693 def _get_app_metadata(self, environment): 

1694 return self.token_cache._get_app_metadata( 

1695 environment=environment, client_id=self.client_id, default={}) 

1696 

1697 def _acquire_token_silent_by_finding_specific_refresh_token( 

1698 self, authority, scopes, query, 

1699 rt_remover=None, break_condition=lambda response: False, 

1700 refresh_reason=None, correlation_id=None, claims_challenge=None, 

1701 **kwargs): 

1702 matches = list(self.token_cache.search( # We want a list to test emptiness 

1703 self.token_cache.CredentialType.REFRESH_TOKEN, 

1704 # target=scopes, # AAD RTs are scope-independent 

1705 query=query)) 

1706 logger.debug("Found %d RTs matching %s", len(matches), { 

1707 k: _pii_less_home_account_id(v) if k == "home_account_id" and v else v 

1708 for k, v in query.items() 

1709 }) 

1710 

1711 response = None # A distinguishable value to mean cache is empty 

1712 if not matches: # Then exit early to avoid expensive operations 

1713 return response 

1714 client, _ = self._build_client( 

1715 # Potentially expensive if building regional client 

1716 self.client_credential, authority, skip_regional_client=True) 

1717 telemetry_context = self._build_telemetry_context( 

1718 self.ACQUIRE_TOKEN_SILENT_ID, 

1719 correlation_id=correlation_id, refresh_reason=refresh_reason) 

1720 for entry in sorted( # Since unfit RTs would not be aggressively removed, 

1721 # we start from newer RTs which are more likely fit. 

1722 matches, 

1723 key=lambda e: int(e.get("last_modification_time", "0")), 

1724 reverse=True): 

1725 logger.debug("Cache attempts an RT") 

1726 headers = telemetry_context.generate_headers() 

1727 if query.get("home_account_id"): # Then use it as CCS Routing info 

1728 headers["X-AnchorMailbox"] = "Oid:{}".format( # case-insensitive value 

1729 query["home_account_id"].replace(".", "@")) 

1730 response = client.obtain_token_by_refresh_token( 

1731 entry, rt_getter=lambda token_item: token_item["secret"], 

1732 on_removing_rt=lambda rt_item: None, # Disable RT removal, 

1733 # because an invalid_grant could be caused by new MFA policy, 

1734 # the RT could still be useful for other MFA-less scope or tenant 

1735 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

1736 event, 

1737 environment=authority.instance, 

1738 skip_account_creation=True, # To honor a concurrent remove_account() 

1739 )), 

1740 scope=scopes, 

1741 headers=headers, 

1742 data=dict( 

1743 kwargs.pop("data", {}), 

1744 claims=_merge_claims_challenge_and_capabilities( 

1745 self._client_capabilities, claims_challenge)), 

1746 **kwargs) 

1747 telemetry_context.update_telemetry(response) 

1748 if "error" not in response: 

1749 return response 

1750 logger.debug("Refresh failed. {error}: {error_description}".format( 

1751 error=response.get("error"), 

1752 error_description=response.get("error_description"), 

1753 )) 

1754 if break_condition(response): 

1755 break 

1756 return response # Returns the latest error (if any), or just None 

1757 

1758 def _validate_ssh_cert_input_data(self, data): 

1759 if data.get("token_type") == "ssh-cert": 

1760 if not data.get("req_cnf"): 

1761 raise ValueError( 

1762 "When requesting an SSH certificate, " 

1763 "you must include a string parameter named 'req_cnf' " 

1764 "containing the public key in JWK format " 

1765 "(https://tools.ietf.org/html/rfc7517).") 

1766 if not data.get("key_id"): 

1767 raise ValueError( 

1768 "When requesting an SSH certificate, " 

1769 "you must include a string parameter named 'key_id' " 

1770 "which identifies the key in the 'req_cnf' argument.") 

1771 

1772 def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs): 

1773 """Acquire token(s) based on a refresh token (RT) obtained from elsewhere. 

1774 

1775 You use this method only when you have old RTs from elsewhere, 

1776 and now you want to migrate them into MSAL. 

1777 Calling this method results in new tokens automatically storing into MSAL. 

1778 

1779 You do NOT need to use this method if you are already using MSAL. 

1780 MSAL maintains RT automatically inside its token cache, 

1781 and an access token can be retrieved 

1782 when you call :func:`~acquire_token_silent`. 

1783 

1784 :param str refresh_token: The old refresh token, as a string. 

1785 

1786 :param list scopes: 

1787 The scopes associate with this old RT. 

1788 Each scope needs to be in the Microsoft identity platform (v2) format. 

1789 See `Scopes not resources <https://docs.microsoft.com/en-us/azure/active-directory/develop/migrate-python-adal-msal#scopes-not-resources>`_. 

1790 

1791 :return: 

1792 * A dict contains "error" and some other keys, when error happened. 

1793 * A dict contains no "error" key means migration was successful. 

1794 """ 

1795 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1796 telemetry_context = self._build_telemetry_context( 

1797 self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN, 

1798 refresh_reason=msal.telemetry.FORCE_REFRESH) 

1799 response = _clean_up(self.client.obtain_token_by_refresh_token( 

1800 refresh_token, 

1801 scope=self._decorate_scope(scopes), 

1802 headers=telemetry_context.generate_headers(), 

1803 rt_getter=lambda rt: rt, 

1804 on_updating_rt=False, 

1805 on_removing_rt=lambda rt_item: None, # No OP 

1806 **kwargs)) 

1807 if "access_token" in response: 

1808 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1809 telemetry_context.update_telemetry(response) 

1810 return response 

1811 

1812 def acquire_token_by_username_password( 

1813 self, username, password, scopes, claims_challenge=None, 

1814 # Note: We shouldn't need to surface enable_msa_passthrough, 

1815 # because this ROPC won't work with MSA account anyway. 

1816 auth_scheme=None, 

1817 **kwargs): 

1818 """Gets a token for a given resource via user credentials. 

1819 

1820 See this page for constraints of Username Password Flow. 

1821 https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication 

1822 

1823 :param str username: Typically a UPN in the form of an email address. 

1824 :param str password: The password. 

1825 :param list[str] scopes: 

1826 Scopes requested to access a protected API (a resource). 

1827 :param claims_challenge: 

1828 The claims_challenge parameter requests specific claims requested by the resource provider 

1829 in the form of a claims_challenge directive in the www-authenticate header to be 

1830 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1831 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1832 

1833 :param object auth_scheme: 

1834 You can provide an ``msal.auth_scheme.PopAuthScheme`` object 

1835 so that MSAL will get a Proof-of-Possession (POP) token for you. 

1836 

1837 New in version 1.26.0. 

1838 

1839 :return: A dict representing the json response from Microsoft Entra: 

1840 

1841 - A successful response would contain "access_token" key, 

1842 - an error response would contain "error" and usually "error_description". 

1843 """ 

1844 claims = _merge_claims_challenge_and_capabilities( 

1845 self._client_capabilities, claims_challenge) 

1846 if self._enable_broker and sys.platform in ("win32", "darwin"): 

1847 from .broker import _signin_silently 

1848 response = _signin_silently( 

1849 "https://{}/{}".format(self.authority.instance, self.authority.tenant), 

1850 self.client_id, 

1851 scopes, # Decorated scopes won't work due to offline_access 

1852 MSALRuntime_Username=username, 

1853 MSALRuntime_Password=password, 

1854 validateAuthority="no" if ( 

1855 self.authority._is_known_to_developer 

1856 or self._instance_discovery is False) else None, 

1857 claims=claims, 

1858 auth_scheme=auth_scheme, 

1859 ) 

1860 return self._process_broker_response(response, scopes, kwargs.get("data", {})) 

1861 

1862 if auth_scheme: 

1863 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED) 

1864 scopes = self._decorate_scope(scopes) 

1865 telemetry_context = self._build_telemetry_context( 

1866 self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID) 

1867 headers = telemetry_context.generate_headers() 

1868 data = dict(kwargs.pop("data", {}), claims=claims) 

1869 response = None 

1870 if not self.authority.is_adfs: 

1871 user_realm_result = self.authority.user_realm_discovery( 

1872 username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID]) 

1873 if user_realm_result.get("account_type") == "Federated": 

1874 response = _clean_up(self._acquire_token_by_username_password_federated( 

1875 user_realm_result, username, password, scopes=scopes, 

1876 data=data, 

1877 headers=headers, **kwargs)) 

1878 if response is None: # Either ADFS or not federated 

1879 response = _clean_up(self.client.obtain_token_by_username_password( 

1880 username, password, scope=scopes, 

1881 headers=headers, 

1882 data=data, 

1883 **kwargs)) 

1884 if "access_token" in response: 

1885 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1886 telemetry_context.update_telemetry(response) 

1887 return response 

1888 

1889 def _acquire_token_by_username_password_federated( 

1890 self, user_realm_result, username, password, scopes=None, **kwargs): 

1891 wstrust_endpoint = {} 

1892 if user_realm_result.get("federation_metadata_url"): 

1893 wstrust_endpoint = mex_send_request( 

1894 user_realm_result["federation_metadata_url"], 

1895 self.http_client) 

1896 if wstrust_endpoint is None: 

1897 raise ValueError("Unable to find wstrust endpoint from MEX. " 

1898 "This typically happens when attempting MSA accounts. " 

1899 "More details available here. " 

1900 "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication") 

1901 logger.debug("wstrust_endpoint = %s", wstrust_endpoint) 

1902 wstrust_result = wst_send_request( 

1903 username, password, 

1904 user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"), 

1905 wstrust_endpoint.get("address", 

1906 # Fallback to an AAD supplied endpoint 

1907 user_realm_result.get("federation_active_auth_url")), 

1908 wstrust_endpoint.get("action"), self.http_client) 

1909 if not ("token" in wstrust_result and "type" in wstrust_result): 

1910 raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result) 

1911 GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer' 

1912 grant_type = { 

1913 SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1, 

1914 SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2, 

1915 WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1, 

1916 WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2 

1917 }.get(wstrust_result.get("type")) 

1918 if not grant_type: 

1919 raise RuntimeError( 

1920 "RSTR returned unknown token type: %s", wstrust_result.get("type")) 

1921 self.client.grant_assertion_encoders.setdefault( # Register a non-standard type 

1922 grant_type, self.client.encode_saml_assertion) 

1923 return self.client.obtain_token_by_assertion( 

1924 wstrust_result["token"], grant_type, scope=scopes, 

1925 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

1926 event, 

1927 environment=self.authority.instance, 

1928 username=username, # Useful in case IDT contains no such info 

1929 )), 

1930 **kwargs) 

1931 

1932 

1933class PublicClientApplication(ClientApplication): # browser app or mobile app 

1934 

1935 DEVICE_FLOW_CORRELATION_ID = "_correlation_id" 

1936 CONSOLE_WINDOW_HANDLE = object() 

1937 

1938 def __init__( 

1939 self, client_id, client_credential=None, 

1940 *, 

1941 enable_broker_on_windows=None, 

1942 enable_broker_on_mac=None, 

1943 enable_broker_on_linux=None, 

1944 enable_broker_on_wsl=None, 

1945 **kwargs): 

1946 """Same as :func:`ClientApplication.__init__`, 

1947 except that ``client_credential`` parameter shall remain ``None``. 

1948 

1949 .. note:: 

1950 

1951 **What is a broker, and why use it?** 

1952 

1953 A broker is a component installed on your device. 

1954 Broker implicitly gives your device an identity. By using a broker, 

1955 your device becomes a factor that can satisfy MFA (Multi-factor authentication). 

1956 This factor would become mandatory 

1957 if a tenant's admin enables a corresponding Conditional Access (CA) policy. 

1958 The broker's presence allows Microsoft identity platform 

1959 to have higher confidence that the tokens are being issued to your device, 

1960 and that is more secure. 

1961 

1962 An additional benefit of broker is, 

1963 it runs as a long-lived process with your device's OS, 

1964 and maintains its own cache, 

1965 so that your broker-enabled apps (even a CLI) 

1966 could automatically SSO from a previously established signed-in session. 

1967 

1968 **How to opt in to use broker?** 

1969 

1970 1. You can set any combination of the following opt-in parameters to true: 

1971 

1972 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

1973 | Opt-in flag | If app will run on | App has registered this as a Desktop platform redirect URI in Azure Portal | 

1974 +==========================+===================================+====================================================================================+ 

1975 | enable_broker_on_windows | Windows 10+ | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id | 

1976 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

1977 | enable_broker_on_wsl | WSL | ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id | 

1978 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

1979 | enable_broker_on_mac | Mac with Company Portal installed | msauth.com.msauth.unsignedapp://auth | 

1980 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

1981 | enable_broker_on_linux | Linux with Intune installed | ``https://login.microsoftonline.com/common/oauth2/nativeclient`` (MUST be enabled) | 

1982 +--------------------------+-----------------------------------+------------------------------------------------------------------------------------+ 

1983 

1984 2. Install broker dependency, 

1985 e.g. ``pip install msal[broker]>=1.33,<2``. 

1986 

1987 3. Test with ``acquire_token_interactive()`` and ``acquire_token_silent()``. 

1988 

1989 **The fallback behaviors of MSAL Python's broker support** 

1990 

1991 MSAL will either error out, or silently fallback to non-broker flows. 

1992 

1993 1. MSAL will ignore the `enable_broker_...` and bypass broker 

1994 on those auth flows that are known to be NOT supported by broker. 

1995 This includes ADFS, B2C, etc.. 

1996 For other "could-use-broker" scenarios, please see below. 

1997 2. MSAL errors out when app developer opted-in to use broker 

1998 but a direct dependency "mid-tier" package is not installed. 

1999 Error message guides app developer to declare the correct dependency 

2000 ``msal[broker]``. 

2001 We error out here because the error is actionable to app developers. 

2002 3. MSAL silently "deactivates" the broker and fallback to non-broker, 

2003 when opted-in, dependency installed yet failed to initialize. 

2004 We anticipate this would happen on a device whose OS is too old 

2005 or the underlying broker component is somehow unavailable. 

2006 There is not much an app developer or the end user can do here. 

2007 Eventually, the conditional access policy shall 

2008 force the user to switch to a different device. 

2009 4. MSAL errors out when broker is opted in, installed, initialized, 

2010 but subsequent token request(s) failed. 

2011 

2012 :param boolean enable_broker_on_windows: 

2013 This setting is only effective if your app is running on Windows 10+. 

2014 This parameter defaults to None, which means MSAL will not utilize a broker. 

2015 

2016 New in MSAL Python 1.25.0. 

2017 

2018 :param boolean enable_broker_on_mac: 

2019 This setting is only effective if your app is running on Mac. 

2020 This parameter defaults to None, which means MSAL will not utilize a broker. 

2021 

2022 New in MSAL Python 1.31.0. 

2023 

2024 :param boolean enable_broker_on_linux: 

2025 This setting is only effective if your app is running on Linux, including WSL. 

2026 This parameter defaults to None, which means MSAL will not utilize a broker. 

2027 

2028 New in MSAL Python 1.33.0. 

2029 

2030 :param boolean enable_broker_on_wsl: 

2031 This setting is only effective if your app is running on WSL. 

2032 This parameter defaults to None, which means MSAL will not utilize a broker. 

2033 

2034 New in MSAL Python 1.33.0. 

2035 """ 

2036 if client_credential is not None: 

2037 raise ValueError("Public Client should not possess credentials") 

2038 

2039 self._enable_broker = bool( 

2040 enable_broker_on_windows and sys.platform == "win32" 

2041 or enable_broker_on_mac and sys.platform == "darwin" 

2042 or enable_broker_on_linux and sys.platform == "linux" 

2043 or enable_broker_on_wsl and is_wsl() 

2044 ) 

2045 

2046 super(PublicClientApplication, self).__init__( 

2047 client_id, client_credential=None, **kwargs) 

2048 

2049 def acquire_token_interactive( 

2050 self, 

2051 scopes, # type: list[str] 

2052 prompt=None, 

2053 login_hint=None, # type: Optional[str] 

2054 domain_hint=None, # type: Optional[str] 

2055 claims_challenge=None, 

2056 timeout=None, 

2057 port=None, 

2058 extra_scopes_to_consent=None, 

2059 max_age=None, 

2060 parent_window_handle=None, 

2061 on_before_launching_ui=None, 

2062 auth_scheme=None, 

2063 **kwargs): 

2064 """Acquire token interactively i.e. via a local browser. 

2065 

2066 Prerequisite: In Azure Portal, configure the Redirect URI of your 

2067 "Mobile and Desktop application" as ``http://localhost``. 

2068 If you opts in to use broker during ``PublicClientApplication`` creation, 

2069 your app also need this Redirect URI: 

2070 ``ms-appx-web://Microsoft.AAD.BrokerPlugin/YOUR_CLIENT_ID`` 

2071 

2072 :param list scopes: 

2073 It is a list of case-sensitive strings. 

2074 :param str prompt: 

2075 By default, no prompt value will be sent, not even string ``"none"``. 

2076 You will have to specify a value explicitly. 

2077 Its valid values are the constants defined in 

2078 :class:`Prompt <msal.Prompt>`. 

2079 :param str login_hint: 

2080 Optional. Identifier of the user. Generally a User Principal Name (UPN). 

2081 :param domain_hint: 

2082 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com". 

2083 If included, it will skip the email-based discovery process that user goes 

2084 through on the sign-in page, leading to a slightly more streamlined user experience. 

2085 More information on possible values available in 

2086 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and 

2087 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_. 

2088 

2089 :param claims_challenge: 

2090 The claims_challenge parameter requests specific claims requested by the resource provider 

2091 in the form of a claims_challenge directive in the www-authenticate header to be 

2092 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2093 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2094 

2095 :param int timeout: 

2096 This method will block the current thread. 

2097 This parameter specifies the timeout value in seconds. 

2098 Default value ``None`` means wait indefinitely. 

2099 

2100 :param int port: 

2101 The port to be used to listen to an incoming auth response. 

2102 By default we will use a system-allocated port. 

2103 (The rest of the redirect_uri is hard coded as ``http://localhost``.) 

2104 

2105 :param list extra_scopes_to_consent: 

2106 "Extra scopes to consent" is a concept only available in Microsoft Entra. 

2107 It refers to other resources you might want to prompt to consent for, 

2108 in the same interaction, but for which you won't get back a 

2109 token for in this particular operation. 

2110 

2111 :param int max_age: 

2112 OPTIONAL. Maximum Authentication Age. 

2113 Specifies the allowable elapsed time in seconds 

2114 since the last time the End-User was actively authenticated. 

2115 If the elapsed time is greater than this value, 

2116 Microsoft identity platform will actively re-authenticate the End-User. 

2117 

2118 MSAL Python will also automatically validate the auth_time in ID token. 

2119 

2120 New in version 1.15. 

2121 

2122 :param int parent_window_handle: 

2123 OPTIONAL. 

2124 

2125 * If your app does not opt in to use broker, 

2126 you do not need to provide a ``parent_window_handle`` here. 

2127 

2128 * If your app opts in to use broker, 

2129 ``parent_window_handle`` is required. 

2130 

2131 - If your app is a GUI app running on Windows or Mac system, 

2132 you are required to also provide its window handle, 

2133 so that the sign-in window will pop up on top of your window. 

2134 - If your app is a console app running on Windows or Mac system, 

2135 you can use a placeholder 

2136 ``PublicClientApplication.CONSOLE_WINDOW_HANDLE``. 

2137 

2138 Most Python scripts are console apps. 

2139 

2140 New in version 1.20.0. 

2141 

2142 :param function on_before_launching_ui: 

2143 A callback with the form of 

2144 ``lambda ui="xyz", **kwargs: print("A {} will be launched".format(ui))``, 

2145 where ``ui`` will be either "browser" or "broker". 

2146 You can use it to inform your end user to expect a pop-up window. 

2147 

2148 New in version 1.20.0. 

2149 

2150 :param object auth_scheme: 

2151 You can provide an ``msal.auth_scheme.PopAuthScheme`` object 

2152 so that MSAL will get a Proof-of-Possession (POP) token for you. 

2153 

2154 New in version 1.26.0. 

2155 

2156 :return: 

2157 - A dict containing no "error" key, 

2158 and typically contains an "access_token" key. 

2159 - A dict containing an "error" key, when token refresh failed. 

2160 """ 

2161 data = kwargs.pop("data", {}) 

2162 enable_msa_passthrough = kwargs.pop( # MUST remove it from kwargs 

2163 "enable_msa_passthrough", # Keep it as a hidden param, for now. 

2164 # OPTIONAL. MSA-Passthrough is a legacy configuration, 

2165 # needed by a small amount of Microsoft first-party apps, 

2166 # which would login MSA accounts via ".../organizations" authority. 

2167 # If you app belongs to this category, AND you are enabling broker, 

2168 # you would want to enable this flag. Default value is False. 

2169 # More background of MSA-PT is available from this internal docs: 

2170 # https://microsoft.sharepoint.com/:w:/t/Identity-DevEx/EatIUauX3c9Ctw1l7AQ6iM8B5CeBZxc58eoQCE0IuZ0VFw?e=tgc3jP&CID=39c853be-76ea-79d7-ee73-f1b2706ede05 

2171 False 

2172 ) and data.get("token_type") != "ssh-cert" # Work around a known issue as of PyMsalRuntime 0.8 

2173 self._validate_ssh_cert_input_data(data) 

2174 is_ssh_cert_or_pop_request = _is_ssh_cert_or_pop_request(data.get("token_type"), auth_scheme) 

2175 

2176 if not on_before_launching_ui: 

2177 on_before_launching_ui = lambda **kwargs: None 

2178 if _is_running_in_cloud_shell() and prompt == "none": 

2179 # Note: _acquire_token_by_cloud_shell() is always silent, 

2180 # so we would not fire on_before_launching_ui() 

2181 return self._acquire_token_by_cloud_shell(scopes, data=data) 

2182 claims = _merge_claims_challenge_and_capabilities( 

2183 self._client_capabilities, claims_challenge) 

2184 if self._enable_broker and (sys.platform in ("win32", "darwin") or not is_ssh_cert_or_pop_request): 

2185 if parent_window_handle is None: 

2186 raise ValueError( 

2187 "parent_window_handle is required when you opted into using broker. " 

2188 "You need to provide the window handle of your GUI application, " 

2189 "or use msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE " 

2190 "when and only when your application is a console app.") 

2191 if extra_scopes_to_consent: 

2192 logger.warning( 

2193 "Ignoring parameter extra_scopes_to_consent, " 

2194 "which is not supported by broker") 

2195 response = self._acquire_token_interactive_via_broker( 

2196 scopes, 

2197 parent_window_handle, 

2198 enable_msa_passthrough, 

2199 claims, 

2200 data, 

2201 on_before_launching_ui, 

2202 auth_scheme, 

2203 prompt=prompt, 

2204 login_hint=login_hint, 

2205 max_age=max_age, 

2206 ) 

2207 return self._process_broker_response(response, scopes, data) 

2208 

2209 if isinstance(auth_scheme, msal.auth_scheme.PopAuthScheme) and sys.platform == "linux": 

2210 raise ValueError("POP is not supported on Linux") 

2211 elif auth_scheme: 

2212 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED) 

2213 on_before_launching_ui(ui="browser") 

2214 telemetry_context = self._build_telemetry_context( 

2215 self.ACQUIRE_TOKEN_INTERACTIVE) 

2216 response = _clean_up(self.client.obtain_token_by_browser( 

2217 scope=self._decorate_scope(scopes) if scopes else None, 

2218 extra_scope_to_consent=extra_scopes_to_consent, 

2219 redirect_uri="http://localhost:{port}".format( 

2220 # Hardcode the host, for now. AAD portal rejects 127.0.0.1 anyway 

2221 port=port or 0), 

2222 prompt=prompt, 

2223 login_hint=login_hint, 

2224 max_age=max_age, 

2225 timeout=timeout, 

2226 auth_params={ 

2227 "claims": claims, 

2228 "domain_hint": domain_hint, 

2229 }, 

2230 data=dict(data, claims=claims), 

2231 headers=telemetry_context.generate_headers(), 

2232 browser_name=_preferred_browser(), 

2233 **kwargs)) 

2234 if "access_token" in response: 

2235 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2236 telemetry_context.update_telemetry(response) 

2237 return response 

2238 

2239 def _acquire_token_interactive_via_broker( 

2240 self, 

2241 scopes, # type: list[str] 

2242 parent_window_handle, # type: int 

2243 enable_msa_passthrough, # type: boolean 

2244 claims, # type: str 

2245 data, # type: dict 

2246 on_before_launching_ui, # type: callable 

2247 auth_scheme, # type: object 

2248 prompt=None, 

2249 login_hint=None, # type: Optional[str] 

2250 max_age=None, 

2251 **kwargs): 

2252 from .broker import _signin_interactively, _signin_silently, _acquire_token_silently 

2253 if "welcome_template" in kwargs: 

2254 logger.debug(kwargs["welcome_template"]) # Experimental 

2255 authority = "https://{}/{}".format( 

2256 self.authority.instance, self.authority.tenant) 

2257 validate_authority = "no" if ( 

2258 self.authority._is_known_to_developer 

2259 or self._instance_discovery is False) else None 

2260 # Calls different broker methods to mimic the OIDC behaviors 

2261 if login_hint and prompt != "select_account": # OIDC prompts when the user did not sign in 

2262 accounts = self.get_accounts(username=login_hint) 

2263 if len(accounts) == 1: # Unambiguously proceed with this account 

2264 logger.debug("Calling broker._acquire_token_silently()") 

2265 response = _acquire_token_silently( # When it works, it bypasses prompt 

2266 authority, 

2267 self.client_id, 

2268 accounts[0]["local_account_id"], 

2269 scopes, 

2270 claims=claims, 

2271 auth_scheme=auth_scheme, 

2272 **data) 

2273 if response and "error" not in response: 

2274 return response 

2275 # login_hint undecisive or not exists 

2276 if prompt == "none" or not prompt: # Must/Can attempt _signin_silently() 

2277 logger.debug("Calling broker._signin_silently()") 

2278 response = _signin_silently( # Unlike OIDC, it doesn't honor login_hint 

2279 authority, self.client_id, scopes, 

2280 validateAuthority=validate_authority, 

2281 claims=claims, 

2282 max_age=max_age, 

2283 enable_msa_pt=enable_msa_passthrough, 

2284 auth_scheme=auth_scheme, 

2285 **data) 

2286 is_wrong_account = bool( 

2287 # _signin_silently() only gets tokens for default account, 

2288 # but this seems to have been fixed in PyMsalRuntime 0.11.2 

2289 "access_token" in response and login_hint 

2290 and login_hint != response.get( 

2291 "id_token_claims", {}).get("preferred_username")) 

2292 wrong_account_error_message = ( 

2293 'prompt="none" will not work for login_hint="non-default-user"') 

2294 if is_wrong_account: 

2295 logger.debug(wrong_account_error_message) 

2296 if prompt == "none": 

2297 return response if not is_wrong_account else { 

2298 "error": "broker_error", 

2299 "error_description": wrong_account_error_message, 

2300 } 

2301 else: 

2302 assert bool(prompt) is False 

2303 from pymsalruntime import Response_Status 

2304 recoverable_errors = frozenset([ 

2305 Response_Status.Status_AccountUnusable, 

2306 Response_Status.Status_InteractionRequired, 

2307 ]) 

2308 if is_wrong_account or "error" in response and response.get( 

2309 "_broker_status") in recoverable_errors: 

2310 pass # It will fall back to the _signin_interactively() 

2311 else: 

2312 return response 

2313 

2314 logger.debug("Falls back to broker._signin_interactively()") 

2315 on_before_launching_ui(ui="broker") 

2316 return _signin_interactively( 

2317 authority, self.client_id, scopes, 

2318 None if parent_window_handle is self.CONSOLE_WINDOW_HANDLE 

2319 else parent_window_handle, 

2320 validateAuthority=validate_authority, 

2321 login_hint=login_hint, 

2322 prompt=prompt, 

2323 claims=claims, 

2324 max_age=max_age, 

2325 enable_msa_pt=enable_msa_passthrough, 

2326 auth_scheme=auth_scheme, 

2327 **data) 

2328 

2329 def initiate_device_flow(self, scopes=None, *, claims_challenge=None, **kwargs): 

2330 """Initiate a Device Flow instance, 

2331 which will be used in :func:`~acquire_token_by_device_flow`. 

2332 

2333 :param list[str] scopes: 

2334 Scopes requested to access a protected API (a resource). 

2335 :return: A dict representing a newly created Device Flow object. 

2336 

2337 - A successful response would contain "user_code" key, among others 

2338 - an error response would contain some other readable key/value pairs. 

2339 """ 

2340 correlation_id = msal.telemetry._get_new_correlation_id() 

2341 flow = self.client.initiate_device_flow( 

2342 scope=self._decorate_scope(scopes or []), 

2343 headers={msal.telemetry.CLIENT_REQUEST_ID: correlation_id}, 

2344 data={"claims": _merge_claims_challenge_and_capabilities( 

2345 self._client_capabilities, claims_challenge)}, 

2346 **kwargs) 

2347 flow[self.DEVICE_FLOW_CORRELATION_ID] = correlation_id 

2348 return flow 

2349 

2350 def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs): 

2351 """Obtain token by a device flow object, with customizable polling effect. 

2352 

2353 :param dict flow: 

2354 A dict previously generated by :func:`~initiate_device_flow`. 

2355 By default, this method's polling effect will block current thread. 

2356 You can abort the polling loop at any time, 

2357 by changing the value of the flow's "expires_at" key to 0. 

2358 :param claims_challenge: 

2359 The claims_challenge parameter requests specific claims requested by the resource provider 

2360 in the form of a claims_challenge directive in the www-authenticate header to be 

2361 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2362 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2363 

2364 :return: A dict representing the json response from Microsoft Entra: 

2365 

2366 - A successful response would contain "access_token" key, 

2367 - an error response would contain "error" and usually "error_description". 

2368 """ 

2369 telemetry_context = self._build_telemetry_context( 

2370 self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID, 

2371 correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID)) 

2372 response = _clean_up(self.client.obtain_token_by_device_flow( 

2373 flow, 

2374 data=dict( 

2375 kwargs.pop("data", {}), 

2376 code=flow["device_code"], # 2018-10-4 Hack: 

2377 # during transition period, 

2378 # service seemingly need both device_code and code parameter. 

2379 claims=_merge_claims_challenge_and_capabilities( 

2380 self._client_capabilities, claims_challenge), 

2381 ), 

2382 headers=telemetry_context.generate_headers(), 

2383 **kwargs)) 

2384 if "access_token" in response: 

2385 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2386 telemetry_context.update_telemetry(response) 

2387 return response 

2388 

2389 

2390class ConfidentialClientApplication(ClientApplication): # server-side web app 

2391 """Same as :func:`ClientApplication.__init__`, 

2392 except that ``allow_broker`` parameter shall remain ``None``. 

2393 """ 

2394 

2395 def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs): 

2396 """Acquires token for the current confidential client, not for an end user. 

2397 

2398 Since MSAL Python 1.23, it will automatically look for token from cache, 

2399 and only send request to Identity Provider when cache misses. 

2400 

2401 :param list[str] scopes: (Required) 

2402 Scopes requested to access a protected API (a resource). 

2403 :param claims_challenge: 

2404 The claims_challenge parameter requests specific claims requested by the resource provider 

2405 in the form of a claims_challenge directive in the www-authenticate header to be 

2406 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2407 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2408 

2409 :return: A dict representing the json response from Microsoft Entra: 

2410 

2411 - A successful response would contain "access_token" key, 

2412 - an error response would contain "error" and usually "error_description". 

2413 """ 

2414 if kwargs.get("force_refresh"): 

2415 raise ValueError( # We choose to disallow force_refresh 

2416 "Historically, this method does not support force_refresh behavior. " 

2417 ) 

2418 return _clean_up(self._acquire_token_silent_with_error( 

2419 scopes, None, claims_challenge=claims_challenge, **kwargs)) 

2420 

2421 def _acquire_token_for_client( 

2422 self, 

2423 scopes, 

2424 refresh_reason, 

2425 claims_challenge=None, 

2426 **kwargs 

2427 ): 

2428 if self.authority.tenant.lower() in ["common", "organizations"]: 

2429 warnings.warn( 

2430 "Using /common or /organizations authority " 

2431 "in acquire_token_for_client() is unreliable. " 

2432 "Please use a specific tenant instead.", DeprecationWarning) 

2433 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

2434 telemetry_context = self._build_telemetry_context( 

2435 self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason) 

2436 client = self._regional_client or self.client 

2437 response = client.obtain_token_for_client( 

2438 scope=scopes, # This grant flow requires no scope decoration 

2439 headers=telemetry_context.generate_headers(), 

2440 data=dict( 

2441 kwargs.pop("data", {}), 

2442 claims=_merge_claims_challenge_and_capabilities( 

2443 self._client_capabilities, claims_challenge)), 

2444 **kwargs) 

2445 telemetry_context.update_telemetry(response) 

2446 return response 

2447 

2448 def remove_tokens_for_client(self): 

2449 """Remove all tokens that were previously acquired via 

2450 :func:`~acquire_token_for_client()` for the current client.""" 

2451 for env in [self.authority.instance] + self._get_authority_aliases( 

2452 self.authority.instance): 

2453 for at in list(self.token_cache.search( # Remove ATs from a snapshot 

2454 TokenCache.CredentialType.ACCESS_TOKEN, query={ 

2455 "client_id": self.client_id, 

2456 "environment": env, 

2457 "home_account_id": None, # These are mostly app-only tokens 

2458 })): 

2459 self.token_cache.remove_at(at) 

2460 # acquire_token_for_client() obtains no RTs, so we have no RT to remove 

2461 

2462 def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs): 

2463 """Acquires token using on-behalf-of (OBO) flow. 

2464 

2465 The current app is a middle-tier service which was called with a token 

2466 representing an end user. 

2467 The current app can use such token (a.k.a. a user assertion) to request 

2468 another token to access downstream web API, on behalf of that user. 

2469 See `detail docs here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow>`_ . 

2470 

2471 The current middle-tier app has no user interaction to obtain consent. 

2472 See how to gain consent upfront for your middle-tier app from this article. 

2473 https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow#gaining-consent-for-the-middle-tier-application 

2474 

2475 :param str user_assertion: The incoming token already received by this app 

2476 :param list[str] scopes: Scopes required by downstream API (a resource). 

2477 :param claims_challenge: 

2478 The claims_challenge parameter requests specific claims requested by the resource provider 

2479 in the form of a claims_challenge directive in the www-authenticate header to be 

2480 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2481 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2482 

2483 :return: A dict representing the json response from Microsoft Entra: 

2484 

2485 - A successful response would contain "access_token" key, 

2486 - an error response would contain "error" and usually "error_description". 

2487 """ 

2488 telemetry_context = self._build_telemetry_context( 

2489 self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID) 

2490 # The implementation is NOT based on Token Exchange (RFC 8693) 

2491 response = _clean_up(self.client.obtain_token_by_assertion( # bases on assertion RFC 7521 

2492 user_assertion, 

2493 self.client.GRANT_TYPE_JWT, # IDTs and AAD ATs are all JWTs 

2494 scope=self._decorate_scope(scopes), # Decoration is used for: 

2495 # 1. Explicitly requesting an RT, without relying on AAD default 

2496 # behavior, even though it currently still issues an RT. 

2497 # 2. Requesting an IDT (which would otherwise be unavailable) 

2498 # so that the calling app could use id_token_claims to implement 

2499 # their own cache mapping, which is likely needed in web apps. 

2500 data=dict( 

2501 kwargs.pop("data", {}), 

2502 requested_token_use="on_behalf_of", 

2503 claims=_merge_claims_challenge_and_capabilities( 

2504 self._client_capabilities, claims_challenge)), 

2505 headers=telemetry_context.generate_headers(), 

2506 # TBD: Expose a login_hint (or ccs_routing_hint) param for web app 

2507 **kwargs)) 

2508 if "access_token" in response: 

2509 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2510 telemetry_context.update_telemetry(response) 

2511 return response