Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/application.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

651 statements  

1import functools 

2import json 

3import time 

4import logging 

5import sys 

6import warnings 

7from threading import Lock 

8from typing import Optional # Needed in Python 3.7 & 3.8 

9import os 

10 

11from .oauth2cli import Client, JwtAssertionCreator 

12from .oauth2cli.oidc import decode_part 

13from .authority import Authority, WORLD_WIDE 

14from .mex import send_request as mex_send_request 

15from .wstrust_request import send_request as wst_send_request 

16from .wstrust_response import * 

17from .token_cache import TokenCache, _get_username, _GRANT_TYPE_BROKER 

18import msal.telemetry 

19from .region import _detect_region 

20from .throttled_http_client import ThrottledHttpClient 

21from .cloudshell import _is_running_in_cloud_shell 

22 

23 

# The __init__.py will import this. Not the other way around.
__version__ = "1.31.1"  # When releasing, also check and bump our dependencies' versions if needed

# Module-level logger, named after this module
logger = logging.getLogger(__name__)
_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL"

29 

def _init_broker(enable_pii_log):  # Kept as a function so that it can be mocked
    """Import the broker module lazily, optionally turning on its PII logging.

    :param enable_pii_log: When truthy, ``broker._enable_pii_log()`` is called
        right after the import.
    """
    from . import broker as broker_module  # The import itself triggers Broker's initialization
    if not enable_pii_log:
        return
    broker_module._enable_pii_log()

34 

def extract_certs(public_cert_content):
    """Parse raw public certificate file content into a list of cert strings.

    Usage: ``headers = {"x5c": extract_certs(open("my_cert.pem").read())}``

    :param str public_cert_content:
        The text of a certificate file. It may contain one or more
        ``-----BEGIN CERTIFICATE-----`` ... ``-----END CERTIFICATE-----`` sections,
        or just the bare certificate body without those tags.
    :return:
        A list of stripped certificate bodies, one per tagged section found.
        When no tagged section is found, a one-item list holding the stripped input.
    :raises ValueError:
        When no certificate tag is found and the input contains "PRIVATE KEY".
    """
    # Imported here so that this function does not depend on "re" leaking in
    # through one of this module's star-imports.
    import re
    public_certificates = re.findall(
        r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----',
        public_cert_content, re.I)
    if public_certificates:
        return [cert.strip() for cert in public_certificates]
    # The public cert tags are not found in the input,
    # let's make best effort to exclude a private key pem file.
    if "PRIVATE KEY" in public_cert_content:
        raise ValueError(
            "We expect your public key but detect a private key instead")
    return [public_cert_content.strip()]

49 

50 

def _merge_claims_challenge_and_capabilities(capabilities, claims_challenge):
    """Fold client capabilities into a claims challenge and return it as JSON text.

    Capabilities are represented as
    ``{"access_token": {"xms_cc": {"values": capabilities}}}``
    and merged into (or added to) the incoming claims.

    :param capabilities: The client capabilities. When falsy, nothing is merged.
    :param claims_challenge: A JSON string of claims, or a falsy value for "no claims".
    :return: ``claims_challenge`` untouched when there are no capabilities,
        otherwise the JSON string of the merged claims.
    """
    if capabilities:
        merged = json.loads(claims_challenge) if claims_challenge else {}
        # Only the access token section for now; "id_token" could be added if we'd decide to
        for section in ("access_token",):
            merged.setdefault(section, {}).update({"xms_cc": {"values": capabilities}})
        return json.dumps(merged)
    return claims_challenge

60 

61 

def _str2bytes(raw):
    """Return ``raw`` encoded as UTF-8 bytes when possible, otherwise ``raw`` itself.

    A conversion based on duck-typing rather than six.text_type:
    anything whose ``encode(encoding="utf-8")`` succeeds is encoded,
    anything else (bytes, None, ...) is handed back unchanged.

    :param raw: Typically a str or bytes.
    :return: bytes for encodable input; the original object otherwise.
    """
    try:
        return raw.encode(encoding="utf-8")
    except Exception:  # Deliberately broad for duck-typing, yet unlike a bare
        # "except:" it lets KeyboardInterrupt and SystemExit propagate.
        return raw

68 

69 

def _parse_pfx(pfx_path, passphrase_bytes):
    """Load a PFX (PKCS12) file and derive what certificate-based auth needs.

    Cert concepts: https://security.stackexchange.com/a/226758/125264

    :param pfx_path: Path of the PFX file, opened in binary mode.
    :param passphrase_bytes: Passphrase handed to
        ``pkcs12.load_key_and_certificates()``.
    :return: A tuple ``(private_key, sha256_thumbprint, sha1_thumbprint, x5c)``,
        where both thumbprints are hex strings and x5c is a one-item list
        holding the PEM body of the cert without its header and footer lines.
    :raises ValueError: When the file lacks either the private key or the cert.
    """
    from cryptography.hazmat.primitives import hashes, serialization
    from cryptography.hazmat.primitives.serialization import pkcs12
    with open(pfx_path, 'rb') as pfx_file:
        pfx_bytes = pfx_file.read()
    # load_key_and_certificates() needs cryptography 2.5+
    # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates
    key, certificate, _ = pkcs12.load_key_and_certificates(
        pfx_bytes, passphrase_bytes)
    if not key or not certificate:
        raise ValueError("Your PFX file shall contain both private key and cert")
    pem_lines = certificate.public_bytes(  # cryptography 1.0+
        encoding=serialization.Encoding.PEM).decode().splitlines()
    # Drop the "--- header ---" (first) and "--- footer ---" (last) lines
    pem_body = '\n'.join(pem_lines[1:-1])
    # fingerprint() needs cryptography 0.7+
    # https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object
    thumbprint_sha256 = certificate.fingerprint(hashes.SHA256()).hex()
    thumbprint_sha1 = certificate.fingerprint(hashes.SHA1()).hex()
    return key, thumbprint_sha256, thumbprint_sha1, [pem_body]

88 

89 

def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes):
    """Load a private key object from its PEM text.

    :param private_key_pem_str: The PEM content; it goes through ``_str2bytes()``
        before being handed to cryptography.
    :param passphrase_bytes: Passphrase passed to ``load_pem_private_key()``.
    :return: Whatever ``serialization.load_pem_private_key()`` returns.
    """
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    pem_bytes = _str2bytes(private_key_pem_str)
    # load_pem_private_key() needs cryptography 0.6+,
    # and its backend was a required param until 2020.
    return serialization.load_pem_private_key(
        pem_bytes, passphrase_bytes, backend=default_backend())

98 

99 

def _pii_less_home_account_id(home_account_id):
    """Return ``home_account_id`` with its first dot-separated part masked.

    The id could contain one or two parts; everything before the first "."
    (or the whole string when there is no ".") becomes ``"********"``,
    the rest is kept as-is.
    """
    _masked, dot, remainder = home_account_id.partition(".")
    return "********" + dot + remainder

104 

105 

def _clean_up(result):
    """Prepare a result for the caller by skimming MSAL-internal entries.

    Anything that is not a dict (it could be None) is returned unchanged.
    For a dict:

    * when it carries ``_msalruntime_telemetry`` or ``_msal_python_telemetry``,
      an opaque JSON string is stored into ``result["msal_telemetry"]``
      (the input dict itself is updated);
    * the returned copy omits ``refresh_in`` and every key starting with ``_``;
    * when ``refresh_in`` is present, the copy gains ``refresh_on``,
      an int timestamp computed from the current time, to encourage proactive refresh.
    """
    if not isinstance(result, dict):
        return result  # It could be None
    has_telemetry = (
        "_msalruntime_telemetry" in result or "_msal_python_telemetry" in result)
    if has_telemetry:
        telemetry = {
            "msalruntime_telemetry": result.get("_msalruntime_telemetry"),
            "msal_python_telemetry": result.get("_msal_python_telemetry"),
        }
        # Telemetry is exposed as an opaque string
        result["msal_telemetry"] = json.dumps(telemetry, separators=(",", ":"))
    cleaned = {}
    for key in result:
        if key == "refresh_in":  # MSAL handled refresh_in, customers need not
            continue
        if key.startswith('_'):  # Skim internal properties
            continue
        cleaned[key] = result[key]
    if "refresh_in" in result:
        cleaned["refresh_on"] = int(time.time() + result["refresh_in"])
    return cleaned

122 

123 

def _preferred_browser():
    """Register Edge and return a name suitable for subsequent webbrowser.get(...)
    when appropriate. Otherwise return None.
    """
    # On Linux, only Edge will provide device-based Conditional Access support.
    # On other platforms, we have no browser preference.
    if sys.platform != "linux":
        return None
    # Use a full path owned by sys admin.
    # Note: /usr/bin/microsoft-edge, /usr/bin/microsoft-edge-stable, etc.
    # are symlinks that point to the actual binaries which are found under
    # /opt/microsoft/msedge/msedge or /opt/microsoft/msedge-beta/msedge.
    # Either method can be used to detect an Edge installation.
    edge_path = "/usr/bin/microsoft-edge"
    # BROWSER could be absent (no preference), or it could contain
    # "microsoft-edge" or "/path/to/microsoft-edge" (user won't mind Edge).
    # Python documentation (https://docs.python.org/3/library/webbrowser.html)
    # does not document the name being implicitly registered,
    # so there is no public API to know whether the ENV VAR browser would work.
    # Therefore, we would not bother examining the env var browser's type.
    # We would just register our own Edge instance.
    env_browser = os.environ.get("BROWSER")
    edge_is_acceptable = env_browser is None or "microsoft-edge" in env_browser
    if not edge_is_acceptable or not os.path.exists(edge_path):
        return None
    try:
        import webbrowser  # Lazy import. Some distro may not have this.
    except ImportError:
        return None  # We may still proceed without a preferred browser
    # Avoid popular name "microsoft-edge"
    # otherwise `BROWSER="microsoft-edge"; webbrowser.get("microsoft-edge")`
    # would return a GenericBrowser instance which won't work.
    name = "msal-edge"
    try:
        already_registered = isinstance(
            webbrowser.get(name), webbrowser.BackgroundBrowser)
    except webbrowser.Error:
        already_registered = False
    if not already_registered:
        logger.debug("Register %s with %s", name, edge_path)
        # By registering our own browser instance with our own name,
        # rather than populating a process-wide BROWSER env var,
        # this approach does not have side effect on non-MSAL code path.
        # Even double-register happens to work fine.
        webbrowser.register(
            name, None, webbrowser.BackgroundBrowser(edge_path))
    return name

166 

167 

class _ClientWithCcsRoutingInfo(Client):
    """A Client that adds ``X-AnchorMailbox`` routing info to its requests."""

    def initiate_auth_code_flow(self, **kwargs):
        """Start an auth code flow, asking for client_info and hinting the UPN.

        When a truthy ``login_hint`` is given, ``X-AnchorMailbox`` is added
        to the keyword arguments as ``UPN:<login_hint>``.
        """
        login_hint = kwargs.get("login_hint")
        if login_hint:  # eSTS could have utilized this as-is, but nope
            kwargs["X-AnchorMailbox"] = "UPN:%s" % login_hint
        # client_info=1 is to be used as CCS Routing info
        return super().initiate_auth_code_flow(client_info=1, **kwargs)

    def obtain_token_by_auth_code_flow(
            self, auth_code_flow, auth_response, **kwargs):
        """Redeem an auth code, routing by the uid/utid found in client_info.

        Note: the obtain_token_by_browser() is also covered by this.
        """
        assert isinstance(auth_code_flow, dict) and isinstance(auth_response, dict)
        headers = kwargs.pop("headers", {})
        if auth_response.get("client_info"):
            client_info = json.loads(decode_part(auth_response["client_info"]))
        else:
            client_info = {}
        if "uid" in client_info and "utid" in client_info:
            # Note: The value of X-AnchorMailbox is also case-insensitive
            headers["X-AnchorMailbox"] = "Oid:{uid}@{utid}".format(**client_info)
        return super().obtain_token_by_auth_code_flow(
            auth_code_flow, auth_response, headers=headers, **kwargs)

    def obtain_token_by_username_password(self, username, password, **kwargs):
        """Run the username password flow, routing by the username as UPN."""
        headers = kwargs.pop("headers", {})
        headers["X-AnchorMailbox"] = "upn:{}".format(username)
        return super().obtain_token_by_username_password(
            username, password, headers=headers, **kwargs)

196 

197 

def _msal_extension_check():
    """Warn when the installed msal-extensions is older than 1.2.

    Nothing happens when msal-extensions is not installed.
    Only the major and minor components of its version are examined, so
    versions with extra components (such as "1.3.0.post1") are accepted,
    and any version at or above 1.2 (including 2.0 and later) passes.
    A version whose major or minor is not an integer is logged, not raised.
    """
    # Can't run this in module or class level otherwise you'll get circular import error
    try:
        from msal_extensions import __version__ as v
    except ImportError:
        return  # The optional msal_extensions is not installed. Business as usual.
    try:
        # Unpacking raises ValueError when there are fewer than 2 components,
        # and int() raises ValueError on a non-numeric component.
        major, minor = (int(part) for part in v.split(".")[:2])
    except ValueError:
        logger.exception(
            "msal_extensions version %s not in major.minor.patch format", v)
        return
    # Tuple comparison, so that e.g. 2.0 is correctly considered newer than 1.2
    if (major, minor) < (1, 2):
        warnings.warn(
            "Please upgrade msal-extensions. "
            "Only msal-extensions 1.2+ can work with msal 1.30+")

211 

212 

213class ClientApplication(object): 

214 """You do not usually directly use this class. Use its subclasses instead: 

215 :class:`PublicClientApplication` and :class:`ConfidentialClientApplication`. 

216 """ 

# Identifiers, one per public API.
# NOTE(review): presumably these are the api_id values given to
# _build_telemetry_context() - confirm against the callers.
ACQUIRE_TOKEN_SILENT_ID = "84"
ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85"
ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301"
ACQUIRE_TOKEN_ON_BEHALF_OF_ID = "523"
ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID = "622"
ACQUIRE_TOKEN_FOR_CLIENT_ID = "730"
ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832"
ACQUIRE_TOKEN_INTERACTIVE = "169"
GET_ACCOUNTS_ID = "902"
REMOVE_ACCOUNT_ID = "903"

# _get_regional_authority() compares the configured region against this value
ATTEMPT_REGION_DISCOVERY = True  # "TryAutoDetect"
DISABLE_MSAL_FORCE_REGION = False  # Used in azure_region to disable MSAL_FORCE_REGION behavior
# NOTE(review): presumably the key, and its possible values, telling
# where a token in a result came from - confirm against the usages.
_TOKEN_SOURCE = "token_source"
_TOKEN_SOURCE_IDP = "identity_provider"
_TOKEN_SOURCE_CACHE = "cache"
_TOKEN_SOURCE_BROKER = "broker"

# Class-level default. _decide_broker() reads it as the opt-in state
# and then stores the per-instance decision under the same name.
_enable_broker = False
_AUTH_SCHEME_UNSUPPORTED = (
    "auth_scheme is currently only available from broker. "
    "You can enable broker by following these instructions. "
    "https://msal-python.readthedocs.io/en/latest/#publicclientapplication")

240 

def __init__(
        self, client_id,
        client_credential=None, authority=None, validate_authority=True,
        token_cache=None,
        http_client=None,
        verify=True, proxies=None, timeout=None,
        client_claims=None, app_name=None, app_version=None,
        client_capabilities=None,
        azure_region=None,  # Note: We choose to add this param in this base class,
            # despite it is currently only needed by ConfidentialClientApplication.
            # This way, it holds the same positional param place for PCA,
            # when we would eventually want to add this feature to PCA in future.
        exclude_scopes=None,
        http_cache=None,
        instance_discovery=None,
        allow_broker=None,
        enable_pii_log=None,
        oidc_authority=None,
        ):
    """Create an instance of application.

    :param str client_id: Your app has a client_id after you register it on Microsoft Entra admin center.

    :param client_credential:
        For :class:`PublicClientApplication`, you use `None` here.

        For :class:`ConfidentialClientApplication`,
        it supports many different input formats for different scenarios.

        .. admonition:: Support using a client secret.

            Just feed in a string, such as ``"your client secret"``.

        .. admonition:: Support using a certificate in X.509 (.pem) format

            Feed in a dict in this form::

                {
                    "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format",
                    "thumbprint": "A1B2C3D4E5F6...",
                    "passphrase": "Passphrase if the private_key is encrypted (Optional. Added in version 1.6.0)",
                }

            MSAL Python requires a "private_key" in PEM format.
            If your cert is in PKCS12 (.pfx) format,
            you can convert it to X.509 (.pem) format,
            by ``openssl pkcs12 -in file.pfx -out file.pem -nodes``.

            The thumbprint is available in your app's registration in Azure Portal.
            Alternatively, you can `calculate the thumbprint <https://github.com/Azure/azure-sdk-for-python/blob/07d10639d7e47f4852eaeb74aef5d569db499d6e/sdk/identity/azure-identity/azure/identity/_credentials/certificate.py#L94-L97>`_.

        .. admonition:: Support Subject Name/Issuer Auth with a cert in .pem

            `Subject Name/Issuer Auth
            <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_
            is an approach to allow easier certificate rotation.

            *Added in version 0.5.0*::

                {
                    "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format",
                    "thumbprint": "A1B2C3D4E5F6...",
                    "public_certificate": "...-----BEGIN CERTIFICATE-----...",
                    "passphrase": "Passphrase if the private_key is encrypted (Optional. Added in version 1.6.0)",
                }

            ``public_certificate`` (optional) is public key certificate
            which will be sent through 'x5c' JWT header only for
            subject name and issuer authentication to support cert auto rolls.

            Per `specs <https://tools.ietf.org/html/rfc7515#section-4.1.6>`_,
            "the certificate containing
            the public key corresponding to the key used to digitally sign the
            JWS MUST be the first certificate. This MAY be followed by
            additional certificates, with each subsequent certificate being the
            one used to certify the previous one."
            However, your certificate's issuer may use a different order.
            So, if your attempt ends up with an error AADSTS700027 -
            "The provided signature value did not match the expected signature value",
            you may try use only the leaf cert (in PEM/str format) instead.

        .. admonition:: Supporting raw assertion obtained from elsewhere

            *Added in version 1.13.0*:
            It can also be a completely pre-signed assertion that you've assembled yourself.
            Simply pass a container containing only the key "client_assertion", like this::

                {
                    "client_assertion": "...a JWT with claims aud, exp, iss, jti, nbf, and sub..."
                }

        .. admonition:: Supporting reading client certificates from PFX files

            *Added in version 1.29.0*:
            Feed in a dictionary containing the path to a PFX file::

                {
                    "private_key_pfx_path": "/path/to/your.pfx",
                    "passphrase": "Passphrase if the private_key is encrypted (Optional)",
                }

            The following command will generate a .pfx file from your .key and .pem file::

                openssl pkcs12 -export -out certificate.pfx -inkey privateKey.key -in certificate.pem

        .. admonition:: Support Subject Name/Issuer Auth with a cert in .pfx

            *Added in version 1.30.0*:
            If your .pfx file contains both the private key and public cert,
            you can opt in for Subject Name/Issuer Auth like this::

                {
                    "private_key_pfx_path": "/path/to/your.pfx",
                    "public_certificate": True,
                    "passphrase": "Passphrase if the private_key is encrypted (Optional)",
                }

    :type client_credential: Union[dict, str, None]

    :param dict client_claims:
        *Added in version 0.5.0*:
        It is a dictionary of extra claims that would be signed
        by this :class:`ConfidentialClientApplication` 's private key.
        For example, you can use {"client_ip": "x.x.x.x"}.
        You may also override any of the following default claims::

            {
                "aud": the_token_endpoint,
                "iss": self.client_id,
                "sub": same_as_issuer,
                "exp": now + 10_min,
                "iat": now,
                "jti": a_random_uuid
            }

    :param str authority:
        A URL that identifies a token authority. It should be of the format
        ``https://login.microsoftonline.com/your_tenant``
        By default, we will use ``https://login.microsoftonline.com/common``

        *Changed in version 1.17*: you can also use predefined constant
        and a builder like this::

            from msal.authority import (
                AuthorityBuilder,
                AZURE_US_GOVERNMENT, AZURE_CHINA, AZURE_PUBLIC)
            my_authority = AuthorityBuilder(AZURE_PUBLIC, "contoso.onmicrosoft.com")
            # Now you get an equivalent of
            # "https://login.microsoftonline.com/contoso.onmicrosoft.com"

            # You can feed such an authority to msal's ClientApplication
            from msal import PublicClientApplication
            app = PublicClientApplication("my_client_id", authority=my_authority, ...)

    :param bool validate_authority: (optional) Turns authority validation
        on or off. This parameter defaults to True.
    :param TokenCache token_cache:
        Sets the token cache used by this ClientApplication instance.
        By default, an in-memory cache will be created and used.
    :param http_client: (optional)
        Your implementation of abstract class HttpClient <msal.oauth2cli.http.http_client>
        Defaults to a requests session instance.
        Since MSAL 1.11.0, the default session would be configured
        to attempt one retry on connection error.
        If you are providing your own http_client,
        it will be your http_client's duty to decide whether to perform retry.

    :param verify: (optional)
        It will be passed to the
        `verify parameter in the underlying requests library
        <http://docs.python-requests.org/en/v2.9.1/user/advanced/#ssl-cert-verification>`_
        This does not apply if you have chosen to pass your own Http client
    :param proxies: (optional)
        It will be passed to the
        `proxies parameter in the underlying requests library
        <http://docs.python-requests.org/en/v2.9.1/user/advanced/#proxies>`_
        This does not apply if you have chosen to pass your own Http client
    :param timeout: (optional)
        It will be passed to the
        `timeout parameter in the underlying requests library
        <http://docs.python-requests.org/en/v2.9.1/user/advanced/#timeouts>`_
        This does not apply if you have chosen to pass your own Http client
    :param app_name: (optional)
        You can provide your application name for Microsoft telemetry purposes.
        Default value is None, means it will not be passed to Microsoft.
    :param app_version: (optional)
        You can provide your application version for Microsoft telemetry purposes.
        Default value is None, means it will not be passed to Microsoft.
    :param list[str] client_capabilities: (optional)
        Allows configuration of one or more client capabilities, e.g. ["CP1"].

        Client capability is meant to inform the Microsoft identity platform
        (STS) what this client is capable for,
        so STS can decide to turn on certain features.
        For example, if client is capable to handle *claims challenge*,
        STS may issue
        `Continuous Access Evaluation (CAE) <https://learn.microsoft.com/entra/identity/conditional-access/concept-continuous-access-evaluation>`_
        access tokens to resources,
        knowing that when the resource emits a *claims challenge*
        the client will be able to handle those challenges.

        Implementation details:
        Client capability is implemented using "claims" parameter on the wire,
        for now.
        MSAL will combine them into
        `claims parameter <https://openid.net/specs/openid-connect-core-1_0-final.html#ClaimsParameter>`_
        which you will later provide via one of the acquire-token request.

    :param str azure_region: (optional)
        Instructs MSAL to use the Entra regional token service. This legacy feature is only available to
        first-party applications. Only ``acquire_token_for_client()`` is supported.

        Supports 4 values:

        1. ``azure_region=None`` - This default value means no region is configured.
           MSAL will use the region defined in env var ``MSAL_FORCE_REGION``.
        2. ``azure_region="some_region"`` - meaning the specified region is used.
        3. ``azure_region=True`` - meaning
           MSAL will try to auto-detect the region. This is not recommended.
        4. ``azure_region=False`` - meaning MSAL will use no region.

        .. note::
            Region auto-discovery has been tested on VMs and on Azure Functions. It is unreliable.
            Applications using this option should configure a short timeout.

            For more details and for the values of the region string
            see https://learn.microsoft.com/entra/msal/dotnet/resources/region-discovery-troubleshooting

        New in version 1.12.0.

    :param list[str] exclude_scopes: (optional)
        Historically MSAL hardcodes `offline_access` scope,
        which would allow your app to have prolonged access to user's data.
        If that is unnecessary or undesirable for your app,
        now you can use this parameter to supply an exclusion list of scopes,
        such as ``exclude_scopes = ["offline_access"]``.

    :param dict http_cache:
        MSAL has long been caching tokens in the ``token_cache``.
        Recently, MSAL also introduced a concept of ``http_cache``,
        by automatically caching some finite amount of non-token http responses,
        so that *long-lived*
        ``PublicClientApplication`` and ``ConfidentialClientApplication``
        would be more performant and responsive in some situations.

        This ``http_cache`` parameter accepts any dict-like object.
        If not provided, MSAL will use an in-memory dict.

        If your app is a command-line app (CLI),
        you would want to persist your http_cache across different CLI runs.
        The following recipe shows a way to do so::

            # Just add the following lines at the beginning of your CLI script
            import sys, atexit, pickle
            http_cache_filename = sys.argv[0] + ".http_cache"
            try:
                with open(http_cache_filename, "rb") as f:
                    persisted_http_cache = pickle.load(f)  # Take a snapshot
            except (
                    FileNotFoundError,  # Or IOError in Python 2
                    pickle.UnpicklingError,  # A corrupted http cache file
                    ):
                persisted_http_cache = {}  # Recover by starting afresh
            atexit.register(lambda: pickle.dump(
                # When exit, flush it back to the file.
                # It may occasionally overwrite another process's concurrent write,
                # but that is fine. Subsequent runs will reach eventual consistency.
                persisted_http_cache, open(http_cache_filename, "wb")))

            # And then you can implement your app as you normally would
            app = msal.PublicClientApplication(
                "your_client_id",
                ...,
                http_cache=persisted_http_cache,  # Utilize persisted_http_cache
                ...,
                #token_cache=...,  # You may combine the old token_cache trick
                    # Please refer to token_cache recipe at
                    # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
                )
            app.acquire_token_interactive(["your", "scope"], ...)

        Content inside ``http_cache`` are cheap to obtain.
        There is no need to share them among different apps.

        Content inside ``http_cache`` will contain no tokens nor
        Personally Identifiable Information (PII). Encryption is unnecessary.

        New in version 1.16.0.

    :param boolean instance_discovery:
        Historically, MSAL would connect to a central endpoint located at
        ``https://login.microsoftonline.com`` to acquire some metadata,
        especially when using an unfamiliar authority.
        This behavior is known as Instance Discovery.

        This parameter defaults to None, which enables the Instance Discovery.

        If you know some authorities which you allow MSAL to operate with as-is,
        without involving any Instance Discovery, the recommended pattern is::

            known_authorities = frozenset([  # Treat your known authorities as const
                "https://contoso.com/adfs", "https://login.azs/foo"])
            ...
            authority = "https://contoso.com/adfs"  # Assuming your app will use this
            app1 = PublicClientApplication(
                "client_id",
                authority=authority,
                # Conditionally disable Instance Discovery for known authorities
                instance_discovery=authority not in known_authorities,
                )

        If you do not know some authorities beforehand,
        yet still want MSAL to accept any authority that you will provide,
        you can use a ``False`` to unconditionally disable Instance Discovery.

        New in version 1.19.0.

    :param boolean allow_broker:
        Deprecated. Please use ``enable_broker_on_windows`` instead.

    :param boolean enable_pii_log:
        When enabled, logs may include PII (Personal Identifiable Information).
        This can be useful in troubleshooting broker behaviors.
        The default behavior is False.

        New in version 1.24.0.

    :param str oidc_authority:
        *Added in version 1.28.0*:
        It is a URL that identifies an OpenID Connect (OIDC) authority of
        the format ``https://contoso.com/tenant``.
        MSAL will append ".well-known/openid-configuration" to the authority
        and retrieve the OIDC metadata from there, to figure out the endpoints.

        Note: Broker will NOT be used for OIDC authority.

    :raises ValueError:
        When ``exclude_scopes`` is provided but is not a list,
        when ``exclude_scopes`` contains ``"openid"``,
        or when both ``authority`` and ``oidc_authority`` are provided.
        A ValueError raised while building the authority is re-raised as-is.
    """
    self.client_id = client_id
    self.client_credential = client_credential
    self.client_claims = client_claims
    self._client_capabilities = client_capabilities
    self._instance_discovery = instance_discovery

    if exclude_scopes and not isinstance(exclude_scopes, list):
        raise ValueError(
            "Invalid exclude_scopes={}. It need to be a list of strings.".format(
            repr(exclude_scopes)))
    self._exclude_scopes = frozenset(exclude_scopes or [])
    if "openid" in self._exclude_scopes:
        raise ValueError(
            'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format(
            repr(exclude_scopes)))

    if http_client:
        self.http_client = http_client
    else:
        import requests  # Lazy load

        self.http_client = requests.Session()
        self.http_client.verify = verify
        self.http_client.proxies = proxies
        # Requests does not support a session-wide timeout,
        # but you can patch that (https://github.com/psf/requests/issues/3341):
        self.http_client.request = functools.partial(
            self.http_client.request, timeout=timeout)

        # Enable a minimal retry. Better than nothing.
        # https://github.com/psf/requests/blob/v2.25.1/requests/adapters.py#L94-L108
        a = requests.adapters.HTTPAdapter(max_retries=1)
        self.http_client.mount("http://", a)
        self.http_client.mount("https://", a)
    # Whichever http client was chosen above, it gets wrapped here
    self.http_client = ThrottledHttpClient(
        self.http_client,
        http_cache=http_cache,
        default_throttle_time=60
            # The default value 60 was recommended mainly for PCA at the end of
            # https://identitydivision.visualstudio.com/devex/_git/AuthLibrariesApiReview?version=GBdev&path=%2FService%20protection%2FIntial%20set%20of%20protection%20measures.md&_a=preview
            if isinstance(self, PublicClientApplication) else 5,
        )

    self.app_name = app_name
    self.app_version = app_version

    # Here the self.authority will not be the same type as authority in input
    if oidc_authority and authority:
        raise ValueError("You can not provide both authority and oidc_authority")
    try:
        authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE)
        self.authority = Authority(
            authority_to_use,
            self.http_client,
            validate_authority=validate_authority,
            instance_discovery=self._instance_discovery,
            oidc_authority_url=oidc_authority,
            )
    except ValueError:  # Those are explicit authority validation errors
        raise
    except Exception:  # The rest are typically connection errors
        if validate_authority and not oidc_authority and (
                azure_region  # Opted in to use region
                or (azure_region is None and os.getenv("MSAL_FORCE_REGION"))  # Will use region
                ):
            # Since caller opts in to use region, here we tolerate connection
            # errors happened during authority validation at non-region endpoint
            self.authority = Authority(
                authority_to_use,
                self.http_client,
                instance_discovery=False,
                )
        else:
            raise

    # _decide_broker() reads self.authority, so it has to run after the above
    self._decide_broker(allow_broker, enable_pii_log)
    self.token_cache = token_cache or TokenCache()
    self._region_configured = azure_region
    self._region_detected = None
    self.client, self._regional_client = self._build_client(
        client_credential, self.authority)
    self.authority_groups = None
    self._telemetry_buffer = {}
    self._telemetry_lock = Lock()
    _msal_extension_check()

662 

663 

def _decide_broker(self, allow_broker, enable_pii_log):
    """Settle whether this instance uses the broker; store it in ``self._enable_broker``.

    The broker ends up enabled only when it was opted in (either
    ``self._enable_broker`` was already truthy, or the deprecated
    ``allow_broker`` is truthy on Windows), this is not a confidential app,
    the authority is neither ADFS nor B2C, and ``_init_broker()`` does not
    raise RuntimeError.

    :param allow_broker: The deprecated opt-in flag. A truthy value emits a
        DeprecationWarning.
    :param enable_pii_log: Passed through to ``_init_broker()``.
    :raises ValueError: When ``allow_broker`` is truthy for a confidential app.
    """
    confidential = self.client_credential or isinstance(
        self, ConfidentialClientApplication)
    if allow_broker:
        if confidential:
            raise ValueError("allow_broker=True is only supported in PublicClientApplication")
        # Historically, we chose to support ClientApplication("client_id", allow_broker=True)
        warnings.warn(
            "allow_broker is deprecated. "
            "Please use PublicClientApplication(..., "
            "enable_broker_on_windows=True, "
            "enable_broker_on_mac=...)",
            DeprecationWarning)
    # When we started the broker project on Windows platform,
    # the allow_broker was meant to be cross-platform. Now we realize
    # that other platforms have different redirect_uri requirements,
    # so the old allow_broker is deprecated and will only work for Windows.
    legacy_opt_in = allow_broker and sys.platform == "win32"
    # A truthy self._enable_broker means Opted-in from PCA
    opted_in = self._enable_broker or legacy_opt_in
    # This same attribute will also store the resulting state
    self._enable_broker = (
        opted_in
        and not confidential
        and not self.authority.is_adfs
        and not self.authority._is_b2c
        )
    if self._enable_broker:
        try:
            _init_broker(enable_pii_log)
        except RuntimeError:
            self._enable_broker = False
            logger.exception(
                "Broker is unavailable on this platform. "
                "We will fallback to non-broker.")
    logger.debug("Broker enabled? %s", self._enable_broker)

701 

def is_pop_supported(self):
    """Returns True if this client supports Proof-of-Possession Access Token.

    The answer is this instance's broker state (``self._enable_broker``),
    which ``_decide_broker()`` settles during construction.
    """
    return self._enable_broker

705 

def _decorate_scope(
        self, scopes,
        reserved_scope=frozenset(['openid', 'profile', 'offline_access'])):
    """Return the input scopes plus the reserved ones, minus the excluded ones.

    :param scopes: The scopes requested by the caller, as a list, tuple, or set.
    :param reserved_scope: Scopes that callers may not pass in themselves,
        because this method adds them.
    :return: A list (in no particular order) of the input scopes and the
        reserved scopes, without those in ``self._exclude_scopes``.
    :raises ValueError: When ``scopes`` is not a list, tuple, or set,
        or when it contains any reserved scope.
    """
    if not isinstance(scopes, (list, set, tuple)):
        raise ValueError("The input scopes should be a list, tuple, or set")
    scope_set = set(scopes)  # Input scopes is typically a list. Copy it to a set.
    if scope_set & reserved_scope:
        # These scopes are reserved for the API to provide good experience.
        # We could make the developer pass these and then if they do they will
        # come back asking why they don't see refresh token or user information.
        raise ValueError(
            """You cannot use any scope value that is reserved.
Your input: {}
The reserved list: {}""".format(list(scope_set), list(reserved_scope)))

    # client_id can also be used as a scope in B2C
    decorated = scope_set | reserved_scope
    decorated -= self._exclude_scopes
    return list(decorated)

728 

def _build_telemetry_context(
        self, api_id, correlation_id=None, refresh_reason=None):
    """Create a telemetry context for one API call.

    The context is built on this instance's telemetry buffer and lock.

    :param api_id: Identifier of the public API being invoked.
    :param correlation_id: Optional. Forwarded to the context as-is.
    :param refresh_reason: Optional. Forwarded to the context as-is.
    :return: A ``msal.telemetry._TelemetryContext`` instance.
    """
    context_options = dict(
        correlation_id=correlation_id, refresh_reason=refresh_reason)
    return msal.telemetry._TelemetryContext(
        self._telemetry_buffer, self._telemetry_lock, api_id,
        **context_options)

734 

def _get_regional_authority(self, central_authority) -> Optional[Authority]:
    """Derive the regional counterpart of ``central_authority``, if any.

    As side effects, ``self._region_configured`` may be populated from the
    ``MSAL_FORCE_REGION`` environment variable, and ``self._region_detected``
    may be populated by ``_detect_region()``.

    :param central_authority:
        The authority whose ``instance`` and ``tenant`` are used
        to compose the regional authority url.
    :return:
        An :class:`Authority` for the regional endpoint,
        or None when no region is to be used.
    """
    if self._region_configured is False:  # User opts out of ESTS-R
        return None  # Short circuit to completely bypass region detection
    if self._region_configured is None:  # User did not make an ESTS-R choice
        self._region_configured = os.getenv("MSAL_FORCE_REGION") or None
    if not self._region_detected:
        # The http client is only handed to the detector
        # when a region setting is in effect
        probe_client = (
            None if self._region_configured is None else self.http_client)
        self._region_detected = _detect_region(probe_client)

    attempt_discovery = (
        self._region_configured == self.ATTEMPT_REGION_DISCOVERY)
    if not attempt_discovery and self._region_configured != self._region_detected:
        logger.warning('Region configured ({}) != region detected ({})'.format(
            repr(self._region_configured), repr(self._region_detected)))
    if attempt_discovery:
        region_to_use = self._region_detected
    else:
        region_to_use = self._region_configured  # It will retain the None i.e. opted out
    logger.debug('Region to be used: {}'.format(repr(region_to_use)))
    if not region_to_use:
        return None

    hosts_sharing_regional_domain = (
        # The list came from point 3 of the algorithm section in this internal doc
        # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PinAuthToRegion/AAD%20SDK%20Proposal%20to%20Pin%20Auth%20to%20region.md&anchor=algorithm&_a=preview
        "login.microsoftonline.com",
        "login.microsoft.com",
        "login.windows.net",
        "sts.windows.net",
    )
    if central_authority.instance in hosts_sharing_regional_domain:
        regional_host = "{}.login.microsoft.com".format(region_to_use)
    else:
        regional_host = "{}.{}".format(region_to_use, central_authority.instance)
    regional_url = "https://{}/{}".format(regional_host, central_authority.tenant)
    return Authority(  # The central_authority has already been validated
        regional_url,
        self.http_client,
        instance_discovery=False,
    )

768 

def _build_client(self, client_credential, authority, skip_regional_client=False):
    """Build the OAuth2 client(s) used to talk to ``authority``.

    :param client_credential:
        Either a dict describing a client assertion or a certificate
        (keys read below: ``client_assertion``, ``passphrase``,
        ``private_key_pfx_path``, ``public_certificate``, ``private_key``,
        ``thumbprint``), or any other value, which is sent as ``client_secret``.
    :param authority:
        The central authority. Its endpoints configure the central client.
    :param bool skip_regional_client:
        When true, no regional client is built.
    :return:
        A tuple of ``(central_client, regional_client)``.
        The ``regional_client`` is None unless a credential is present,
        ``skip_regional_client`` is false,
        and ``_get_regional_authority()`` yields an authority.
    :raises ValueError:
        If the dict credential does not match a supported shape,
        or if the fallback RS256 path has no SHA1 thumbprint.
    """
    client_assertion = None
    client_assertion_type = None
    # Headers attached to the requests of both clients built below
    default_headers = {
        "x-client-sku": "MSAL.Python", "x-client-ver": __version__,
        "x-client-os": sys.platform,
        "x-ms-lib-capability": "retry-after, h429",
    }
    if self.app_name:
        default_headers['x-app-name'] = self.app_name
    if self.app_version:
        default_headers['x-app-ver'] = self.app_version
    default_body = {"client_info": 1}
    if isinstance(client_credential, dict):
        client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT
        # Use client_credential.get("...") rather than "..." in client_credential
        # so that we can ignore an empty string came from an empty ENV VAR.
        if client_credential.get("client_assertion"):
            # A caller-supplied assertion is used as-is
            client_assertion = client_credential['client_assertion']
        else:
            # Otherwise an assertion is created from a private key
            headers = {}
            sha1_thumbprint = sha256_thumbprint = None
            passphrase_bytes = _str2bytes(
                client_credential["passphrase"]
            ) if client_credential.get("passphrase") else None
            if client_credential.get("private_key_pfx_path"):
                # The .pfx file provides the key, both thumbprints and the x5c
                private_key, sha256_thumbprint, sha1_thumbprint, x5c = _parse_pfx(
                    client_credential["private_key_pfx_path"],
                    passphrase_bytes)
                if client_credential.get("public_certificate") is True and x5c:
                    headers["x5c"] = x5c
            elif (
                    client_credential.get("private_key")  # PEM blob
                    and client_credential.get("thumbprint")):
                # Only a SHA1 thumbprint is available on this path
                sha1_thumbprint = client_credential["thumbprint"]
                if passphrase_bytes:
                    private_key = _load_private_key_from_pem_str(
                        client_credential['private_key'], passphrase_bytes)
                else:  # PEM without passphrase
                    private_key = client_credential['private_key']
            else:
                raise ValueError(
                    "client_credential needs to follow this format "
                    "https://msal-python.readthedocs.io/en/latest/#msal.ClientApplication.params.client_credential")
            if ("x5c" not in headers  # So the .pfx file contains no certificate
                and isinstance(client_credential.get('public_certificate'), str)
            ):  # Then we treat the public_certificate value as PEM content
                headers["x5c"] = extract_certs(client_credential['public_certificate'])
            # PS256 with a SHA256 thumbprint is used when that thumbprint
            # exists and the authority is not ADFS
            if sha256_thumbprint and not authority.is_adfs:
                assertion_params = {
                    "algorithm": "PS256", "sha256_thumbprint": sha256_thumbprint,
                }
            else:  # Fall back
                if not sha1_thumbprint:
                    raise ValueError("You shall provide a thumbprint in SHA1.")
                assertion_params = {
                    "algorithm": "RS256", "sha1_thumbprint": sha1_thumbprint,
                }
            assertion = JwtAssertionCreator(
                private_key, headers=headers, **assertion_params)
            client_assertion = assertion.create_regenerative_assertion(
                audience=authority.token_endpoint, issuer=self.client_id,
                additional_claims=self.client_claims or {})
    else:
        # Any non-dict credential is placed into the request body as-is;
        # this branch does not special-case None
        default_body['client_secret'] = client_credential
    central_configuration = {
        "authorization_endpoint": authority.authorization_endpoint,
        "token_endpoint": authority.token_endpoint,
        "device_authorization_endpoint": authority.device_authorization_endpoint,
    }
    central_client = _ClientWithCcsRoutingInfo(
        central_configuration,
        self.client_id,
        http_client=self.http_client,
        default_headers=default_headers,
        default_body=default_body,
        client_assertion=client_assertion,
        client_assertion_type=client_assertion_type,
        # Tokens are cached under the environment of the central authority
        on_obtaining_tokens=lambda event: self.token_cache.add(dict(
            event, environment=authority.instance)),
        on_removing_rt=self.token_cache.remove_rt,
        on_updating_rt=self.token_cache.update_rt)

    regional_client = None
    if (client_credential  # Currently regional endpoint only serves some CCA flows
            and not skip_regional_client):
        regional_authority = self._get_regional_authority(authority)
        if regional_authority:
            regional_configuration = {
                "authorization_endpoint": regional_authority.authorization_endpoint,
                "token_endpoint": regional_authority.token_endpoint,
                "device_authorization_endpoint":
                    regional_authority.device_authorization_endpoint,
            }
            # Same settings as the central client, except for the endpoints
            regional_client = _ClientWithCcsRoutingInfo(
                regional_configuration,
                self.client_id,
                http_client=self.http_client,
                default_headers=default_headers,
                default_body=default_body,
                client_assertion=client_assertion,
                client_assertion_type=client_assertion_type,
                # Note: the cache environment remains that of the central
                # authority (authority.instance), not the regional one
                on_obtaining_tokens=lambda event: self.token_cache.add(dict(
                    event, environment=authority.instance)),
                on_removing_rt=self.token_cache.remove_rt,
                on_updating_rt=self.token_cache.update_rt)
    return central_client, regional_client

876 

def initiate_auth_code_flow(
        self,
        scopes,  # type: list[str]
        redirect_uri=None,
        state=None,  # Recommended by OAuth2 for CSRF protection
        prompt=None,
        login_hint=None,  # type: Optional[str]
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        max_age=None,
        response_mode=None,  # type: Optional[str]
        ):
    """Start an authorization code flow, and return its state as a dict.

    Once the auth response arrives at your redirect_uri, hand the returned
    dict and that response to :func:`~acquire_token_by_auth_code_flow()`
    to complete the authentication/authorization.

    :param list scopes:
        A list of case-sensitive strings.
    :param str redirect_uri:
        Optional. When omitted, the server will use the pre-registered one.
    :param str state:
        An opaque value which the client uses to maintain state
        between the request and the callback.
        This library will generate one internally when it is absent.
    :param str prompt:
        No prompt value is sent by default, not even the string ``"none"``;
        a value has to be specified explicitly.
        Valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.

    :param str login_hint:
        Optional. Identifier of the user. Generally a User Principal Name (UPN).
    :param domain_hint:
        Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
        When included, the sign-in page skips its email-based discovery
        process, which gives a slightly more streamlined user experience.
        More information on possible values available in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.

    :param claims_challenge:
        Optional. It is merged with this app's client capabilities
        to form the ``claims`` parameter of the request,
        and it is also stored in the returned dict.

    :param int max_age:
        OPTIONAL. Maximum Authentication Age.
        The allowable elapsed time, in seconds, since the End-User
        was last actively authenticated.
        When more time than this has elapsed,
        Microsoft identity platform will actively re-authenticate the End-User.

        MSAL Python will also automatically validate the auth_time in ID token.

        New in version 1.15.

    :param str response_mode:
        OPTIONAL. The method with which response parameters should be returned.
        Valid values are "query" (the default), for HTTP GET with parameters
        encoded in the query string, or "form_post", for HTTP POST to the
        callback URI with parameters encoded as HTML form values in the body,
        using the application/x-www-form-urlencoded format.
        The default ``query`` is still secure enough in MSAL Python
        (because MSAL Python does not transfer tokens via query parameter
        in the first place), but ``form_post`` is recommended
        for even better security.
        More information on possible values
        `here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>`_
        and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>`_

    :return:
        The auth code flow. It is a dict in this form::

            {
                "auth_uri": "https://...",  // Guide user to visit this
                "state": "...",  // You may choose to verify it by yourself,
                                 // or just let acquire_token_by_auth_code_flow()
                                 // do that for you.
                "...": "...",  // Everything else are reserved and internal
            }

        The caller is expected to:

        1. somehow store this content, typically inside the current session,
        2. guide the end user (i.e. resource owner) to visit that auth_uri,
        3. and then relay this dict and subsequent auth response to
           :func:`~acquire_token_by_auth_code_flow()`.
    """
    # Only the authorization endpoint is needed to start the flow
    endpoints = {"authorization_endpoint": self.authority.authorization_endpoint}
    auth_client = _ClientWithCcsRoutingInfo(
        endpoints, self.client_id, http_client=self.http_client)
    decorated_scopes = self._decorate_scope(scopes)
    merged_claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)
    flow = auth_client.initiate_auth_code_flow(
        redirect_uri=redirect_uri,
        state=state,
        login_hint=login_hint,
        prompt=prompt,
        scope=decorated_scopes,
        domain_hint=domain_hint,
        claims=merged_claims,
        max_age=max_age,
        response_mode=response_mode,
    )
    # Kept in the flow so that acquire_token_by_auth_code_flow() can reuse it
    flow["claims_challenge"] = claims_challenge
    return flow

978 

def get_authorization_request_url(
        self,
        scopes,  # type: list[str]
        login_hint=None,  # type: Optional[str]
        state=None,  # Recommended by OAuth2 for CSRF protection
        redirect_uri=None,
        response_type="code",  # Could be "token" if you use Implicit Grant
        prompt=None,
        nonce=None,
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        **kwargs):
    """Build the URL which starts an Authorization Code Grant.

    This method emits a DeprecationWarning which points to
    :func:`~initiate_auth_code_flow()` as its replacement.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param str state: Recommended by OAuth2 for CSRF protection.
    :param str login_hint:
        Identifier of the user. Generally a User Principal Name (UPN).
    :param str redirect_uri:
        Address to return to upon receiving a response from the authority.
    :param str response_type:
        Default value is "code" for an OAuth2 Authorization Code grant.

        Other content such as "id_token" or "token" would trigger
        an Implicit Grant, but that is
        `not recommended <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-implicit-grant-flow#is-the-implicit-grant-suitable-for-my-app>`_.

    :param str prompt:
        No prompt value is sent by default, not even the string ``"none"``;
        a value has to be specified explicitly.
        Valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.
    :param nonce:
        A cryptographically random value used to mitigate replay attacks. See also
        `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_.
    :param domain_hint:
        Can be one of "consumers" or "organizations" or your tenant domain "contoso.com".
        When included, the sign-in page skips its email-based discovery
        process, which gives a slightly more streamlined user experience.
        More information on possible values available in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: The authorization url as a string.
    """
    authority = kwargs.pop("authority", None)  # Historically we support this
    # The previous implementation is, it will use self.authority by default.
    # Multi-tenant app can use new authority on demand
    if authority:
        warnings.warn(
            "We haven't decided if this method will accept authority parameter")
        the_authority = Authority(
            authority,
            self.http_client,
            instance_discovery=self._instance_discovery,
        )
    else:
        the_authority = self.authority

    endpoints = {"authorization_endpoint": the_authority.authorization_endpoint}
    auth_client = _ClientWithCcsRoutingInfo(
        endpoints, self.client_id, http_client=self.http_client)
    warnings.warn(
        "Change your get_authorization_request_url() "
        "to initiate_auth_code_flow()", DeprecationWarning)
    with warnings.catch_warnings(record=True):
        # Warnings raised while building the uri are captured and discarded
        decorated_scopes = self._decorate_scope(scopes)
        merged_claims = _merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge)
        return auth_client.build_auth_request_uri(
            response_type=response_type,
            redirect_uri=redirect_uri,
            state=state,
            login_hint=login_hint,
            prompt=prompt,
            scope=decorated_scopes,
            nonce=nonce,
            domain_hint=domain_hint,
            claims=merged_claims,
        )

1060 

def acquire_token_by_auth_code_flow(
        self, auth_code_flow, auth_response, scopes=None, **kwargs):
    """Validate the auth response being redirected back, and obtain tokens.

    Nonce protection is provided automatically.

    :param dict auth_code_flow:
        The same dict returned by :func:`~initiate_auth_code_flow()`.
        Its ``"claims_challenge"`` entry, if any, is popped by this call.
    :param dict auth_response:
        A dict of the query string received from auth server.
    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).

        Most of the time, you can leave it empty.

        If you requested user consent for multiple resources, here you will
        need to provide a subset of what you required in
        :func:`~initiate_auth_code_flow()`.

        OAuth2 was designed mostly for singleton services,
        where tokens are always meant for the same resource and the only
        changes are in the scopes.
        In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
        You can ask authorization code for multiple resources,
        but when you redeem it, the token is for only one intended
        recipient, called audience.
        So the developer needs to specify a scope so that we can restrict the
        token to be issued for the corresponding audience.

    :return:
        * A dict containing "access_token" and/or "id_token", among others,
          depends on what scope was used.
          (See https://tools.ietf.org/html/rfc6749#section-5.1)
        * A dict containing "error", optionally "error_description", "error_uri".
          (It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_
          or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_)
        * Most client-side data error would result in ValueError exception.
          So the usage pattern could be without any protocol details::

            def authorize():  # A controller in a web app
                try:
                    result = msal_app.acquire_token_by_auth_code_flow(
                        session.get("flow", {}), request.args)
                    if "error" in result:
                        return render_template("error.html", result)
                    use(result)  # Token(s) are available in result and cache
                except ValueError:  # Usually caused by CSRF
                    pass  # Simply ignore them
                return redirect(url_for("index"))
    """
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
    if scopes:
        decorated_scopes = self._decorate_scope(scopes)
    else:
        decorated_scopes = None  # Leave the scope absent from the request
    request_headers = telemetry_context.generate_headers()
    # The "data" is taken out of kwargs, so it is not passed in twice below
    request_data = dict(
        kwargs.pop("data", {}),
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities,
            auth_code_flow.pop("claims_challenge", None)))
    raw_response = self.client.obtain_token_by_auth_code_flow(
        auth_code_flow,
        auth_response,
        scope=decorated_scopes,
        headers=request_headers,
        data=request_data,
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

1129 

def acquire_token_by_authorization_code(
        self,
        code,
        scopes,  # Syntactically required. STS accepts empty value though.
        redirect_uri=None,
            # REQUIRED, if the "redirect_uri" parameter was included in the
            # authorization request as described in Section 4.1.1, and their
            # values MUST be identical.
        nonce=None,
        claims_challenge=None,
        **kwargs):
    """The second half of the Authorization Code Grant.

    This method emits a DeprecationWarning which points to
    :func:`~acquire_token_by_auth_code_flow()` as its replacement.

    :param code: The authorization code returned from Authorization Server.
    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
        It has to be a list.

        If you requested user consent for multiple resources, here you will
        typically want to provide a subset of what you required in AuthCode.

        OAuth2 was designed mostly for singleton services,
        where tokens are always meant for the same resource and the only
        changes are in the scopes.
        In Microsoft Entra, tokens can be issued for multiple 3rd party resources.
        You can ask authorization code for multiple resources,
        but when you redeem it, the token is for only one intended
        recipient, called audience.
        So the developer needs to specify a scope so that we can restrict the
        token to be issued for the corresponding audience.

    :param nonce:
        If you provided a nonce when calling :func:`get_authorization_request_url`,
        same nonce should also be provided here, so that we'll validate it.
        An exception will be raised if the nonce in id token mismatches.

    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    # If scope is absent on the wire, STS will give you a token associated
    # to the FIRST scope sent during the authorization request.
    # So in theory, you can omit scope here when you were working with only
    # one scope. But, MSAL decorates your scope anyway, so they are never
    # really empty.
    assert isinstance(scopes, list), "Invalid parameter type"
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    warnings.warn(
        "Change your acquire_token_by_authorization_code() "
        "to acquire_token_by_auth_code_flow()", DeprecationWarning)
    with warnings.catch_warnings(record=True):
        # Warnings raised inside this block are captured and discarded
        telemetry_context = self._build_telemetry_context(
            self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID)
        decorated_scopes = self._decorate_scope(scopes)
        request_headers = telemetry_context.generate_headers()
        # The "data" is taken out of kwargs, so it is not passed in twice below
        request_data = dict(
            kwargs.pop("data", {}),
            claims=_merge_claims_challenge_and_capabilities(
                self._client_capabilities, claims_challenge))
        raw_response = self.client.obtain_token_by_authorization_code(
            code,
            redirect_uri=redirect_uri,
            scope=decorated_scopes,
            headers=request_headers,
            data=request_data,
            nonce=nonce,
            **kwargs)
        response = _clean_up(raw_response)
        if "access_token" in response:
            response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
        telemetry_context.update_telemetry(response)
        return response

1203 

def get_accounts(self, username=None):
    """Get a list of accounts which previously signed in, i.e. exists in cache.

    An account can later be used in :func:`~acquire_token_silent`
    to find its tokens.

    Accounts are looked up under this app's authority instance first.
    Only when that finds nothing are the aliases of the instance tried,
    one by one, until one of them yields accounts.

    :param username:
        Filter accounts with this username only. Case insensitive.
        Accounts cached without a username never match a filter.
    :return: A list of account objects.
        Each account is a dict. For now, we only document its "username" field.
        Your app can choose to display those information to end user,
        and allow user to choose one of his/her accounts to proceed.
    """
    accounts = self._find_msal_accounts(environment=self.authority.instance)
    if not accounts:  # Now try other aliases of this authority instance
        for alias in self._get_authority_aliases(self.authority.instance):
            accounts = self._find_msal_accounts(environment=alias)
            if accounts:
                break
    if username:
        # Federated account["username"] from AAD could contain mixed case
        lowercase_username = username.lower()
        accounts = [
            a for a in accounts
            # The username of a cached account can be None (or absent),
            # e.g. when its tokens were acquired without 'profile' scope.
            # Treat that as an empty string rather than crash on .lower().
            if (a.get("username") or "").lower() == lowercase_username]
        if not accounts:
            logger.debug((  # This would also happen when the cache is empty
                "get_accounts(username='{}') finds no account. "
                "If tokens were acquired without 'profile' scope, "
                "they would contain no username for filtering. "
                "Consider calling get_accounts(username=None) instead."
                ).format(username))
    # Does not further filter by existing RTs here. It probably won't matter.
    # Because in most cases Accounts and RTs co-exist.
    # Even in the rare case when an RT is revoked and then removed,
    # acquire_token_silent() would then yield no result,
    # apps would fall back to other acquire methods. This is the standard pattern.
    return accounts

1241 

def _find_msal_accounts(self, environment):
    """Collect the cached accounts of one environment, one per home account.

    :param environment: The environment value to search the token cache by.
    :return:
        A list of dicts, each carrying a fixed subset of an account's fields.
        When several cache entries share a "home_account_id",
        the entry found last provides the values.
    """
    wanted_authority_types = [
        TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS]
    if _is_running_in_cloud_shell():
        wanted_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL)
    copied_fields = (
        # These are minimal amount of non-tenant-specific account info
        "home_account_id",
        "environment",
        "username",
        "account_source",

        # The following fields for backward compatibility, for now
        "authority_type",
        "local_account_id",  # Tenant-specific
        "realm",  # Tenant-specific
    )
    accounts_by_home_id = {}  # Grouped by home tenant's id
    cached_accounts = self.token_cache.search(
        TokenCache.CredentialType.ACCOUNT,
        query={"environment": environment})
    for entry in cached_accounts:
        if entry["authority_type"] not in wanted_authority_types:
            continue
        accounts_by_home_id[entry.get("home_account_id")] = {
            field: entry.get(field) for field in copied_fields}
    return list(accounts_by_home_id.values())

1266 

1267 def _get_instance_metadata(self): # This exists so it can be mocked in unit test 

1268 resp = self.http_client.get( 

1269 "https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize", # TBD: We may extend this to use self._instance_discovery endpoint 

1270 headers={'Accept': 'application/json'}) 

1271 resp.raise_for_status() 

1272 return json.loads(resp.text)['metadata'] 

1273 

1274 def _get_authority_aliases(self, instance): 

1275 if self._instance_discovery is False: 

1276 return [] 

1277 if self.authority._is_known_to_developer: 

1278 # Then it is an ADFS/B2C/known_authority_hosts situation 

1279 # which may not reach the central endpoint, so we skip it. 

1280 return [] 

1281 if not self.authority_groups: 

1282 self.authority_groups = [ 

1283 set(group['aliases']) for group in self._get_instance_metadata()] 

1284 for group in self.authority_groups: 

1285 if instance in group: 

1286 return [alias for alias in group if alias != instance] 

1287 return [] 

1288 

def remove_account(self, account):
    """Sign me out and forget me from token cache

    :param account:
        An account dict. Its "local_account_id" is read
        when the broker is enabled.
    """
    if self._enable_broker:
        from .broker import _signout_silently as broker_signout
        signout_error = broker_signout(
            self.client_id, account["local_account_id"])
        if signout_error:
            logger.debug("_signout_silently() returns error: %s", signout_error)
    # Broker sign-out has been attempted, even if the _forget_me() below throws.
    self._forget_me(account)

1298 

def _sign_out(self, home_account):
    """Remove all relevant RTs and ATs from token cache.

    :param dict home_account:
        Its "environment" and "home_account_id" identify the tokens to remove.
    """
    owner_query = {  # realm-independent
        "environment": home_account["environment"],
        "home_account_id": home_account["home_account_id"],
    }
    app_metadata = self._get_app_metadata(home_account["environment"])

    def _owned_by_this_app_or_its_family(rt):
        # Do RT's app ownership check as a precaution, in case family apps
        # and 3rd-party apps share same token cache, although they should not.
        if rt["client_id"] == self.client_id:
            return True
        return (
            app_metadata.get("family_id")  # Now let's settle family business
            and rt.get("family_id") == app_metadata["family_id"])

    # Remove RTs/FRTs, and they are realm-independent.
    # Collect them into a static list first (rather than a dynamic generator),
    # to avoid changing self.token_cache while it is being iterated
    doomed_rts = [
        rt for rt in self.token_cache.search(
            TokenCache.CredentialType.REFRESH_TOKEN, query=owner_query)
        if _owned_by_this_app_or_its_family(rt)
    ]
    for rt in doomed_rts:
        self.token_cache.remove_rt(rt)

    # Same static-list approach for ATs,
    # regardless of realm, b/c we've removed realm-independent RTs anyway
    doomed_ats = list(self.token_cache.search(
        TokenCache.CredentialType.ACCESS_TOKEN, query=owner_query))
    for at in doomed_ats:
        # To avoid the complexity of locating sibling family app's AT,
        # we skip AT's app ownership check.
        # It means ATs for other apps will also be removed, it is OK because:
        # * non-family apps are not supposed to share token cache to begin with;
        # * Even if it happens, we keep other app's RT already, so SSO still works
        self.token_cache.remove_at(at)

1328 

def _forget_me(self, home_account):
    """Sign out, and then also remove all relevant accounts and IDTs.

    :param dict home_account:
        Its "environment" and "home_account_id" identify the entries to remove.
    """
    self._sign_out(home_account)
    owner_query = {  # realm-independent
        "environment": home_account["environment"],
        "home_account_id": home_account["home_account_id"],
    }
    cleanup_plan = (  # IDTs go first, then Accounts; both regardless of realm
        (TokenCache.CredentialType.ID_TOKEN, self.token_cache.remove_idt),
        (TokenCache.CredentialType.ACCOUNT, self.token_cache.remove_account),
    )
    for credential_type, remove in cleanup_plan:
        # Remove from a static list,
        # to avoid changing self.token_cache while it is being iterated
        snapshot = list(self.token_cache.search(
            credential_type, query=owner_query))
        for entry in snapshot:
            remove(entry)

1345 

def _acquire_token_by_cloud_shell(self, scopes, data=None):
    """Obtain a token via the cloudshell module, and cache a successful result.

    :param scopes:
        Passed to ``_obtain_token()``. Also used as the cached scope
        when the response carries no "scope" of its own.
    :param data: Optional. Passed to ``_obtain_token()`` and into the cache event.
    :return:
        The response from ``_obtain_token()``, with its token source
        marked when it contains an "access_token".
    """
    from .cloudshell import _obtain_token
    response = _obtain_token(
        self.http_client, scopes, client_id=self.client_id, data=data)
    if "error" not in response:
        if "scope" in response:
            granted_scopes = response["scope"].split()
        else:
            granted_scopes = scopes
        cache_event = {
            "client_id": self.client_id,
            "scope": granted_scopes,
            "token_endpoint": self.authority.token_endpoint,
            "response": response,
            "data": data or {},
            "authority_type": _AUTHORITY_TYPE_CLOUDSHELL,
        }
        self.token_cache.add(cache_event)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
    return response

1362 

def acquire_token_silent(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Acquire an access token for given account, without user interaction.

    It accepts the same parameters as :func:`~acquire_token_silent_with_error`.
    What differs is the return value: this method folds
    the cache-empty case and the refresh-error case into a single `None`.
    It is the easier, recommended choice when your app does not care about
    the exact token refresh error during token cache look-up.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key,
          if cache lookup succeeded.
        - None when cache lookup does not yield a token.
    """
    if not account:
        return None  # A backward-compatible NO-OP to drop the account=None usage
    outcome = _clean_up(self._acquire_token_silent_with_error(
        scopes,
        account,
        authority=authority,
        force_refresh=force_refresh,
        claims_challenge=claims_challenge,
        auth_scheme=auth_scheme,
        **kwargs))
    if not outcome or "error" in outcome:
        return None
    return outcome

1393 

def acquire_token_silent_with_error(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Silently acquire an access token for the given account.

    A valid access token is looked up in the cache first; failing that,
    a cached refresh token is automatically redeemed for a new access token.

    Unlike :func:`~acquire_token_silent`, this method keeps
    "the cache is empty" apart from "the token refresh failed".
    Use it when your app needs the exact refresh error;
    otherwise :func:`~acquire_token_silent` is the recommended choice.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param account: (Required)
        One of the account object returned by :func:`~get_accounts`.
        Starting from MSAL Python 1.23,
        a ``None`` input will become a NO-OP and always return ``None``.
    :param force_refresh:
        If True, it will skip Access Token look-up,
        and try to find a Refresh Token to obtain a new Access Token.
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.
    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return:
        - A dict containing no "error" key,
          and typically contains an "access_token" key,
          if cache lookup succeeded.
        - None when there is simply no token in the cache.
        - A dict containing an "error" key, when token refresh failed.
    """
    if account:
        raw_outcome = self._acquire_token_silent_with_error(
            scopes,
            account,
            authority=authority,
            force_refresh=force_refresh,
            claims_challenge=claims_challenge,
            auth_scheme=auth_scheme,
            **kwargs)
        return _clean_up(raw_outcome)
    # Backward-compatible NO-OP for the retired account=None usage
    return None

1446 

def _acquire_token_silent_with_error(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority=None,  # See get_authorization_request_url()
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        auth_scheme=None,
        **kwargs):
    """Try the configured authority first, then each of its aliases.

    :return:
        The first error-free result; otherwise the most recent non-empty
        result (an error dict), or None when nothing was found anywhere.
        An error dict carrying a "suberror" also gets a "classification"
        key, which is blanked for the suberrors suppressed per #57.
    """
    assert isinstance(scopes, list), "Invalid parameter type"
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    correlation_id = msal.telemetry._get_new_correlation_id()
    if authority:
        # The authority argument is accepted but not honored yet;
        # every attempt below starts from self.authority.
        warnings.warn("We haven't decided how/if this method will accept authority parameter")

    def attempt(target_authority):
        # One silent attempt; all attempts share the same correlation id
        return self._acquire_token_silent_from_cache_and_possibly_refresh_it(
            scopes, account, target_authority,
            force_refresh=force_refresh,
            claims_challenge=claims_challenge,
            correlation_id=correlation_id,
            auth_scheme=auth_scheme,
            **kwargs)

    outcome = attempt(self.authority)
    if outcome and "error" not in outcome:
        return outcome
    last_outcome = outcome

    for alias in self._get_authority_aliases(self.authority.instance):
        # MUST NOT filter this search by scopes, because:
        # 1. AAD RTs are scope-independent;
        # 2. therefore target is optional per schema.
        cached_rts = list(self.token_cache.search(  # A list, to test emptiness
            self.token_cache.CredentialType.REFRESH_TOKEN,
            query={"environment": alias}))
        if not cached_rts:
            continue  # Skip heavy weight logic when this alias has no RT
        alias_authority = Authority(
            "https://" + alias + "/" + self.authority.tenant,
            self.http_client,
            instance_discovery=False,
        )
        outcome = attempt(alias_authority)
        if not outcome:
            continue
        if "error" not in outcome:
            return outcome
        last_outcome = outcome

    suberror = last_outcome.get("suberror") if last_outcome else None
    if suberror:
        suppressed = frozenset((  # Suppress these suberrors, per #57
            "bad_token",
            "token_expired",
            "protection_policy_required",
            "client_mismatch",
            "device_authentication_failed",
        ))
        last_outcome["classification"] = (
            "" if suberror in suppressed else suberror)
    return last_outcome

1508 

def _acquire_token_silent_from_cache_and_possibly_refresh_it(
        self,
        scopes,  # type: List[str]
        account,  # type: Optional[Account]
        authority,  # This can be different than self.authority
        force_refresh=False,  # type: Optional[boolean]
        claims_challenge=None,
        correlation_id=None,
        http_exceptions=None,
        auth_scheme=None,
        **kwargs):
    """Return a cached access token, or try to obtain a fresh one.

    The steps, in order:

    1. Unless ``force_refresh``, ``claims_challenge`` or ``auth_scheme`` is
       given, search the access token cache. Entries with less than
       5 minutes left are skipped. An entry whose ``refresh_on`` is already
       in the past is kept as a fallback while a refresh is attempted;
       any other hit is returned right away.
    2. Accounts whose ``authority_type`` is Cloud Shell are served by
       ``_acquire_token_by_cloud_shell()``.
    3. When broker is enabled and the account's ``account_source`` is broker
       or unknown (``None``), the broker is asked; its outcome is returned
       if the account was established by broker or the attempt succeeded.
    4. Otherwise a refresh token is redeemed (user flow), or
       ``_acquire_token_for_client()`` is called (``account`` is falsy).
    5. If the new attempt yields an error, or raises one of
       ``http_exceptions``, the fallback token from step 1 is returned
       when there is one.

    :param http_exceptions:
        A tuple of exception classes treated as transport errors.
        When falsy, ``requests.exceptions.RequestException`` is used.
    :return:
        A response-like dict (success or error), or None.
    :raises ValueError:
        If ``auth_scheme`` is given for a Cloud Shell account,
        or for a flow that did not go through the broker.
    """
    # This internal method has two calling patterns:
    # it accepts a non-empty account to find token for a user,
    # and accepts account=None to find a token for the current app.
    access_token_from_cache = None
    if not (force_refresh or claims_challenge or auth_scheme):  # Then attempt AT cache
        query={
            "client_id": self.client_id,
            "environment": authority.instance,
            "realm": authority.tenant,
            "home_account_id": (account or {}).get("home_account_id"),
        }
        key_id = kwargs.get("data", {}).get("key_id")
        if key_id:  # Some token types (SSH-certs, POP) are bound to a key
            query["key_id"] = key_id
        now = time.time()
        # Default reason; overwritten below if an expired or aging AT is seen
        refresh_reason = msal.telemetry.AT_ABSENT
        for entry in self.token_cache.search(  # A generator allows us to
                # break early in cache-hit without finding a full list
                self.token_cache.CredentialType.ACCESS_TOKEN,
                target=scopes,
                query=query,
                ):  # This loop is about token search, not about token deletion.
            # Note that search() holds a lock during this loop;
            # that is fine because this loop is fast
            expires_in = int(entry["expires_on"]) - now
            if expires_in < 5*60:  # Then consider it expired
                refresh_reason = msal.telemetry.AT_EXPIRED
                continue  # Removal is not necessary, it will be overwritten
            logger.debug("Cache hit an AT")
            access_token_from_cache = {  # Mimic a real response
                "access_token": entry["secret"],
                "token_type": entry.get("token_type", "Bearer"),
                "expires_in": int(expires_in),  # OAuth2 specs defines it as int
                self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE,
            }
            if "refresh_on" in entry:
                access_token_from_cache["refresh_on"] = int(entry["refresh_on"])
                if int(entry["refresh_on"]) < now:  # aging
                    refresh_reason = msal.telemetry.AT_AGING
                    break  # With a fallback in hand, we break here to go refresh
            self._build_telemetry_context(-1).hit_an_access_token()
            return access_token_from_cache  # It is still good as new
    else:
        refresh_reason = msal.telemetry.FORCE_REFRESH  # TODO: It could also mean claims_challenge
    assert refresh_reason, "It should have been established at this point"
    if not http_exceptions:  # It can be a tuple of exceptions
        # The exact HTTP exceptions are transportation-layer dependent
        from requests.exceptions import RequestException  # Lazy load
        http_exceptions = (RequestException,)
    try:
        data = kwargs.get("data", {})
        if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL:
            if auth_scheme:
                raise ValueError("auth_scheme is not supported in Cloud Shell")
            return self._acquire_token_by_cloud_shell(scopes, data=data)

        if self._enable_broker and account and account.get("account_source") in (
            _GRANT_TYPE_BROKER,  # Broker successfully established this account previously.
            None,  # Unknown data from older MSAL. Broker might still work.
        ):
            from .broker import _acquire_token_silently
            # NOTE(review): the broker call uses self.authority, not the
            # `authority` argument - confirm this is intended for aliases.
            response = _acquire_token_silently(
                "https://{}/{}".format(self.authority.instance, self.authority.tenant),
                self.client_id,
                account["local_account_id"],
                scopes,
                claims=_merge_claims_challenge_and_capabilities(
                    self._client_capabilities, claims_challenge),
                correlation_id=correlation_id,
                auth_scheme=auth_scheme,
                **data)
            if response:  # Broker provides a decisive outcome
                account_was_established_by_broker = account.get(
                    "account_source") == _GRANT_TYPE_BROKER
                broker_attempt_succeeded_just_now = "error" not in response
                # A broker error for an account of unknown origin is not
                # returned; the code falls through to the non-broker flow.
                if account_was_established_by_broker or broker_attempt_succeeded_just_now:
                    return self._process_broker_response(response, scopes, data)

        if auth_scheme:
            raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
        if account:
            result = self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
                authority, self._decorate_scope(scopes), account,
                refresh_reason=refresh_reason, claims_challenge=claims_challenge,
                correlation_id=correlation_id,
                **kwargs)
        else:  # The caller is acquire_token_for_client()
            result = self._acquire_token_for_client(
                scopes, refresh_reason, claims_challenge=claims_challenge,
                **kwargs)
        if result and "access_token" in result:
            result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
        # Return the new result when it succeeded, or when there is no
        # cached fallback to prefer over an error/None result.
        if (result and "error" not in result) or (not access_token_from_cache):
            return result
    except http_exceptions:
        # Typically network error. Potential AAD outage?
        if not access_token_from_cache:  # It means there is no fall back option
            raise  # We choose to bubble up the exception
    return access_token_from_cache

1619 

def _process_broker_response(self, response, scopes, data):
    """Cache a successful broker response, then return it cleaned up.

    :param dict response: The outcome coming back from the broker.
    :param scopes: Requested scopes, used if the response carries no "scope".
    :param data: The request data to be recorded alongside the tokens.
    :return: ``_clean_up(response)``; error responses are not cached.
    """
    broker_succeeded = "error" not in response
    if broker_succeeded:
        if "scope" in response:
            granted_scopes = response["scope"].split()
        else:
            granted_scopes = scopes
        self.token_cache.add({
            "client_id": self.client_id,
            "scope": granted_scopes,
            "token_endpoint": self.authority.token_endpoint,
            "response": response,
            "data": data,
            "_account_id": response["_account_id"],
            # Be consistent with non-broker flows
            "environment": self.authority.instance,
            # A pseudo grant type for TokenCache to mark account_source as broker
            "grant_type": _GRANT_TYPE_BROKER,
        })
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER
    return _clean_up(response)

1634 

def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
        self, authority, scopes, account, **kwargs):
    """Redeem a refresh token of this app's family, or of this app itself.

    Attempts, in order:

    1. No app metadata yet (the app is used for the first time):
       probe with family "1" RTs, without treating them as ours.
    2. The app metadata names a family: use that family's RTs.
    3. Fall back to RTs issued to this very client_id.

    :return:
        The first error-free response; otherwise the result of attempt 3,
        or, if that is empty, the response of attempt 2 (possibly None).
    """
    base_query = {
        "environment": authority.instance,
        "home_account_id": (account or {}).get("home_account_id"),
        # "realm": authority.tenant,  # AAD RTs are tenant-independent
    }

    def is_success(candidate):
        return bool(candidate) and "error" not in candidate

    app_metadata = self._get_app_metadata(authority.instance)
    redeem = self._acquire_token_silent_by_finding_specific_refresh_token

    if not app_metadata:  # Meaning this app is now used for the first time.
        # When/if we have a way to directly detect current app's family,
        # we'll rewrite this block, to support multiple families.
        # For now, we try existing RTs (*). If it works, we are in that family.
        # (*) RTs of a different app/family are not supposed to be
        #     shared with or accessible by us in the first place.
        probe = redeem(
            authority, scopes,
            dict(base_query, family_id="1"),  # A hack, we have only 1 family for now
            rt_remover=lambda rt_item: None,  # NO-OP b/c RTs are likely not mine
            break_condition=lambda response:  # Break loop when app not in family
                # Based on an AAD-only behavior mentioned in internal doc here
                # https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595
                "client_mismatch" in response.get("error_additional_info", []),
            **kwargs)
        if is_success(probe):
            return probe

    family_response = None
    family_id = app_metadata.get("family_id")
    if family_id:  # Meaning this app belongs to this family
        family_response = redeem(
            authority, scopes, dict(base_query, family_id=family_id),
            **kwargs)
        if is_success(family_response):
            return family_response

    # Either this app is an orphan, so we will naturally use its own RT;
    # or all attempts above have failed, so we fall back to non-foci behavior.
    own_response = redeem(
        authority, scopes, dict(base_query, client_id=self.client_id),
        **kwargs)
    return own_response or family_response

1672 

def _get_app_metadata(self, environment):
    """Look up this client's app metadata in the token cache.

    :param environment: The environment to look the metadata up in.
    :return: Whatever the token cache returns; ``{}`` is passed as default.
    """
    lookup = self.token_cache._get_app_metadata
    return lookup(
        environment=environment,
        client_id=self.client_id,
        default={},
    )

1676 

def _acquire_token_silent_by_finding_specific_refresh_token(
        self, authority, scopes, query,
        rt_remover=None, break_condition=lambda response: False,
        refresh_reason=None, correlation_id=None, claims_challenge=None,
        **kwargs):
    """Redeem cached refresh tokens matching ``query``, newest first.

    :param authority: The authority to build the HTTP client against.
    :param scopes: The scopes to request with each refresh token.
    :param dict query:
        The cache query for refresh tokens. Its "home_account_id", if any,
        is also sent as the ``X-AnchorMailbox`` routing header.
    :param rt_remover:
        Accepted for interface compatibility; this method does not use it,
        because refresh token removal is disabled here.
    :param break_condition:
        A callable receiving an error response; a truthy return value
        stops the loop from trying the remaining refresh tokens.
    :param refresh_reason: Passed into the telemetry context.
    :param correlation_id: Passed into the telemetry context.
    :param claims_challenge:
        Merged with the client capabilities into the "claims" request field.
    :param kwargs:
        Forwarded to ``client.obtain_token_by_refresh_token()``;
        an optional "data" dict in it is sent with every attempt.
    :return:
        The first error-free response, or the latest error response,
        or None when no refresh token matched the query.
    """
    matches = list(self.token_cache.search(  # We want a list to test emptiness
        self.token_cache.CredentialType.REFRESH_TOKEN,
        # target=scopes,  # AAD RTs are scope-independent
        query=query))
    logger.debug("Found %d RTs matching %s", len(matches), {
        k: _pii_less_home_account_id(v) if k == "home_account_id" and v else v
        for k, v in query.items()
    })

    response = None  # A distinguishable value to mean cache is empty
    if not matches:  # Then exit early to avoid expensive operations
        return response
    client, _ = self._build_client(
        # Potentially expensive if building regional client
        self.client_credential, authority, skip_regional_client=True)
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_SILENT_ID,
        correlation_id=correlation_id, refresh_reason=refresh_reason)

    # Build the request payload exactly once, BEFORE the loop.
    # Popping "data" inside the loop would hand the caller's data
    # (e.g. key_id/req_cnf for SSH certs) to the first RT only,
    # and silently drop it for every subsequent RT attempt.
    request_data = dict(
        kwargs.pop("data", {}),
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge))

    for entry in sorted(  # Since unfit RTs would not be aggressively removed,
            # we start from newer RTs which are more likely fit.
            matches,
            key=lambda e: int(e.get("last_modification_time", "0")),
            reverse=True):
        logger.debug("Cache attempts an RT")
        headers = telemetry_context.generate_headers()
        if query.get("home_account_id"):  # Then use it as CCS Routing info
            headers["X-AnchorMailbox"] = "Oid:{}".format(  # case-insensitive value
                query["home_account_id"].replace(".", "@"))
        response = client.obtain_token_by_refresh_token(
            entry, rt_getter=lambda token_item: token_item["secret"],
            on_removing_rt=lambda rt_item: None,  # Disable RT removal,
                # because an invalid_grant could be caused by new MFA policy,
                # the RT could still be useful for other MFA-less scope or tenant
            on_obtaining_tokens=lambda event: self.token_cache.add(dict(
                event,
                environment=authority.instance,
                skip_account_creation=True,  # To honor a concurrent remove_account()
                )),
            scope=scopes,
            headers=headers,
            data=dict(request_data),  # A fresh copy for each attempt
            **kwargs)
        telemetry_context.update_telemetry(response)
        if "error" not in response:
            return response
        logger.debug(
            "Refresh failed. %s: %s",
            response.get("error"), response.get("error_description"))
        if break_condition(response):
            break
    return response  # Returns the latest error (if any), or just None

1737 

def _validate_ssh_cert_input_data(self, data):
    """Check the extra fields that an SSH certificate request must carry.

    :param dict data:
        The request data. It is only inspected
        when its "token_type" is "ssh-cert".
    :raises ValueError:
        If an SSH certificate is requested
        without a non-empty "req_cnf" or "key_id".
    """
    if data.get("token_type") != "ssh-cert":
        return  # Other token types have no such requirement
    if not data.get("req_cnf"):
        raise ValueError(
            "When requesting an SSH certificate, "
            "you must include a string parameter named 'req_cnf' "
            "containing the public key in JWK format "
            "(https://tools.ietf.org/html/rfc7517).")
    if not data.get("key_id"):
        raise ValueError(
            "When requesting an SSH certificate, "
            "you must include a string parameter named 'key_id' "
            "which identifies the key in the 'req_cnf' argument.")

1751 

def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs):
    """Acquire token(s) based on a refresh token (RT) obtained from elsewhere.

    This method exists for migration only: you hold old RTs that came
    from somewhere else, and you want to bring them into MSAL.
    The new tokens obtained by this call are stored into MSAL automatically.

    Apps that already use MSAL do NOT need this method.
    MSAL keeps RTs inside its token cache by itself,
    and hands out an access token
    whenever you call :func:`~acquire_token_silent`.

    :param str refresh_token: The old refresh token, as a string.

    :param list scopes:
        The scopes associate with this old RT.
        Each scope needs to be in the Microsoft identity platform (v2) format.
        See `Scopes not resources <https://docs.microsoft.com/en-us/azure/active-directory/develop/migrate-python-adal-msal#scopes-not-resources>`_.

    :return:
        * A dict contains "error" and some other keys, when error happened.
        * A dict contains no "error" key means migration was successful.
    """
    self._validate_ssh_cert_input_data(kwargs.get("data", {}))
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN,
        refresh_reason=msal.telemetry.FORCE_REFRESH)
    raw_response = self.client.obtain_token_by_refresh_token(
        refresh_token,
        scope=self._decorate_scope(scopes),
        headers=telemetry_context.generate_headers(),
        rt_getter=lambda rt: rt,  # The RT is given as a plain string
        on_updating_rt=False,
        on_removing_rt=lambda rt_item: None,  # No OP
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

1791 

def acquire_token_by_username_password(
        self, username, password, scopes, claims_challenge=None,
        # Note: We shouldn't need to surface enable_msa_passthrough,
        # because this ROPC won't work with MSA account anyway.
        auth_scheme=None,
        **kwargs):
    """Gets a token for a given resource via user credentials.

    The constraints of the Username Password Flow are described here:
    https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication

    :param str username: Typically a UPN in the form of an email address.
    :param str password: The password.
    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        The claims_challenge parameter requests specific claims requested by the resource provider
        in the form of a claims_challenge directive in the www-authenticate header to be
        returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
        It is a string of a JSON object which contains lists of claims being requested from these locations.

    :param object auth_scheme:
        You can provide an ``msal.auth_scheme.PopAuthScheme`` object
        so that MSAL will get a Proof-of-Possession (POP) token for you.

        New in version 1.26.0.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)

    if self._enable_broker:
        from .broker import _signin_silently
        skip_authority_validation = (
            self.authority._is_known_to_developer
            or self._instance_discovery is False)
        broker_response = _signin_silently(
            "https://{}/{}".format(self.authority.instance, self.authority.tenant),
            self.client_id,
            scopes,  # Decorated scopes won't work due to offline_access
            MSALRuntime_Username=username,
            MSALRuntime_Password=password,
            validateAuthority="no" if skip_authority_validation else None,
            claims=claims,
            auth_scheme=auth_scheme,
            )
        return self._process_broker_response(
            broker_response, scopes, kwargs.get("data", {}))

    # From here on it is the non-broker flow, which has no auth_scheme support
    if auth_scheme:
        raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
    scopes = self._decorate_scope(scopes)
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID)
    headers = telemetry_context.generate_headers()
    data = dict(kwargs.pop("data", {}), claims=claims)

    response = None
    if not self.authority.is_adfs:
        realm = self.authority.user_realm_discovery(
            username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID])
        if realm.get("account_type") == "Federated":
            federated_response = self._acquire_token_by_username_password_federated(
                realm, username, password, scopes=scopes,
                data=data,
                headers=headers, **kwargs)
            response = _clean_up(federated_response)
    if response is None:  # Either ADFS or not federated
        direct_response = self.client.obtain_token_by_username_password(
            username, password, scope=scopes,
            headers=headers,
            data=data,
            **kwargs)
        response = _clean_up(direct_response)

    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

1868 

def _acquire_token_by_username_password_federated(
        self, user_realm_result, username, password, scopes=None, **kwargs):
    """Sign in a federated user: WS-Trust first, then a SAML assertion grant.

    :param dict user_realm_result:
        The user realm discovery result. The keys read here are
        "federation_metadata_url", "cloud_audience_urn"
        and "federation_active_auth_url".
    :param str username: The user name sent to the WS-Trust endpoint.
    :param str password: The password sent to the WS-Trust endpoint.
    :param scopes: The scopes to request with the assertion grant.
    :param kwargs: Forwarded to ``self.client.obtain_token_by_assertion()``.
    :return: The result of ``self.client.obtain_token_by_assertion()``.
    :raises ValueError: If MEX yields no WS-Trust endpoint.
    :raises RuntimeError:
        If the WS-Trust response lacks a token or a type,
        or if its token type is not a known SAML type.
    """
    wstrust_endpoint = {}
    if user_realm_result.get("federation_metadata_url"):
        wstrust_endpoint = mex_send_request(
            user_realm_result["federation_metadata_url"],
            self.http_client)
        if wstrust_endpoint is None:
            raise ValueError("Unable to find wstrust endpoint from MEX. "
                "This typically happens when attempting MSA accounts. "
                "More details available here. "
                "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication")
    logger.debug("wstrust_endpoint = %s", wstrust_endpoint)
    wstrust_result = wst_send_request(
        username, password,
        user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"),
        wstrust_endpoint.get("address",
            # Fallback to an AAD supplied endpoint
            user_realm_result.get("federation_active_auth_url")),
        wstrust_endpoint.get("action"), self.http_client)
    if not ("token" in wstrust_result and "type" in wstrust_result):
        raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result)
    GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer'
    grant_type = {
        SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1,
        SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2,
        WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1,
        WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2
        }.get(wstrust_result.get("type"))
    if not grant_type:
        # Exceptions do not interpolate logging-style arguments, so the
        # message is formatted explicitly, like the RuntimeError above.
        raise RuntimeError(
            "RSTR returned unknown token type: %s"
            % (wstrust_result.get("type"),))
    self.client.grant_assertion_encoders.setdefault(  # Register a non-standard type
        grant_type, self.client.encode_saml_assertion)
    return self.client.obtain_token_by_assertion(
        wstrust_result["token"], grant_type, scope=scopes,
        on_obtaining_tokens=lambda event: self.token_cache.add(dict(
            event,
            environment=self.authority.instance,
            username=username,  # Useful in case IDT contains no such info
            )),
        **kwargs)

1911 

1912 

1913class PublicClientApplication(ClientApplication): # browser app or mobile app 

1914 

1915 DEVICE_FLOW_CORRELATION_ID = "_correlation_id" 

1916 CONSOLE_WINDOW_HANDLE = object() 

1917 

def __init__(self, client_id, client_credential=None, **kwargs):
    """Same as :func:`ClientApplication.__init__`,
    except that ``client_credential`` parameter shall remain ``None``.

    .. note::

        You may set enable_broker_on_windows and/or enable_broker_on_mac to True.

        **What is a broker, and why use it?**

        A broker is a component installed on your device.
        It implicitly gives your device an identity, so that
        the device itself becomes a factor satisfying MFA (Multi-factor authentication).
        That factor turns mandatory once a tenant's admin
        enables a corresponding Conditional Access (CA) policy.
        With a broker present, Microsoft identity platform
        has higher confidence that tokens are issued to your device,
        which is more secure.

        As an additional benefit, the broker
        runs as a long-lived process with your device's OS
        and maintains its own cache,
        so your broker-enabled apps (even a CLI)
        could automatically SSO from a previously established signed-in session.

        **You shall only enable broker when your app:**

        1. is running on supported platforms,
           and already registered their corresponding redirect_uri

           * ``ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id``
             if your app is expected to run on Windows 10+
           * ``msauth.com.msauth.unsignedapp://auth``
             if your app is expected to run on Mac

        2. installed broker dependency,
           e.g. ``pip install msal[broker]>=1.31,<2``.

        3. tested with ``acquire_token_interactive()`` and ``acquire_token_silent()``.

        **The fallback behaviors of MSAL Python's broker support**

        MSAL will either error out, or silently fallback to non-broker flows.

        1. MSAL will ignore the `enable_broker_...` and bypass broker
           on those auth flows that are known to be NOT supported by broker.
           This includes ADFS, B2C, etc..
           For other "could-use-broker" scenarios, please see below.
        2. MSAL errors out when app developer opted-in to use broker
           but a direct dependency "mid-tier" package is not installed.
           Error message guides app developer to declare the correct dependency
           ``msal[broker]``.
           We error out here because the error is actionable to app developers.
        3. MSAL silently "deactivates" the broker and fallback to non-broker,
           when opted-in, dependency installed yet failed to initialize.
           We anticipate this would happen on a device whose OS is too old
           or the underlying broker component is somehow unavailable.
           There is not much an app developer or the end user can do here.
           Eventually, the conditional access policy shall
           force the user to switch to a different device.
        4. MSAL errors out when broker is opted in, installed, initialized,
           but subsequent token request(s) failed.

    :param boolean enable_broker_on_windows:
        This setting is only effective if your app is running on Windows 10+.
        This parameter defaults to False, which means MSAL will not utilize a broker.

        New in MSAL Python 1.25.0.

    :param boolean enable_broker_on_mac:
        This setting is only effective if your app is running on Mac.
        This parameter defaults to False, which means MSAL will not utilize a broker.

        New in MSAL Python 1.31.0.
    """
    if client_credential is not None:
        raise ValueError("Public Client should not possess credentials")

    # Using kwargs notation for now. We will switch to keyword-only arguments.
    # Both opt-in flags are removed from kwargs before the base class sees them.
    opted_in_on_windows = kwargs.pop("enable_broker_on_windows", False)
    opted_in_on_mac = kwargs.pop("enable_broker_on_mac", False)

    # An opt-in only counts on the platform it was made for
    self._enable_broker = bool(
        (opted_in_on_windows and sys.platform == "win32")
        or (opted_in_on_mac and sys.platform == "darwin"))

    super(PublicClientApplication, self).__init__(
        client_id, client_credential=None, **kwargs)

2003 

def acquire_token_interactive(
        self,
        scopes,  # type: list[str]
        prompt=None,
        login_hint=None,  # type: Optional[str]
        domain_hint=None,  # type: Optional[str]
        claims_challenge=None,
        timeout=None,
        port=None,
        extra_scopes_to_consent=None,
        max_age=None,
        parent_window_handle=None,
        on_before_launching_ui=None,
        auth_scheme=None,
        **kwargs):
    """Obtain a token through user interaction, i.e. via a local browser
    or, when the app opted into it, via the broker.

    Prerequisite: In Azure Portal, the Redirect URI of your
    "Mobile and Desktop application" must be ``http://localhost``.
    If you opt in to use broker during ``PublicClientApplication`` creation,
    your app also needs this Redirect URI:
    ``ms-appx-web://Microsoft.AAD.BrokerPlugin/YOUR_CLIENT_ID``

    :param list scopes:
        A list of case-sensitive strings.
    :param str prompt:
        No prompt value is sent by default, not even the string ``"none"``;
        a value has to be specified explicitly.
        Valid values are the constants defined in
        :class:`Prompt <msal.Prompt>`.
    :param str login_hint:
        Optional. Identifier of the user. Generally a User Principal Name (UPN).
    :param domain_hint:
        One of "consumers", "organizations" or your tenant domain
        such as "contoso.com".
        When included, the email-based discovery step of the sign-in page
        is skipped, which slightly streamlines the user experience.
        Possible values are described in
        `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and
        `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_.

    :param claims_challenge:
        The claims requested by the resource provider, delivered as a
        claims_challenge directive in the www-authenticate header, to be
        returned from the UserInfo Endpoint and/or in the ID Token
        and/or Access Token.
        It is a string of a JSON object which contains lists of claims
        being requested from these locations.

    :param int timeout:
        This method blocks the current thread;
        ``timeout`` is the number of seconds to wait.
        The default ``None`` means wait indefinitely.

    :param int port:
        The port used to listen for the incoming auth response.
        A system-allocated port is used by default.
        (The rest of the redirect_uri is hard coded as ``http://localhost``.)

    :param list extra_scopes_to_consent:
        "Extra scopes to consent" is a concept only available in
        Microsoft Entra. These are other resources you might want the user
        to consent to in the same interaction, but for which no token
        is returned by this particular operation.
        This parameter is ignored (with a warning) when broker is in use.

    :param int max_age:
        OPTIONAL. Maximum Authentication Age.
        The allowable elapsed time, in seconds, since the End-User was
        last actively authenticated. When more time than that has elapsed,
        Microsoft identity platform will actively re-authenticate
        the End-User.

        MSAL Python will also automatically validate the auth_time in ID token.

        New in version 1.15.

    :param int parent_window_handle:
        OPTIONAL.

        * If your app does not opt in to use broker,
          you do not need to provide a ``parent_window_handle`` here.

        * If your app opts in to use broker,
          ``parent_window_handle`` is required.

          - A GUI app running on Windows or Mac system must provide its
            window handle, so that the sign-in window pops up on top of
            your window.
          - A console app running on Windows or Mac system can use the
            placeholder ``PublicClientApplication.CONSOLE_WINDOW_HANDLE``.

        Most Python scripts are console apps.

        New in version 1.20.0.

    :param function on_before_launching_ui:
        A callback with the form of
        ``lambda ui="xyz", **kwargs: print("A {} will be launched".format(ui))``,
        where ``ui`` will be either "browser" or "broker".
        Use it to tell your end user to expect a pop-up window.

        New in version 1.20.0.

    :param object auth_scheme:
        An ``msal.auth_scheme.PopAuthScheme`` object, so that MSAL
        obtains a Proof-of-Possession (POP) token for you.
        Only accepted when broker is in use.

        New in version 1.26.0.

    :raises ValueError:
        When broker is in use and ``parent_window_handle`` is ``None``,
        or when ``auth_scheme`` is given while broker is not in use.

    :return:
        - A dict containing no "error" key,
          and typically containing an "access_token" key.
        - A dict containing an "error" key, when the token could not be
          acquired.
    """
    data = kwargs.pop("data", {})
    # "enable_msa_passthrough" is kept as a hidden parameter, for now.
    # It MUST be removed from kwargs, because the remaining kwargs are
    # forwarded to the browser-based flow below.
    # OPTIONAL. MSA-Passthrough is a legacy configuration,
    # needed by a small amount of Microsoft first-party apps,
    # which would login MSA accounts via ".../organizations" authority.
    # If your app belongs to this category, AND you are enabling broker,
    # you would want to enable this flag. Default value is False.
    # More background of MSA-PT is available from this internal docs:
    # https://microsoft.sharepoint.com/:w:/t/Identity-DevEx/EatIUauX3c9Ctw1l7AQ6iM8B5CeBZxc58eoQCE0IuZ0VFw?e=tgc3jP&CID=39c853be-76ea-79d7-ee73-f1b2706ede05
    msa_passthrough_requested = kwargs.pop("enable_msa_passthrough", False)
    enable_msa_passthrough = (
        msa_passthrough_requested
        # Work around a known issue as of PyMsalRuntime 0.8
        and data.get("token_type") != "ssh-cert")
    self._validate_ssh_cert_input_data(data)
    # Fall back to a no-op so that later code can call it unconditionally
    on_before_launching_ui = on_before_launching_ui or (lambda **kwargs: None)

    if _is_running_in_cloud_shell() and prompt == "none":
        # Note: _acquire_token_by_cloud_shell() is always silent,
        #       so we would not fire on_before_launching_ui()
        return self._acquire_token_by_cloud_shell(scopes, data=data)

    claims = _merge_claims_challenge_and_capabilities(
        self._client_capabilities, claims_challenge)

    if self._enable_broker:
        if parent_window_handle is None:
            raise ValueError(
                "parent_window_handle is required when you opted into using broker. "
                "You need to provide the window handle of your GUI application, "
                "or use msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE "
                "when and only when your application is a console app.")
        if extra_scopes_to_consent:
            logger.warning(
                "Ignoring parameter extra_scopes_to_consent, "
                "which is not supported by broker")
        broker_response = self._acquire_token_interactive_via_broker(
            scopes,
            parent_window_handle,
            enable_msa_passthrough,
            claims,
            data,
            on_before_launching_ui,
            auth_scheme,
            prompt=prompt,
            login_hint=login_hint,
            max_age=max_age,
        )
        return self._process_broker_response(broker_response, scopes, data)

    # From here on, it is the browser-based flow
    if auth_scheme:
        raise ValueError(self._AUTH_SCHEME_UNSUPPORTED)
    on_before_launching_ui(ui="browser")
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_INTERACTIVE)
    decorated_scopes = self._decorate_scope(scopes) if scopes else None
    # Hardcode the host, for now. AAD portal rejects 127.0.0.1 anyway
    redirect_uri = "http://localhost:{port}".format(port=port or 0)
    browser_response = self.client.obtain_token_by_browser(
        scope=decorated_scopes,
        extra_scope_to_consent=extra_scopes_to_consent,
        redirect_uri=redirect_uri,
        prompt=prompt,
        login_hint=login_hint,
        max_age=max_age,
        timeout=timeout,
        auth_params={
            "claims": claims,
            "domain_hint": domain_hint,
        },
        data=dict(data, claims=claims),
        headers=telemetry_context.generate_headers(),
        browser_name=_preferred_browser(),
        **kwargs)
    response = _clean_up(browser_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

2189 

def _acquire_token_interactive_via_broker(
        self,
        scopes,  # type: list[str]
        parent_window_handle,  # type: int
        enable_msa_passthrough,  # type: boolean
        claims,  # type: str
        data,  # type: dict
        on_before_launching_ui,  # type: callable
        auth_scheme,  # type: object
        prompt=None,
        login_hint=None,  # type: Optional[str]
        max_age=None,
        **kwargs):
    """Acquire a token via the broker, trying silent broker calls before UI.

    Different broker functions are called, in this order, to mimic the
    OIDC behaviors of ``prompt`` and ``login_hint``:

    1. When ``login_hint`` is given, ``prompt`` is not ``"select_account"``
       and exactly one cached account matches the hint,
       ``broker._acquire_token_silently()`` is tried for that account.
       A response without an "error" key is returned right away.
    2. When ``prompt`` is ``"none"`` or not set,
       ``broker._signin_silently()`` is tried.
       With ``prompt="none"`` its outcome is final. Without a prompt,
       the flow continues to step 3 only if the silent sign-in produced
       a token for an account other than ``login_hint``, or failed with
       a broker status that interaction can recover from.
    3. ``on_before_launching_ui(ui="broker")`` is called and then
       ``broker._signin_interactively()`` is used.

    :param list[str] scopes: Scopes passed through to the broker calls.
    :param parent_window_handle:
        Window handle for the interactive sign-in.
        ``self.CONSOLE_WINDOW_HANDLE`` is translated into ``None``.
    :param enable_msa_passthrough:
        Forwarded to the broker as ``enable_msa_pt``.
    :param str claims: Forwarded to the broker as ``claims``.
    :param dict data: Extra key/value pairs, expanded into each broker call.
    :param callable on_before_launching_ui:
        Called with ``ui="broker"`` right before the interactive sign-in.
    :param object auth_scheme: Forwarded to the broker as ``auth_scheme``.
    :param prompt: The OIDC-style prompt value, or None.
    :param login_hint: Username of the account that is expected to sign in.
    :param max_age: Forwarded to the sign-in broker calls.
    :return:
        The response of the broker call that concluded the flow, or,
        when ``prompt="none"`` and the silent sign-in returned a token
        for an account other than ``login_hint``,
        a dict whose "error" is "broker_error".
    """
    from .broker import _signin_interactively, _signin_silently, _acquire_token_silently
    if "welcome_template" in kwargs:
        logger.debug(kwargs["welcome_template"])  # Experimental
    authority = "https://{}/{}".format(
        self.authority.instance, self.authority.tenant)
    validate_authority = "no" if (
        self.authority._is_known_to_developer
        or self._instance_discovery is False) else None
    # Calls different broker methods to mimic the OIDC behaviors
    if login_hint and prompt != "select_account":  # OIDC prompts when the user did not sign in
        accounts = self.get_accounts(username=login_hint)
        if len(accounts) == 1:  # Unambiguously proceed with this account
            logger.debug("Calling broker._acquire_token_silently()")
            response = _acquire_token_silently(  # When it works, it bypasses prompt
                authority,
                self.client_id,
                accounts[0]["local_account_id"],
                scopes,
                claims=claims,
                auth_scheme=auth_scheme,
                **data)
            if response and "error" not in response:
                return response
    # login_hint undecisive or not exists
    if prompt == "none" or not prompt:  # Must/Can attempt _signin_silently()
        logger.debug("Calling broker._signin_silently()")
        response = _signin_silently(  # Unlike OIDC, it doesn't honor login_hint
            authority, self.client_id, scopes,
            validateAuthority=validate_authority,
            claims=claims,
            max_age=max_age,
            enable_msa_pt=enable_msa_passthrough,
            auth_scheme=auth_scheme,
            **data)
        is_wrong_account = bool(
            # _signin_silently() only gets tokens for default account,
            # but this seems to have been fixed in PyMsalRuntime 0.11.2
            "access_token" in response and login_hint
            # Compare the hint with the signed-in username claim.
            # Comparing it with the whole claims dict would always differ
            # (a dict never equals a str) and so would reject every account.
            and response.get("id_token_claims", {}).get(
                "preferred_username") != login_hint)
        wrong_account_error_message = (
            'prompt="none" will not work for login_hint="non-default-user"')
        if is_wrong_account:
            logger.debug(wrong_account_error_message)
        if prompt == "none":
            # No UI is allowed, so the silent outcome is final
            return response if not is_wrong_account else {
                "error": "broker_error",
                "error_description": wrong_account_error_message,
            }
        else:
            assert bool(prompt) is False
            from pymsalruntime import Response_Status
            recoverable_errors = frozenset([
                Response_Status.Status_AccountUnusable,
                Response_Status.Status_InteractionRequired,
            ])
            if is_wrong_account or "error" in response and response.get(
                    "_broker_status") in recoverable_errors:
                pass  # It will fall back to the _signin_interactively()
            else:
                return response

    logger.debug("Falls back to broker._signin_interactively()")
    on_before_launching_ui(ui="broker")
    return _signin_interactively(
        authority, self.client_id, scopes,
        None if parent_window_handle is self.CONSOLE_WINDOW_HANDLE
        else parent_window_handle,
        validateAuthority=validate_authority,
        login_hint=login_hint,
        prompt=prompt,
        claims=claims,
        max_age=max_age,
        enable_msa_pt=enable_msa_passthrough,
        auth_scheme=auth_scheme,
        **data)

2278 

def initiate_device_flow(self, scopes=None, **kwargs):
    """Start a Device Flow, to be completed later by
    :func:`~acquire_token_by_device_flow`.

    :param list[str] scopes:
        Scopes requested to access a protected API (a resource).
    :return: A dict representing a newly created Device Flow object.

        - A successful response would contain "user_code" key, among others
        - an error response would contain some other readable key/value pairs.
    """
    request_id = msal.telemetry._get_new_correlation_id()
    decorated_scopes = self._decorate_scope(scopes or [])
    request_headers = {msal.telemetry.CLIENT_REQUEST_ID: request_id}
    flow = self.client.initiate_device_flow(
        scope=decorated_scopes, headers=request_headers, **kwargs)
    # Remember the correlation id inside the flow, so that the follow-up
    # acquire_token_by_device_flow() call can read it back from there.
    flow[self.DEVICE_FLOW_CORRELATION_ID] = request_id
    return flow

2297 

def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs):
    """Complete a device flow and obtain its token, with customizable polling effect.

    :param dict flow:
        A dict previously generated by :func:`~initiate_device_flow`.
        By default, this method's polling effect will block current thread.
        The polling loop can be aborted at any time,
        by changing the value of the flow's "expires_at" key to 0.
    :param claims_challenge:
        The claims requested by the resource provider, delivered as a
        claims_challenge directive in the www-authenticate header, to be
        returned from the UserInfo Endpoint and/or in the ID Token
        and/or Access Token.
        It is a string of a JSON object which contains lists of claims
        being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID,
        correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID))
    request_data = dict(
        kwargs.pop("data", {}),  # Popped, so it is not forwarded twice
        # 2018-10-4 Hack: during transition period,
        # service seemingly need both device_code and code parameter.
        code=flow["device_code"],
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge),
    )
    request_headers = telemetry_context.generate_headers()
    raw_response = self.client.obtain_token_by_device_flow(
        flow, data=request_data, headers=request_headers, **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response

2336 

2337 

2338class ConfidentialClientApplication(ClientApplication): # server-side web app 

2339 """Same as :func:`ClientApplication.__init__`, 

2340 except that ``allow_broker`` parameter shall remain ``None``. 

2341 """ 

2342 

def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
    """Obtain a token for this confidential client itself, not for an end user.

    Since MSAL Python 1.23, it will automatically look for token from cache,
    and only send request to Identity Provider when cache misses.

    :param list[str] scopes: (Required)
        Scopes requested to access a protected API (a resource).
    :param claims_challenge:
        The claims requested by the resource provider, delivered as a
        claims_challenge directive in the www-authenticate header, to be
        returned from the UserInfo Endpoint and/or in the ID Token
        and/or Access Token.
        It is a string of a JSON object which contains lists of claims
        being requested from these locations.

    :raises ValueError: When a truthy ``force_refresh`` is passed in.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    force_refresh_requested = kwargs.get("force_refresh")
    if force_refresh_requested:
        # We choose to disallow force_refresh
        raise ValueError(
            "Historically, this method does not support force_refresh behavior. "
        )
    # No account (None) is involved in this app-only flow
    silent_result = self._acquire_token_silent_with_error(
        scopes, None, claims_challenge=claims_challenge, **kwargs)
    return _clean_up(silent_result)

2368 

def _acquire_token_for_client(
        self,
        scopes,
        refresh_reason,
        claims_challenge=None,
        **kwargs
):
    """Request an app-only token from the identity provider.

    A ``DeprecationWarning`` is issued when the authority's tenant is
    "common" or "organizations" (compared case-insensitively).

    :param scopes: Sent as-is; this grant flow requires no scope decoration.
    :param refresh_reason: Recorded in the telemetry context of this request.
    :param claims_challenge:
        Merged with this app's client capabilities into the "claims"
        request parameter.
    :param kwargs:
        An optional "data" dict is validated and then merged into the
        request data; everything else is forwarded to
        ``obtain_token_for_client()``.
    :return: The response of ``obtain_token_for_client()``.
    """
    tenant = self.authority.tenant.lower()
    if tenant in ("common", "organizations"):
        warnings.warn(
            "Using /common or /organizations authority "
            "in acquire_token_for_client() is unreliable. "
            "Please use a specific tenant instead.", DeprecationWarning)
    # Take "data" out of kwargs, so that it is not forwarded a second time
    extra_data = kwargs.pop("data", {})
    self._validate_ssh_cert_input_data(extra_data)
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason)
    # The regional client, when there is one, takes precedence
    client = self._regional_client or self.client
    request_headers = telemetry_context.generate_headers()
    request_data = dict(
        extra_data,
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge))
    response = client.obtain_token_for_client(
        scope=scopes,  # This grant flow requires no scope decoration
        headers=request_headers,
        data=request_data,
        **kwargs)
    telemetry_context.update_telemetry(response)
    return response

2395 

def remove_tokens_for_client(self):
    """Remove all tokens that were previously acquired via
    :func:`~acquire_token_for_client()` for the current client."""
    instance = self.authority.instance
    environments = [instance] + self._get_authority_aliases(instance)
    for environment in environments:
        query = {
            "client_id": self.client_id,
            "environment": environment,
            "home_account_id": None,  # These are mostly app-only tokens
        }
        # Materialize the search result first: remove ATs from a snapshot
        snapshot = list(self.token_cache.search(
            TokenCache.CredentialType.ACCESS_TOKEN, query=query))
        for access_token in snapshot:
            self.token_cache.remove_at(access_token)
    # acquire_token_for_client() obtains no RTs, so we have no RT to remove

2409 

def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs):
    """Obtain a token by the on-behalf-of (OBO) flow.

    The current app is a middle-tier service which was called with a token
    representing an end user.
    The current app can use such token (a.k.a. a user assertion) to request
    another token to access downstream web API, on behalf of that user.
    See `detail docs here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow>`_ .

    The current middle-tier app has no user interaction to obtain consent.
    See how to gain consent upfront for your middle-tier app from this article.
    https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow#gaining-consent-for-the-middle-tier-application

    :param str user_assertion: The incoming token already received by this app
    :param list[str] scopes: Scopes required by downstream API (a resource).
    :param claims_challenge:
        The claims requested by the resource provider, delivered as a
        claims_challenge directive in the www-authenticate header, to be
        returned from the UserInfo Endpoint and/or in the ID Token
        and/or Access Token.
        It is a string of a JSON object which contains lists of claims
        being requested from these locations.

    :return: A dict representing the json response from Microsoft Entra:

        - A successful response would contain "access_token" key,
        - an error response would contain "error" and usually "error_description".
    """
    telemetry_context = self._build_telemetry_context(
        self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID)
    # Decoration is used for:
    # 1. Explicitly requesting an RT, without relying on AAD default
    #    behavior, even though it currently still issues an RT.
    # 2. Requesting an IDT (which would otherwise be unavailable)
    #    so that the calling app could use id_token_claims to implement
    #    their own cache mapping, which is likely needed in web apps.
    decorated_scopes = self._decorate_scope(scopes)
    request_data = dict(
        kwargs.pop("data", {}),  # Popped, so it is not forwarded twice
        requested_token_use="on_behalf_of",
        claims=_merge_claims_challenge_and_capabilities(
            self._client_capabilities, claims_challenge))
    request_headers = telemetry_context.generate_headers()
    # The implementation is NOT based on Token Exchange (RFC 8693);
    # it bases on assertion RFC 7521
    raw_response = self.client.obtain_token_by_assertion(
        user_assertion,
        self.client.GRANT_TYPE_JWT,  # IDTs and AAD ATs are all JWTs
        scope=decorated_scopes,
        data=request_data,
        headers=request_headers,
        # TBD: Expose a login_hint (or ccs_routing_hint) param for web app
        **kwargs)
    response = _clean_up(raw_response)
    if "access_token" in response:
        response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
    telemetry_context.update_telemetry(response)
    return response