Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/application.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

633 statements  

1import functools 

2import json 

3import time 

4import logging 

5import sys 

6import warnings 

7from threading import Lock 

8import os 

9 

10from .oauth2cli import Client, JwtAssertionCreator 

11from .oauth2cli.oidc import decode_part 

12from .authority import Authority, WORLD_WIDE 

13from .mex import send_request as mex_send_request 

14from .wstrust_request import send_request as wst_send_request 

15from .wstrust_response import * 

16from .token_cache import TokenCache, _get_username, _GRANT_TYPE_BROKER 

17import msal.telemetry 

18from .region import _detect_region 

19from .throttled_http_client import ThrottledHttpClient 

20from .cloudshell import _is_running_in_cloud_shell 

21 

22 

23# The __init__.py will import this. Not the other way around. 

24__version__ = "1.30.0" # When releasing, also check and bump our dependencies's versions if needed 

25 

26logger = logging.getLogger(__name__) 

27_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL" 

28 

29def extract_certs(public_cert_content): 

30 # Parses raw public certificate file contents and returns a list of strings 

31 # Usage: headers = {"x5c": extract_certs(open("my_cert.pem").read())} 

32 public_certificates = re.findall( 

33 r'-----BEGIN CERTIFICATE-----(?P<cert_value>[^-]+)-----END CERTIFICATE-----', 

34 public_cert_content, re.I) 

35 if public_certificates: 

36 return [cert.strip() for cert in public_certificates] 

37 # The public cert tags are not found in the input, 

38 # let's make best effort to exclude a private key pem file. 

39 if "PRIVATE KEY" in public_cert_content: 

40 raise ValueError( 

41 "We expect your public key but detect a private key instead") 

42 return [public_cert_content.strip()] 

43 

44 

45def _merge_claims_challenge_and_capabilities(capabilities, claims_challenge): 

46 # Represent capabilities as {"access_token": {"xms_cc": {"values": capabilities}}} 

47 # and then merge/add it into incoming claims 

48 if not capabilities: 

49 return claims_challenge 

50 claims_dict = json.loads(claims_challenge) if claims_challenge else {} 

51 for key in ["access_token"]: # We could add "id_token" if we'd decide to 

52 claims_dict.setdefault(key, {}).update(xms_cc={"values": capabilities}) 

53 return json.dumps(claims_dict) 

54 

55 

56def _str2bytes(raw): 

57 # A conversion based on duck-typing rather than six.text_type 

58 try: 

59 return raw.encode(encoding="utf-8") 

60 except: 

61 return raw 

62 

63 

64def _parse_pfx(pfx_path, passphrase_bytes): 

65 # Cert concepts https://security.stackexchange.com/a/226758/125264 

66 from cryptography.hazmat.primitives import hashes, serialization 

67 from cryptography.hazmat.primitives.serialization import pkcs12 

68 with open(pfx_path, 'rb') as f: 

69 private_key, cert, _ = pkcs12.load_key_and_certificates( # cryptography 2.5+ 

70 # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/serialization/#cryptography.hazmat.primitives.serialization.pkcs12.load_key_and_certificates 

71 f.read(), passphrase_bytes) 

72 if not (private_key and cert): 

73 raise ValueError("Your PFX file shall contain both private key and cert") 

74 cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM).decode() # cryptography 1.0+ 

75 x5c = [ 

76 '\n'.join(cert_pem.splitlines()[1:-1]) # Strip the "--- header ---" and "--- footer ---" 

77 ] 

78 sha256_thumbprint = cert.fingerprint(hashes.SHA256()).hex() # cryptography 0.7+ 

79 sha1_thumbprint = cert.fingerprint(hashes.SHA1()).hex() # cryptography 0.7+ 

80 # https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object 

81 return private_key, sha256_thumbprint, sha1_thumbprint, x5c 

82 

83 

84def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes): 

85 from cryptography.hazmat.primitives import serialization 

86 from cryptography.hazmat.backends import default_backend 

87 return serialization.load_pem_private_key( # cryptography 0.6+ 

88 _str2bytes(private_key_pem_str), 

89 passphrase_bytes, 

90 backend=default_backend(), # It was a required param until 2020 

91 ) 

92 

93 

94def _pii_less_home_account_id(home_account_id): 

95 parts = home_account_id.split(".") # It could contain one or two parts 

96 parts[0] = "********" 

97 return ".".join(parts) 

98 

99 

100def _clean_up(result): 

101 if isinstance(result, dict): 

102 if "_msalruntime_telemetry" in result or "_msal_python_telemetry" in result: 

103 result["msal_telemetry"] = json.dumps({ # Telemetry as an opaque string 

104 "msalruntime_telemetry": result.get("_msalruntime_telemetry"), 

105 "msal_python_telemetry": result.get("_msal_python_telemetry"), 

106 }, separators=(",", ":")) 

107 return_value = { 

108 k: result[k] for k in result 

109 if k != "refresh_in" # MSAL handled refresh_in, customers need not 

110 and not k.startswith('_') # Skim internal properties 

111 } 

112 if "refresh_in" in result: # To encourage proactive refresh 

113 return_value["refresh_on"] = int(time.time() + result["refresh_in"]) 

114 return return_value 

115 return result # It could be None 

116 

117 

118def _preferred_browser(): 

119 """Register Edge and return a name suitable for subsequent webbrowser.get(...) 

120 when appropriate. Otherwise return None. 

121 """ 

122 # On Linux, only Edge will provide device-based Conditional Access support 

123 if sys.platform != "linux": # On other platforms, we have no browser preference 

124 return None 

125 browser_path = "/usr/bin/microsoft-edge" # Use a full path owned by sys admin 

126 # Note: /usr/bin/microsoft-edge, /usr/bin/microsoft-edge-stable, etc. 

127 # are symlinks that point to the actual binaries which are found under 

128 # /opt/microsoft/msedge/msedge or /opt/microsoft/msedge-beta/msedge. 

129 # Either method can be used to detect an Edge installation. 

130 user_has_no_preference = "BROWSER" not in os.environ 

131 user_wont_mind_edge = "microsoft-edge" in os.environ.get("BROWSER", "") # Note: 

132 # BROWSER could contain "microsoft-edge" or "/path/to/microsoft-edge". 

133 # Python documentation (https://docs.python.org/3/library/webbrowser.html) 

134 # does not document the name being implicitly register, 

135 # so there is no public API to know whether the ENV VAR browser would work. 

136 # Therefore, we would not bother examine the env var browser's type. 

137 # We would just register our own Edge instance. 

138 if (user_has_no_preference or user_wont_mind_edge) and os.path.exists(browser_path): 

139 try: 

140 import webbrowser # Lazy import. Some distro may not have this. 

141 browser_name = "msal-edge" # Avoid popular name "microsoft-edge" 

142 # otherwise `BROWSER="microsoft-edge"; webbrowser.get("microsoft-edge")` 

143 # would return a GenericBrowser instance which won't work. 

144 try: 

145 registration_available = isinstance( 

146 webbrowser.get(browser_name), webbrowser.BackgroundBrowser) 

147 except webbrowser.Error: 

148 registration_available = False 

149 if not registration_available: 

150 logger.debug("Register %s with %s", browser_name, browser_path) 

151 # By registering our own browser instance with our own name, 

152 # rather than populating a process-wide BROWSER enn var, 

153 # this approach does not have side effect on non-MSAL code path. 

154 webbrowser.register( # Even double-register happens to work fine 

155 browser_name, None, webbrowser.BackgroundBrowser(browser_path)) 

156 return browser_name 

157 except ImportError: 

158 pass # We may still proceed 

159 return None 

160 

161 

162class _ClientWithCcsRoutingInfo(Client): 

163 

164 def initiate_auth_code_flow(self, **kwargs): 

165 if kwargs.get("login_hint"): # eSTS could have utilized this as-is, but nope 

166 kwargs["X-AnchorMailbox"] = "UPN:%s" % kwargs["login_hint"] 

167 return super(_ClientWithCcsRoutingInfo, self).initiate_auth_code_flow( 

168 client_info=1, # To be used as CSS Routing info 

169 **kwargs) 

170 

171 def obtain_token_by_auth_code_flow( 

172 self, auth_code_flow, auth_response, **kwargs): 

173 # Note: the obtain_token_by_browser() is also covered by this 

174 assert isinstance(auth_code_flow, dict) and isinstance(auth_response, dict) 

175 headers = kwargs.pop("headers", {}) 

176 client_info = json.loads( 

177 decode_part(auth_response["client_info"]) 

178 ) if auth_response.get("client_info") else {} 

179 if "uid" in client_info and "utid" in client_info: 

180 # Note: The value of X-AnchorMailbox is also case-insensitive 

181 headers["X-AnchorMailbox"] = "Oid:{uid}@{utid}".format(**client_info) 

182 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_auth_code_flow( 

183 auth_code_flow, auth_response, headers=headers, **kwargs) 

184 

185 def obtain_token_by_username_password(self, username, password, **kwargs): 

186 headers = kwargs.pop("headers", {}) 

187 headers["X-AnchorMailbox"] = "upn:{}".format(username) 

188 return super(_ClientWithCcsRoutingInfo, self).obtain_token_by_username_password( 

189 username, password, headers=headers, **kwargs) 

190 

191 

192class ClientApplication(object): 

193 """You do not usually directly use this class. Use its subclasses instead: 

194 :class:`PublicClientApplication` and :class:`ConfidentialClientApplication`. 

195 """ 

196 ACQUIRE_TOKEN_SILENT_ID = "84" 

197 ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85" 

198 ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301" 

199 ACQUIRE_TOKEN_ON_BEHALF_OF_ID = "523" 

200 ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID = "622" 

201 ACQUIRE_TOKEN_FOR_CLIENT_ID = "730" 

202 ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832" 

203 ACQUIRE_TOKEN_INTERACTIVE = "169" 

204 GET_ACCOUNTS_ID = "902" 

205 REMOVE_ACCOUNT_ID = "903" 

206 

207 ATTEMPT_REGION_DISCOVERY = True # "TryAutoDetect" 

208 _TOKEN_SOURCE = "token_source" 

209 _TOKEN_SOURCE_IDP = "identity_provider" 

210 _TOKEN_SOURCE_CACHE = "cache" 

211 _TOKEN_SOURCE_BROKER = "broker" 

212 

213 _enable_broker = False 

214 _AUTH_SCHEME_UNSUPPORTED = ( 

215 "auth_scheme is currently only available from broker. " 

216 "You can enable broker by following these instructions. " 

217 "https://msal-python.readthedocs.io/en/latest/#publicclientapplication") 

218 

219 def __init__( 

220 self, client_id, 

221 client_credential=None, authority=None, validate_authority=True, 

222 token_cache=None, 

223 http_client=None, 

224 verify=True, proxies=None, timeout=None, 

225 client_claims=None, app_name=None, app_version=None, 

226 client_capabilities=None, 

227 azure_region=None, # Note: We choose to add this param in this base class, 

228 # despite it is currently only needed by ConfidentialClientApplication. 

229 # This way, it holds the same positional param place for PCA, 

230 # when we would eventually want to add this feature to PCA in future. 

231 exclude_scopes=None, 

232 http_cache=None, 

233 instance_discovery=None, 

234 allow_broker=None, 

235 enable_pii_log=None, 

236 oidc_authority=None, 

237 ): 

238 """Create an instance of application. 

239 

240 :param str client_id: Your app has a client_id after you register it on Microsoft Entra admin center. 

241 

242 :param client_credential: 

243 For :class:`PublicClientApplication`, you use `None` here. 

244 

245 For :class:`ConfidentialClientApplication`, 

246 it supports many different input formats for different scenarios. 

247 

248 .. admonition:: Support using a client secret. 

249 

250 Just feed in a string, such as ``"your client secret"``. 

251 

252 .. admonition:: Support using a certificate in X.509 (.pem) format 

253 

254 Feed in a dict in this form:: 

255 

256 { 

257 "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format", 

258 "thumbprint": "A1B2C3D4E5F6...", 

259 "passphrase": "Passphrase if the private_key is encrypted (Optional. Added in version 1.6.0)", 

260 } 

261 

262 MSAL Python requires a "private_key" in PEM format. 

263 If your cert is in PKCS12 (.pfx) format, 

264 you can convert it to X.509 (.pem) format, 

265 by ``openssl pkcs12 -in file.pfx -out file.pem -nodes``. 

266 

267 The thumbprint is available in your app's registration in Azure Portal. 

268 Alternatively, you can `calculate the thumbprint <https://github.com/Azure/azure-sdk-for-python/blob/07d10639d7e47f4852eaeb74aef5d569db499d6e/sdk/identity/azure-identity/azure/identity/_credentials/certificate.py#L94-L97>`_. 

269 

270 .. admonition:: Support Subject Name/Issuer Auth with a cert in .pem 

271 

272 `Subject Name/Issuer Auth 

273 <https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/60>`_ 

274 is an approach to allow easier certificate rotation. 

275 

276 *Added in version 0.5.0*:: 

277 

278 { 

279 "private_key": "...-----BEGIN PRIVATE KEY-----... in PEM format", 

280 "thumbprint": "A1B2C3D4E5F6...", 

281 "public_certificate": "...-----BEGIN CERTIFICATE-----...", 

282 "passphrase": "Passphrase if the private_key is encrypted (Optional. Added in version 1.6.0)", 

283 } 

284 

285 ``public_certificate`` (optional) is public key certificate 

286 which will be sent through 'x5c' JWT header only for 

287 subject name and issuer authentication to support cert auto rolls. 

288 

289 Per `specs <https://tools.ietf.org/html/rfc7515#section-4.1.6>`_, 

290 "the certificate containing 

291 the public key corresponding to the key used to digitally sign the 

292 JWS MUST be the first certificate. This MAY be followed by 

293 additional certificates, with each subsequent certificate being the 

294 one used to certify the previous one." 

295 However, your certificate's issuer may use a different order. 

296 So, if your attempt ends up with an error AADSTS700027 - 

297 "The provided signature value did not match the expected signature value", 

298 you may try use only the leaf cert (in PEM/str format) instead. 

299 

300 .. admonition:: Supporting raw assertion obtained from elsewhere 

301 

302 *Added in version 1.13.0*: 

303 It can also be a completely pre-signed assertion that you've assembled yourself. 

304 Simply pass a container containing only the key "client_assertion", like this:: 

305 

306 { 

307 "client_assertion": "...a JWT with claims aud, exp, iss, jti, nbf, and sub..." 

308 } 

309 

310 .. admonition:: Supporting reading client cerficates from PFX files 

311 

312 *Added in version 1.29.0*: 

313 Feed in a dictionary containing the path to a PFX file:: 

314 

315 { 

316 "private_key_pfx_path": "/path/to/your.pfx", 

317 "passphrase": "Passphrase if the private_key is encrypted (Optional)", 

318 } 

319 

320 The following command will generate a .pfx file from your .key and .pem file:: 

321 

322 openssl pkcs12 -export -out certificate.pfx -inkey privateKey.key -in certificate.pem 

323 

324 .. admonition:: Support Subject Name/Issuer Auth with a cert in .pfx 

325 

326 *Added in version 1.30.0*: 

327 If your .pfx file contains both the private key and public cert, 

328 you can opt in for Subject Name/Issuer Auth like this:: 

329 

330 { 

331 "private_key_pfx_path": "/path/to/your.pfx", 

332 "public_certificate": True, 

333 "passphrase": "Passphrase if the private_key is encrypted (Optional)", 

334 } 

335 

336 :type client_credential: Union[dict, str, None] 

337 

338 :param dict client_claims: 

339 *Added in version 0.5.0*: 

340 It is a dictionary of extra claims that would be signed by 

341 by this :class:`ConfidentialClientApplication` 's private key. 

342 For example, you can use {"client_ip": "x.x.x.x"}. 

343 You may also override any of the following default claims:: 

344 

345 { 

346 "aud": the_token_endpoint, 

347 "iss": self.client_id, 

348 "sub": same_as_issuer, 

349 "exp": now + 10_min, 

350 "iat": now, 

351 "jti": a_random_uuid 

352 } 

353 

354 :param str authority: 

355 A URL that identifies a token authority. It should be of the format 

356 ``https://login.microsoftonline.com/your_tenant`` 

357 By default, we will use ``https://login.microsoftonline.com/common`` 

358 

359 *Changed in version 1.17*: you can also use predefined constant 

360 and a builder like this:: 

361 

362 from msal.authority import ( 

363 AuthorityBuilder, 

364 AZURE_US_GOVERNMENT, AZURE_CHINA, AZURE_PUBLIC) 

365 my_authority = AuthorityBuilder(AZURE_PUBLIC, "contoso.onmicrosoft.com") 

366 # Now you get an equivalent of 

367 # "https://login.microsoftonline.com/contoso.onmicrosoft.com" 

368 

369 # You can feed such an authority to msal's ClientApplication 

370 from msal import PublicClientApplication 

371 app = PublicClientApplication("my_client_id", authority=my_authority, ...) 

372 

373 :param bool validate_authority: (optional) Turns authority validation 

374 on or off. This parameter default to true. 

375 :param TokenCache token_cache: 

376 Sets the token cache used by this ClientApplication instance. 

377 By default, an in-memory cache will be created and used. 

378 :param http_client: (optional) 

379 Your implementation of abstract class HttpClient <msal.oauth2cli.http.http_client> 

380 Defaults to a requests session instance. 

381 Since MSAL 1.11.0, the default session would be configured 

382 to attempt one retry on connection error. 

383 If you are providing your own http_client, 

384 it will be your http_client's duty to decide whether to perform retry. 

385 

386 :param verify: (optional) 

387 It will be passed to the 

388 `verify parameter in the underlying requests library 

389 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#ssl-cert-verification>`_ 

390 This does not apply if you have chosen to pass your own Http client 

391 :param proxies: (optional) 

392 It will be passed to the 

393 `proxies parameter in the underlying requests library 

394 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#proxies>`_ 

395 This does not apply if you have chosen to pass your own Http client 

396 :param timeout: (optional) 

397 It will be passed to the 

398 `timeout parameter in the underlying requests library 

399 <http://docs.python-requests.org/en/v2.9.1/user/advanced/#timeouts>`_ 

400 This does not apply if you have chosen to pass your own Http client 

401 :param app_name: (optional) 

402 You can provide your application name for Microsoft telemetry purposes. 

403 Default value is None, means it will not be passed to Microsoft. 

404 :param app_version: (optional) 

405 You can provide your application version for Microsoft telemetry purposes. 

406 Default value is None, means it will not be passed to Microsoft. 

407 :param list[str] client_capabilities: (optional) 

408 Allows configuration of one or more client capabilities, e.g. ["CP1"]. 

409 

410 Client capability is meant to inform the Microsoft identity platform 

411 (STS) what this client is capable for, 

412 so STS can decide to turn on certain features. 

413 For example, if client is capable to handle *claims challenge*, 

414 STS can then issue CAE access tokens to resources 

415 knowing when the resource emits *claims challenge* 

416 the client will be capable to handle. 

417 

418 Implementation details: 

419 Client capability is implemented using "claims" parameter on the wire, 

420 for now. 

421 MSAL will combine them into 

422 `claims parameter <https://openid.net/specs/openid-connect-core-1_0-final.html#ClaimsParameter>`_ 

423 which you will later provide via one of the acquire-token request. 

424 

425 :param str azure_region: (optional) 

426 Instructs MSAL to use the Entra regional token service. This legacy feature is only available to 

427 first-party applications. Only ``acquire_token_for_client()`` is supported. 

428 

429 Supports 3 values: 

430 

431 ``azure_region=None`` - meaning no region is used. This is the default value. 

432 ``azure_region="some_region"`` - meaning the specified region is used. 

433 ``azure_region=True`` - meaning MSAL will try to auto-detect the region. This is not recommended. 

434 

435 .. note:: 

436 Region auto-discovery has been tested on VMs and on Azure Functions. It is unreliable. 

437 Applications using this option should configure a short timeout. 

438 

439 For more details and for the values of the region string 

440 see https://learn.microsoft.com/entra/msal/dotnet/resources/region-discovery-troubleshooting 

441 

442 New in version 1.12.0. 

443 

444 :param list[str] exclude_scopes: (optional) 

445 Historically MSAL hardcodes `offline_access` scope, 

446 which would allow your app to have prolonged access to user's data. 

447 If that is unnecessary or undesirable for your app, 

448 now you can use this parameter to supply an exclusion list of scopes, 

449 such as ``exclude_scopes = ["offline_access"]``. 

450 

451 :param dict http_cache: 

452 MSAL has long been caching tokens in the ``token_cache``. 

453 Recently, MSAL also introduced a concept of ``http_cache``, 

454 by automatically caching some finite amount of non-token http responses, 

455 so that *long-lived* 

456 ``PublicClientApplication`` and ``ConfidentialClientApplication`` 

457 would be more performant and responsive in some situations. 

458 

459 This ``http_cache`` parameter accepts any dict-like object. 

460 If not provided, MSAL will use an in-memory dict. 

461 

462 If your app is a command-line app (CLI), 

463 you would want to persist your http_cache across different CLI runs. 

464 The following recipe shows a way to do so:: 

465 

466 # Just add the following lines at the beginning of your CLI script 

467 import sys, atexit, pickle 

468 http_cache_filename = sys.argv[0] + ".http_cache" 

469 try: 

470 with open(http_cache_filename, "rb") as f: 

471 persisted_http_cache = pickle.load(f) # Take a snapshot 

472 except ( 

473 FileNotFoundError, # Or IOError in Python 2 

474 pickle.UnpicklingError, # A corrupted http cache file 

475 ): 

476 persisted_http_cache = {} # Recover by starting afresh 

477 atexit.register(lambda: pickle.dump( 

478 # When exit, flush it back to the file. 

479 # It may occasionally overwrite another process's concurrent write, 

480 # but that is fine. Subsequent runs will reach eventual consistency. 

481 persisted_http_cache, open(http_cache_file, "wb"))) 

482 

483 # And then you can implement your app as you normally would 

484 app = msal.PublicClientApplication( 

485 "your_client_id", 

486 ..., 

487 http_cache=persisted_http_cache, # Utilize persisted_http_cache 

488 ..., 

489 #token_cache=..., # You may combine the old token_cache trick 

490 # Please refer to token_cache recipe at 

491 # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache 

492 ) 

493 app.acquire_token_interactive(["your", "scope"], ...) 

494 

495 Content inside ``http_cache`` are cheap to obtain. 

496 There is no need to share them among different apps. 

497 

498 Content inside ``http_cache`` will contain no tokens nor 

499 Personally Identifiable Information (PII). Encryption is unnecessary. 

500 

501 New in version 1.16.0. 

502 

503 :param boolean instance_discovery: 

504 Historically, MSAL would connect to a central endpoint located at 

505 ``https://login.microsoftonline.com`` to acquire some metadata, 

506 especially when using an unfamiliar authority. 

507 This behavior is known as Instance Discovery. 

508 

509 This parameter defaults to None, which enables the Instance Discovery. 

510 

511 If you know some authorities which you allow MSAL to operate with as-is, 

512 without involving any Instance Discovery, the recommended pattern is:: 

513 

514 known_authorities = frozenset([ # Treat your known authorities as const 

515 "https://contoso.com/adfs", "https://login.azs/foo"]) 

516 ... 

517 authority = "https://contoso.com/adfs" # Assuming your app will use this 

518 app1 = PublicClientApplication( 

519 "client_id", 

520 authority=authority, 

521 # Conditionally disable Instance Discovery for known authorities 

522 instance_discovery=authority not in known_authorities, 

523 ) 

524 

525 If you do not know some authorities beforehand, 

526 yet still want MSAL to accept any authority that you will provide, 

527 you can use a ``False`` to unconditionally disable Instance Discovery. 

528 

529 New in version 1.19.0. 

530 

531 :param boolean allow_broker: 

532 Deprecated. Please use ``enable_broker_on_windows`` instead. 

533 

534 :param boolean enable_pii_log: 

535 When enabled, logs may include PII (Personal Identifiable Information). 

536 This can be useful in troubleshooting broker behaviors. 

537 The default behavior is False. 

538 

539 New in version 1.24.0. 

540 

541 :param str oidc_authority: 

542 *Added in version 1.28.0*: 

543 It is a URL that identifies an OpenID Connect (OIDC) authority of 

544 the format ``https://contoso.com/tenant``. 

545 MSAL will append ".well-known/openid-configuration" to the authority 

546 and retrieve the OIDC metadata from there, to figure out the endpoints. 

547 

548 Note: Broker will NOT be used for OIDC authority. 

549 """ 

550 self.client_id = client_id 

551 self.client_credential = client_credential 

552 self.client_claims = client_claims 

553 self._client_capabilities = client_capabilities 

554 self._instance_discovery = instance_discovery 

555 

556 if exclude_scopes and not isinstance(exclude_scopes, list): 

557 raise ValueError( 

558 "Invalid exclude_scopes={}. It need to be a list of strings.".format( 

559 repr(exclude_scopes))) 

560 self._exclude_scopes = frozenset(exclude_scopes or []) 

561 if "openid" in self._exclude_scopes: 

562 raise ValueError( 

563 'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format( 

564 repr(exclude_scopes))) 

565 

566 if http_client: 

567 self.http_client = http_client 

568 else: 

569 import requests # Lazy load 

570 

571 self.http_client = requests.Session() 

572 self.http_client.verify = verify 

573 self.http_client.proxies = proxies 

574 # Requests, does not support session - wide timeout 

575 # But you can patch that (https://github.com/psf/requests/issues/3341): 

576 self.http_client.request = functools.partial( 

577 self.http_client.request, timeout=timeout) 

578 

579 # Enable a minimal retry. Better than nothing. 

580 # https://github.com/psf/requests/blob/v2.25.1/requests/adapters.py#L94-L108 

581 a = requests.adapters.HTTPAdapter(max_retries=1) 

582 self.http_client.mount("http://", a) 

583 self.http_client.mount("https://", a) 

584 self.http_client = ThrottledHttpClient( 

585 self.http_client, 

586 http_cache=http_cache, 

587 default_throttle_time=60 

588 # The default value 60 was recommended mainly for PCA at the end of 

589 # https://identitydivision.visualstudio.com/devex/_git/AuthLibrariesApiReview?version=GBdev&path=%2FService%20protection%2FIntial%20set%20of%20protection%20measures.md&_a=preview 

590 if isinstance(self, PublicClientApplication) else 5, 

591 ) 

592 

593 self.app_name = app_name 

594 self.app_version = app_version 

595 

596 # Here the self.authority will not be the same type as authority in input 

597 if oidc_authority and authority: 

598 raise ValueError("You can not provide both authority and oidc_authority") 

599 try: 

600 authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE) 

601 self.authority = Authority( 

602 authority_to_use, 

603 self.http_client, 

604 validate_authority=validate_authority, 

605 instance_discovery=self._instance_discovery, 

606 oidc_authority_url=oidc_authority, 

607 ) 

608 except ValueError: # Those are explicit authority validation errors 

609 raise 

610 except Exception: # The rest are typically connection errors 

611 if validate_authority and azure_region and not oidc_authority: 

612 # Since caller opts in to use region, here we tolerate connection 

613 # errors happened during authority validation at non-region endpoint 

614 self.authority = Authority( 

615 authority_to_use, 

616 self.http_client, 

617 instance_discovery=False, 

618 ) 

619 else: 

620 raise 

621 

622 self._decide_broker(allow_broker, enable_pii_log) 

623 self.token_cache = token_cache or TokenCache() 

624 self._region_configured = azure_region 

625 self._region_detected = None 

626 self.client, self._regional_client = self._build_client( 

627 client_credential, self.authority) 

628 self.authority_groups = None 

629 self._telemetry_buffer = {} 

630 self._telemetry_lock = Lock() 

631 

632 def _decide_broker(self, allow_broker, enable_pii_log): 

633 is_confidential_app = self.client_credential or isinstance( 

634 self, ConfidentialClientApplication) 

635 if is_confidential_app and allow_broker: 

636 raise ValueError("allow_broker=True is only supported in PublicClientApplication") 

637 # Historically, we chose to support ClientApplication("client_id", allow_broker=True) 

638 if allow_broker: 

639 warnings.warn( 

640 "allow_broker is deprecated. " 

641 "Please use PublicClientApplication(..., enable_broker_on_windows=True)", 

642 DeprecationWarning) 

643 self._enable_broker = self._enable_broker or ( 

644 # When we started the broker project on Windows platform, 

645 # the allow_broker was meant to be cross-platform. Now we realize 

646 # that other platforms have different redirect_uri requirements, 

647 # so the old allow_broker is deprecated and will only for Windows. 

648 allow_broker and sys.platform == "win32") 

649 if (self._enable_broker and not is_confidential_app 

650 and not self.authority.is_adfs and not self.authority._is_b2c): 

651 try: 

652 from . import broker # Trigger Broker's initialization 

653 if enable_pii_log: 

654 broker._enable_pii_log() 

655 except RuntimeError: 

656 self._enable_broker = False 

657 logger.exception( 

658 "Broker is unavailable on this platform. " 

659 "We will fallback to non-broker.") 

660 logger.debug("Broker enabled? %s", self._enable_broker) 

661 

662 def is_pop_supported(self): 

663 """Returns True if this client supports Proof-of-Possession Access Token.""" 

664 return self._enable_broker 

665 

666 def _decorate_scope( 

667 self, scopes, 

668 reserved_scope=frozenset(['openid', 'profile', 'offline_access'])): 

669 if not isinstance(scopes, (list, set, tuple)): 

670 raise ValueError("The input scopes should be a list, tuple, or set") 

671 scope_set = set(scopes) # Input scopes is typically a list. Copy it to a set. 

672 if scope_set & reserved_scope: 

673 # These scopes are reserved for the API to provide good experience. 

674 # We could make the developer pass these and then if they do they will 

675 # come back asking why they don't see refresh token or user information. 

676 raise ValueError( 

677 """You cannot use any scope value that is reserved. 

678Your input: {} 

679The reserved list: {}""".format(list(scope_set), list(reserved_scope))) 

680 raise ValueError( 

681 "You cannot use any scope value that is in this reserved list: {}".format( 

682 list(reserved_scope))) 

683 

684 # client_id can also be used as a scope in B2C 

685 decorated = scope_set | reserved_scope 

686 decorated -= self._exclude_scopes 

687 return list(decorated) 

688 

689 def _build_telemetry_context( 

690 self, api_id, correlation_id=None, refresh_reason=None): 

691 return msal.telemetry._TelemetryContext( 

692 self._telemetry_buffer, self._telemetry_lock, api_id, 

693 correlation_id=correlation_id, refresh_reason=refresh_reason) 

694 

695 def _get_regional_authority(self, central_authority): 

696 if not self._region_configured: # User did not opt-in to ESTS-R 

697 return None # Short circuit to completely bypass region detection 

698 self._region_detected = self._region_detected or _detect_region( 

699 self.http_client if self._region_configured is not None else None) 

700 if (self._region_configured != self.ATTEMPT_REGION_DISCOVERY 

701 and self._region_configured != self._region_detected): 

702 logger.warning('Region configured ({}) != region detected ({})'.format( 

703 repr(self._region_configured), repr(self._region_detected))) 

704 region_to_use = ( 

705 self._region_detected 

706 if self._region_configured == self.ATTEMPT_REGION_DISCOVERY 

707 else self._region_configured) # It will retain the None i.e. opted out 

708 logger.debug('Region to be used: {}'.format(repr(region_to_use))) 

709 if region_to_use: 

710 regional_host = ("{}.login.microsoft.com".format(region_to_use) 

711 if central_authority.instance in ( 

712 # The list came from point 3 of the algorithm section in this internal doc 

713 # https://identitydivision.visualstudio.com/DevEx/_git/AuthLibrariesApiReview?path=/PinAuthToRegion/AAD%20SDK%20Proposal%20to%20Pin%20Auth%20to%20region.md&anchor=algorithm&_a=preview 

714 "login.microsoftonline.com", 

715 "login.microsoft.com", 

716 "login.windows.net", 

717 "sts.windows.net", 

718 ) 

719 else "{}.{}".format(region_to_use, central_authority.instance)) 

720 return Authority( # The central_authority has already been validated 

721 "https://{}/{}".format(regional_host, central_authority.tenant), 

722 self.http_client, 

723 instance_discovery=False, 

724 ) 

725 return None 

726 

727 def _build_client(self, client_credential, authority, skip_regional_client=False): 

728 client_assertion = None 

729 client_assertion_type = None 

730 default_headers = { 

731 "x-client-sku": "MSAL.Python", "x-client-ver": __version__, 

732 "x-client-os": sys.platform, 

733 "x-ms-lib-capability": "retry-after, h429", 

734 } 

735 if self.app_name: 

736 default_headers['x-app-name'] = self.app_name 

737 if self.app_version: 

738 default_headers['x-app-ver'] = self.app_version 

739 default_body = {"client_info": 1} 

740 if isinstance(client_credential, dict): 

741 client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT 

742 # Use client_credential.get("...") rather than "..." in client_credential 

743 # so that we can ignore an empty string came from an empty ENV VAR. 

744 if client_credential.get("client_assertion"): 

745 client_assertion = client_credential['client_assertion'] 

746 else: 

747 headers = {} 

748 sha1_thumbprint = sha256_thumbprint = None 

749 passphrase_bytes = _str2bytes( 

750 client_credential["passphrase"] 

751 ) if client_credential.get("passphrase") else None 

752 if client_credential.get("private_key_pfx_path"): 

753 private_key, sha256_thumbprint, sha1_thumbprint, x5c = _parse_pfx( 

754 client_credential["private_key_pfx_path"], 

755 passphrase_bytes) 

756 if client_credential.get("public_certificate") is True and x5c: 

757 headers["x5c"] = x5c 

758 elif ( 

759 client_credential.get("private_key") # PEM blob 

760 and client_credential.get("thumbprint")): 

761 sha1_thumbprint = client_credential["thumbprint"] 

762 if passphrase_bytes: 

763 private_key = _load_private_key_from_pem_str( 

764 client_credential['private_key'], passphrase_bytes) 

765 else: # PEM without passphrase 

766 private_key = client_credential['private_key'] 

767 else: 

768 raise ValueError( 

769 "client_credential needs to follow this format " 

770 "https://msal-python.readthedocs.io/en/latest/#msal.ClientApplication.params.client_credential") 

771 if ("x5c" not in headers # So the .pfx file contains no certificate 

772 and isinstance(client_credential.get('public_certificate'), str) 

773 ): # Then we treat the public_certificate value as PEM content 

774 headers["x5c"] = extract_certs(client_credential['public_certificate']) 

775 if sha256_thumbprint and not authority.is_adfs: 

776 assertion_params = { 

777 "algorithm": "PS256", "sha256_thumbprint": sha256_thumbprint, 

778 } 

779 else: # Fall back 

780 if not sha1_thumbprint: 

781 raise ValueError("You shall provide a thumbprint in SHA1.") 

782 assertion_params = { 

783 "algorithm": "RS256", "sha1_thumbprint": sha1_thumbprint, 

784 } 

785 assertion = JwtAssertionCreator( 

786 private_key, headers=headers, **assertion_params) 

787 client_assertion = assertion.create_regenerative_assertion( 

788 audience=authority.token_endpoint, issuer=self.client_id, 

789 additional_claims=self.client_claims or {}) 

790 else: 

791 default_body['client_secret'] = client_credential 

792 central_configuration = { 

793 "authorization_endpoint": authority.authorization_endpoint, 

794 "token_endpoint": authority.token_endpoint, 

795 "device_authorization_endpoint": authority.device_authorization_endpoint, 

796 } 

797 central_client = _ClientWithCcsRoutingInfo( 

798 central_configuration, 

799 self.client_id, 

800 http_client=self.http_client, 

801 default_headers=default_headers, 

802 default_body=default_body, 

803 client_assertion=client_assertion, 

804 client_assertion_type=client_assertion_type, 

805 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

806 event, environment=authority.instance)), 

807 on_removing_rt=self.token_cache.remove_rt, 

808 on_updating_rt=self.token_cache.update_rt) 

809 

810 regional_client = None 

811 if (client_credential # Currently regional endpoint only serves some CCA flows 

812 and not skip_regional_client): 

813 regional_authority = self._get_regional_authority(authority) 

814 if regional_authority: 

815 regional_configuration = { 

816 "authorization_endpoint": regional_authority.authorization_endpoint, 

817 "token_endpoint": regional_authority.token_endpoint, 

818 "device_authorization_endpoint": 

819 regional_authority.device_authorization_endpoint, 

820 } 

821 regional_client = _ClientWithCcsRoutingInfo( 

822 regional_configuration, 

823 self.client_id, 

824 http_client=self.http_client, 

825 default_headers=default_headers, 

826 default_body=default_body, 

827 client_assertion=client_assertion, 

828 client_assertion_type=client_assertion_type, 

829 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

830 event, environment=authority.instance)), 

831 on_removing_rt=self.token_cache.remove_rt, 

832 on_updating_rt=self.token_cache.update_rt) 

833 return central_client, regional_client 

834 

835 def initiate_auth_code_flow( 

836 self, 

837 scopes, # type: list[str] 

838 redirect_uri=None, 

839 state=None, # Recommended by OAuth2 for CSRF protection 

840 prompt=None, 

841 login_hint=None, # type: Optional[str] 

842 domain_hint=None, # type: Optional[str] 

843 claims_challenge=None, 

844 max_age=None, 

845 response_mode=None, # type: Optional[str] 

846 ): 

847 """Initiate an auth code flow. 

848 

849 Later when the response reaches your redirect_uri, 

850 you can use :func:`~acquire_token_by_auth_code_flow()` 

851 to complete the authentication/authorization. 

852 

853 :param list scopes: 

854 It is a list of case-sensitive strings. 

855 :param str redirect_uri: 

856 Optional. If not specified, server will use the pre-registered one. 

857 :param str state: 

858 An opaque value used by the client to 

859 maintain state between the request and callback. 

860 If absent, this library will automatically generate one internally. 

861 :param str prompt: 

862 By default, no prompt value will be sent, not even string ``"none"``. 

863 You will have to specify a value explicitly. 

864 Its valid values are the constants defined in 

865 :class:`Prompt <msal.Prompt>`. 

866 

867 :param str login_hint: 

868 Optional. Identifier of the user. Generally a User Principal Name (UPN). 

869 :param domain_hint: 

870 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com". 

871 If included, it will skip the email-based discovery process that user goes 

872 through on the sign-in page, leading to a slightly more streamlined user experience. 

873 More information on possible values available in 

874 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and 

875 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_. 

876 

877 :param int max_age: 

878 OPTIONAL. Maximum Authentication Age. 

879 Specifies the allowable elapsed time in seconds 

880 since the last time the End-User was actively authenticated. 

881 If the elapsed time is greater than this value, 

882 Microsoft identity platform will actively re-authenticate the End-User. 

883 

884 MSAL Python will also automatically validate the auth_time in ID token. 

885 

886 New in version 1.15. 

887 

888 :param str response_mode: 

889 OPTIONAL. Specifies the method with which response parameters should be returned. 

890 The default value is equivalent to ``query``, which is still secure enough in MSAL Python 

891 (because MSAL Python does not transfer tokens via query parameter in the first place). 

892 For even better security, we recommend using the value ``form_post``. 

893 In "form_post" mode, response parameters 

894 will be encoded as HTML form values that are transmitted via the HTTP POST method and 

895 encoded in the body using the application/x-www-form-urlencoded format. 

896 Valid values can be either "form_post" for HTTP POST to callback URI or 

897 "query" (the default) for HTTP GET with parameters encoded in query string. 

898 More information on possible values 

899 `here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>` 

900 and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>` 

901 

902 :return: 

903 The auth code flow. It is a dict in this form:: 

904 

905 { 

906 "auth_uri": "https://...", // Guide user to visit this 

907 "state": "...", // You may choose to verify it by yourself, 

908 // or just let acquire_token_by_auth_code_flow() 

909 // do that for you. 

910 "...": "...", // Everything else are reserved and internal 

911 } 

912 

913 The caller is expected to: 

914 

915 1. somehow store this content, typically inside the current session, 

916 2. guide the end user (i.e. resource owner) to visit that auth_uri, 

917 3. and then relay this dict and subsequent auth response to 

918 :func:`~acquire_token_by_auth_code_flow()`. 

919 """ 

920 client = _ClientWithCcsRoutingInfo( 

921 {"authorization_endpoint": self.authority.authorization_endpoint}, 

922 self.client_id, 

923 http_client=self.http_client) 

924 flow = client.initiate_auth_code_flow( 

925 redirect_uri=redirect_uri, state=state, login_hint=login_hint, 

926 prompt=prompt, 

927 scope=self._decorate_scope(scopes), 

928 domain_hint=domain_hint, 

929 claims=_merge_claims_challenge_and_capabilities( 

930 self._client_capabilities, claims_challenge), 

931 max_age=max_age, 

932 response_mode=response_mode, 

933 ) 

934 flow["claims_challenge"] = claims_challenge 

935 return flow 

936 

937 def get_authorization_request_url( 

938 self, 

939 scopes, # type: list[str] 

940 login_hint=None, # type: Optional[str] 

941 state=None, # Recommended by OAuth2 for CSRF protection 

942 redirect_uri=None, 

943 response_type="code", # Could be "token" if you use Implicit Grant 

944 prompt=None, 

945 nonce=None, 

946 domain_hint=None, # type: Optional[str] 

947 claims_challenge=None, 

948 **kwargs): 

949 """Constructs a URL for you to start a Authorization Code Grant. 

950 

951 :param list[str] scopes: (Required) 

952 Scopes requested to access a protected API (a resource). 

953 :param str state: Recommended by OAuth2 for CSRF protection. 

954 :param str login_hint: 

955 Identifier of the user. Generally a User Principal Name (UPN). 

956 :param str redirect_uri: 

957 Address to return to upon receiving a response from the authority. 

958 :param str response_type: 

959 Default value is "code" for an OAuth2 Authorization Code grant. 

960 

961 You could use other content such as "id_token" or "token", 

962 which would trigger an Implicit Grant, but that is 

963 `not recommended <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-implicit-grant-flow#is-the-implicit-grant-suitable-for-my-app>`_. 

964 

965 :param str prompt: 

966 By default, no prompt value will be sent, not even string ``"none"``. 

967 You will have to specify a value explicitly. 

968 Its valid values are the constants defined in 

969 :class:`Prompt <msal.Prompt>`. 

970 :param nonce: 

971 A cryptographically random value used to mitigate replay attacks. See also 

972 `OIDC specs <https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest>`_. 

973 :param domain_hint: 

974 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com". 

975 If included, it will skip the email-based discovery process that user goes 

976 through on the sign-in page, leading to a slightly more streamlined user experience. 

977 More information on possible values available in 

978 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and 

979 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_. 

980 :param claims_challenge: 

981 The claims_challenge parameter requests specific claims requested by the resource provider 

982 in the form of a claims_challenge directive in the www-authenticate header to be 

983 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

984 It is a string of a JSON object which contains lists of claims being requested from these locations. 

985 

986 :return: The authorization url as a string. 

987 """ 

988 authority = kwargs.pop("authority", None) # Historically we support this 

989 if authority: 

990 warnings.warn( 

991 "We haven't decided if this method will accept authority parameter") 

992 # The previous implementation is, it will use self.authority by default. 

993 # Multi-tenant app can use new authority on demand 

994 the_authority = Authority( 

995 authority, 

996 self.http_client, 

997 instance_discovery=self._instance_discovery, 

998 ) if authority else self.authority 

999 

1000 client = _ClientWithCcsRoutingInfo( 

1001 {"authorization_endpoint": the_authority.authorization_endpoint}, 

1002 self.client_id, 

1003 http_client=self.http_client) 

1004 warnings.warn( 

1005 "Change your get_authorization_request_url() " 

1006 "to initiate_auth_code_flow()", DeprecationWarning) 

1007 with warnings.catch_warnings(record=True): 

1008 return client.build_auth_request_uri( 

1009 response_type=response_type, 

1010 redirect_uri=redirect_uri, state=state, login_hint=login_hint, 

1011 prompt=prompt, 

1012 scope=self._decorate_scope(scopes), 

1013 nonce=nonce, 

1014 domain_hint=domain_hint, 

1015 claims=_merge_claims_challenge_and_capabilities( 

1016 self._client_capabilities, claims_challenge), 

1017 ) 

1018 

1019 def acquire_token_by_auth_code_flow( 

1020 self, auth_code_flow, auth_response, scopes=None, **kwargs): 

1021 """Validate the auth response being redirected back, and obtain tokens. 

1022 

1023 It automatically provides nonce protection. 

1024 

1025 :param dict auth_code_flow: 

1026 The same dict returned by :func:`~initiate_auth_code_flow()`. 

1027 :param dict auth_response: 

1028 A dict of the query string received from auth server. 

1029 :param list[str] scopes: 

1030 Scopes requested to access a protected API (a resource). 

1031 

1032 Most of the time, you can leave it empty. 

1033 

1034 If you requested user consent for multiple resources, here you will 

1035 need to provide a subset of what you required in 

1036 :func:`~initiate_auth_code_flow()`. 

1037 

1038 OAuth2 was designed mostly for singleton services, 

1039 where tokens are always meant for the same resource and the only 

1040 changes are in the scopes. 

1041 In Microsoft Entra, tokens can be issued for multiple 3rd party resources. 

1042 You can ask authorization code for multiple resources, 

1043 but when you redeem it, the token is for only one intended 

1044 recipient, called audience. 

1045 So the developer need to specify a scope so that we can restrict the 

1046 token to be issued for the corresponding audience. 

1047 

1048 :return: 

1049 * A dict containing "access_token" and/or "id_token", among others, 

1050 depends on what scope was used. 

1051 (See https://tools.ietf.org/html/rfc6749#section-5.1) 

1052 * A dict containing "error", optionally "error_description", "error_uri". 

1053 (It is either `this <https://tools.ietf.org/html/rfc6749#section-4.1.2.1>`_ 

1054 or `that <https://tools.ietf.org/html/rfc6749#section-5.2>`_) 

1055 * Most client-side data error would result in ValueError exception. 

1056 So the usage pattern could be without any protocol details:: 

1057 

1058 def authorize(): # A controller in a web app 

1059 try: 

1060 result = msal_app.acquire_token_by_auth_code_flow( 

1061 session.get("flow", {}), request.args) 

1062 if "error" in result: 

1063 return render_template("error.html", result) 

1064 use(result) # Token(s) are available in result and cache 

1065 except ValueError: # Usually caused by CSRF 

1066 pass # Simply ignore them 

1067 return redirect(url_for("index")) 

1068 """ 

1069 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1070 telemetry_context = self._build_telemetry_context( 

1071 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID) 

1072 response = _clean_up(self.client.obtain_token_by_auth_code_flow( 

1073 auth_code_flow, 

1074 auth_response, 

1075 scope=self._decorate_scope(scopes) if scopes else None, 

1076 headers=telemetry_context.generate_headers(), 

1077 data=dict( 

1078 kwargs.pop("data", {}), 

1079 claims=_merge_claims_challenge_and_capabilities( 

1080 self._client_capabilities, 

1081 auth_code_flow.pop("claims_challenge", None))), 

1082 **kwargs)) 

1083 if "access_token" in response: 

1084 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1085 telemetry_context.update_telemetry(response) 

1086 return response 

1087 

1088 def acquire_token_by_authorization_code( 

1089 self, 

1090 code, 

1091 scopes, # Syntactically required. STS accepts empty value though. 

1092 redirect_uri=None, 

1093 # REQUIRED, if the "redirect_uri" parameter was included in the 

1094 # authorization request as described in Section 4.1.1, and their 

1095 # values MUST be identical. 

1096 nonce=None, 

1097 claims_challenge=None, 

1098 **kwargs): 

1099 """The second half of the Authorization Code Grant. 

1100 

1101 :param code: The authorization code returned from Authorization Server. 

1102 :param list[str] scopes: (Required) 

1103 Scopes requested to access a protected API (a resource). 

1104 

1105 If you requested user consent for multiple resources, here you will 

1106 typically want to provide a subset of what you required in AuthCode. 

1107 

1108 OAuth2 was designed mostly for singleton services, 

1109 where tokens are always meant for the same resource and the only 

1110 changes are in the scopes. 

1111 In Microsoft Entra, tokens can be issued for multiple 3rd party resources. 

1112 You can ask authorization code for multiple resources, 

1113 but when you redeem it, the token is for only one intended 

1114 recipient, called audience. 

1115 So the developer need to specify a scope so that we can restrict the 

1116 token to be issued for the corresponding audience. 

1117 

1118 :param nonce: 

1119 If you provided a nonce when calling :func:`get_authorization_request_url`, 

1120 same nonce should also be provided here, so that we'll validate it. 

1121 An exception will be raised if the nonce in id token mismatches. 

1122 

1123 :param claims_challenge: 

1124 The claims_challenge parameter requests specific claims requested by the resource provider 

1125 in the form of a claims_challenge directive in the www-authenticate header to be 

1126 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1127 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1128 

1129 :return: A dict representing the json response from Microsoft Entra: 

1130 

1131 - A successful response would contain "access_token" key, 

1132 - an error response would contain "error" and usually "error_description". 

1133 """ 

1134 # If scope is absent on the wire, STS will give you a token associated 

1135 # to the FIRST scope sent during the authorization request. 

1136 # So in theory, you can omit scope here when you were working with only 

1137 # one scope. But, MSAL decorates your scope anyway, so they are never 

1138 # really empty. 

1139 assert isinstance(scopes, list), "Invalid parameter type" 

1140 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1141 warnings.warn( 

1142 "Change your acquire_token_by_authorization_code() " 

1143 "to acquire_token_by_auth_code_flow()", DeprecationWarning) 

1144 with warnings.catch_warnings(record=True): 

1145 telemetry_context = self._build_telemetry_context( 

1146 self.ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID) 

1147 response = _clean_up(self.client.obtain_token_by_authorization_code( 

1148 code, redirect_uri=redirect_uri, 

1149 scope=self._decorate_scope(scopes), 

1150 headers=telemetry_context.generate_headers(), 

1151 data=dict( 

1152 kwargs.pop("data", {}), 

1153 claims=_merge_claims_challenge_and_capabilities( 

1154 self._client_capabilities, claims_challenge)), 

1155 nonce=nonce, 

1156 **kwargs)) 

1157 if "access_token" in response: 

1158 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1159 telemetry_context.update_telemetry(response) 

1160 return response 

1161 

1162 def get_accounts(self, username=None): 

1163 """Get a list of accounts which previously signed in, i.e. exists in cache. 

1164 

1165 An account can later be used in :func:`~acquire_token_silent` 

1166 to find its tokens. 

1167 

1168 :param username: 

1169 Filter accounts with this username only. Case insensitive. 

1170 :return: A list of account objects. 

1171 Each account is a dict. For now, we only document its "username" field. 

1172 Your app can choose to display those information to end user, 

1173 and allow user to choose one of his/her accounts to proceed. 

1174 """ 

1175 accounts = self._find_msal_accounts(environment=self.authority.instance) 

1176 if not accounts: # Now try other aliases of this authority instance 

1177 for alias in self._get_authority_aliases(self.authority.instance): 

1178 accounts = self._find_msal_accounts(environment=alias) 

1179 if accounts: 

1180 break 

1181 if username: 

1182 # Federated account["username"] from AAD could contain mixed case 

1183 lowercase_username = username.lower() 

1184 accounts = [a for a in accounts 

1185 if a["username"].lower() == lowercase_username] 

1186 if not accounts: 

1187 logger.debug(( # This would also happen when the cache is empty 

1188 "get_accounts(username='{}') finds no account. " 

1189 "If tokens were acquired without 'profile' scope, " 

1190 "they would contain no username for filtering. " 

1191 "Consider calling get_accounts(username=None) instead." 

1192 ).format(username)) 

1193 # Does not further filter by existing RTs here. It probably won't matter. 

1194 # Because in most cases Accounts and RTs co-exist. 

1195 # Even in the rare case when an RT is revoked and then removed, 

1196 # acquire_token_silent() would then yield no result, 

1197 # apps would fall back to other acquire methods. This is the standard pattern. 

1198 return accounts 

1199 

1200 def _find_msal_accounts(self, environment): 

1201 interested_authority_types = [ 

1202 TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS] 

1203 if _is_running_in_cloud_shell(): 

1204 interested_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL) 

1205 grouped_accounts = { 

1206 a.get("home_account_id"): # Grouped by home tenant's id 

1207 { # These are minimal amount of non-tenant-specific account info 

1208 "home_account_id": a.get("home_account_id"), 

1209 "environment": a.get("environment"), 

1210 "username": a.get("username"), 

1211 "account_source": a.get("account_source"), 

1212 

1213 # The following fields for backward compatibility, for now 

1214 "authority_type": a.get("authority_type"), 

1215 "local_account_id": a.get("local_account_id"), # Tenant-specific 

1216 "realm": a.get("realm"), # Tenant-specific 

1217 } 

1218 for a in self.token_cache.search( 

1219 TokenCache.CredentialType.ACCOUNT, 

1220 query={"environment": environment}) 

1221 if a["authority_type"] in interested_authority_types 

1222 } 

1223 return list(grouped_accounts.values()) 

1224 

1225 def _get_instance_metadata(self): # This exists so it can be mocked in unit test 

1226 resp = self.http_client.get( 

1227 "https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize", # TBD: We may extend this to use self._instance_discovery endpoint 

1228 headers={'Accept': 'application/json'}) 

1229 resp.raise_for_status() 

1230 return json.loads(resp.text)['metadata'] 

1231 

1232 def _get_authority_aliases(self, instance): 

1233 if self._instance_discovery is False: 

1234 return [] 

1235 if self.authority._is_known_to_developer: 

1236 # Then it is an ADFS/B2C/known_authority_hosts situation 

1237 # which may not reach the central endpoint, so we skip it. 

1238 return [] 

1239 if not self.authority_groups: 

1240 self.authority_groups = [ 

1241 set(group['aliases']) for group in self._get_instance_metadata()] 

1242 for group in self.authority_groups: 

1243 if instance in group: 

1244 return [alias for alias in group if alias != instance] 

1245 return [] 

1246 

1247 def remove_account(self, account): 

1248 """Sign me out and forget me from token cache""" 

1249 if self._enable_broker: 

1250 from .broker import _signout_silently 

1251 error = _signout_silently(self.client_id, account["local_account_id"]) 

1252 if error: 

1253 logger.debug("_signout_silently() returns error: %s", error) 

1254 # Broker sign-out has been attempted, even if the _forget_me() below throws. 

1255 self._forget_me(account) 

1256 

1257 def _sign_out(self, home_account): 

1258 # Remove all relevant RTs and ATs from token cache 

1259 owned_by_home_account = { 

1260 "environment": home_account["environment"], 

1261 "home_account_id": home_account["home_account_id"],} # realm-independent 

1262 app_metadata = self._get_app_metadata(home_account["environment"]) 

1263 # Remove RTs/FRTs, and they are realm-independent 

1264 for rt in [ # Remove RTs from a static list (rather than from a dynamic generator), 

1265 # to avoid changing self.token_cache while it is being iterated 

1266 rt for rt in self.token_cache.search( 

1267 TokenCache.CredentialType.REFRESH_TOKEN, query=owned_by_home_account) 

1268 # Do RT's app ownership check as a precaution, in case family apps 

1269 # and 3rd-party apps share same token cache, although they should not. 

1270 if rt["client_id"] == self.client_id or ( 

1271 app_metadata.get("family_id") # Now let's settle family business 

1272 and rt.get("family_id") == app_metadata["family_id"]) 

1273 ]: 

1274 self.token_cache.remove_rt(rt) 

1275 for at in list(self.token_cache.search( # Remove ATs from a static list, 

1276 # to avoid changing self.token_cache while it is being iterated 

1277 TokenCache.CredentialType.ACCESS_TOKEN, query=owned_by_home_account, 

1278 # Regardless of realm, b/c we've removed realm-independent RTs anyway 

1279 )): 

1280 # To avoid the complexity of locating sibling family app's AT, 

1281 # we skip AT's app ownership check. 

1282 # It means ATs for other apps will also be removed, it is OK because: 

1283 # * non-family apps are not supposed to share token cache to begin with; 

1284 # * Even if it happens, we keep other app's RT already, so SSO still works 

1285 self.token_cache.remove_at(at) 

1286 

1287 def _forget_me(self, home_account): 

1288 # It implies signout, and then also remove all relevant accounts and IDTs 

1289 self._sign_out(home_account) 

1290 owned_by_home_account = { 

1291 "environment": home_account["environment"], 

1292 "home_account_id": home_account["home_account_id"],} # realm-independent 

1293 for idt in list(self.token_cache.search( # Remove IDTs from a static list, 

1294 # to avoid changing self.token_cache while it is being iterated 

1295 TokenCache.CredentialType.ID_TOKEN, query=owned_by_home_account, # regardless of realm 

1296 )): 

1297 self.token_cache.remove_idt(idt) 

1298 for a in list(self.token_cache.search( # Remove Accounts from a static list, 

1299 # to avoid changing self.token_cache while it is being iterated 

1300 TokenCache.CredentialType.ACCOUNT, query=owned_by_home_account, # regardless of realm 

1301 )): 

1302 self.token_cache.remove_account(a) 

1303 

1304 def _acquire_token_by_cloud_shell(self, scopes, data=None): 

1305 from .cloudshell import _obtain_token 

1306 response = _obtain_token( 

1307 self.http_client, scopes, client_id=self.client_id, data=data) 

1308 if "error" not in response: 

1309 self.token_cache.add(dict( 

1310 client_id=self.client_id, 

1311 scope=response["scope"].split() if "scope" in response else scopes, 

1312 token_endpoint=self.authority.token_endpoint, 

1313 response=response, 

1314 data=data or {}, 

1315 authority_type=_AUTHORITY_TYPE_CLOUDSHELL, 

1316 )) 

1317 if "access_token" in response: 

1318 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER 

1319 return response 

1320 

1321 def acquire_token_silent( 

1322 self, 

1323 scopes, # type: List[str] 

1324 account, # type: Optional[Account] 

1325 authority=None, # See get_authorization_request_url() 

1326 force_refresh=False, # type: Optional[boolean] 

1327 claims_challenge=None, 

1328 auth_scheme=None, 

1329 **kwargs): 

1330 """Acquire an access token for given account, without user interaction. 

1331 

1332 It has same parameters as the :func:`~acquire_token_silent_with_error`. 

1333 The difference is the behavior of the return value. 

1334 This method will combine the cache empty and refresh error 

1335 into one return value, `None`. 

1336 If your app does not care about the exact token refresh error during 

1337 token cache look-up, then this method is easier and recommended. 

1338 

1339 :return: 

1340 - A dict containing no "error" key, 

1341 and typically contains an "access_token" key, 

1342 if cache lookup succeeded. 

1343 - None when cache lookup does not yield a token. 

1344 """ 

1345 if not account: 

1346 return None # A backward-compatible NO-OP to drop the account=None usage 

1347 result = _clean_up(self._acquire_token_silent_with_error( 

1348 scopes, account, authority=authority, force_refresh=force_refresh, 

1349 claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs)) 

1350 return result if result and "error" not in result else None 

1351 

1352 def acquire_token_silent_with_error( 

1353 self, 

1354 scopes, # type: List[str] 

1355 account, # type: Optional[Account] 

1356 authority=None, # See get_authorization_request_url() 

1357 force_refresh=False, # type: Optional[boolean] 

1358 claims_challenge=None, 

1359 auth_scheme=None, 

1360 **kwargs): 

1361 """Acquire an access token for given account, without user interaction. 

1362 

1363 It is done either by finding a valid access token from cache, 

1364 or by finding a valid refresh token from cache and then automatically 

1365 use it to redeem a new access token. 

1366 

1367 This method will differentiate cache empty from token refresh error. 

1368 If your app cares the exact token refresh error during 

1369 token cache look-up, then this method is suitable. 

1370 Otherwise, the other method :func:`~acquire_token_silent` is recommended. 

1371 

1372 :param list[str] scopes: (Required) 

1373 Scopes requested to access a protected API (a resource). 

1374 :param account: (Required) 

1375 One of the account object returned by :func:`~get_accounts`. 

1376 Starting from MSAL Python 1.23, 

1377 a ``None`` input will become a NO-OP and always return ``None``. 

1378 :param force_refresh: 

1379 If True, it will skip Access Token look-up, 

1380 and try to find a Refresh Token to obtain a new Access Token. 

1381 :param claims_challenge: 

1382 The claims_challenge parameter requests specific claims requested by the resource provider 

1383 in the form of a claims_challenge directive in the www-authenticate header to be 

1384 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1385 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1386 :param object auth_scheme: 

1387 You can provide an ``msal.auth_scheme.PopAuthScheme`` object 

1388 so that MSAL will get a Proof-of-Possession (POP) token for you. 

1389 

1390 New in version 1.26.0. 

1391 

1392 :return: 

1393 - A dict containing no "error" key, 

1394 and typically contains an "access_token" key, 

1395 if cache lookup succeeded. 

1396 - None when there is simply no token in the cache. 

1397 - A dict containing an "error" key, when token refresh failed. 

1398 """ 

1399 if not account: 

1400 return None # A backward-compatible NO-OP to drop the account=None usage 

1401 return _clean_up(self._acquire_token_silent_with_error( 

1402 scopes, account, authority=authority, force_refresh=force_refresh, 

1403 claims_challenge=claims_challenge, auth_scheme=auth_scheme, **kwargs)) 

1404 

1405 def _acquire_token_silent_with_error( 

1406 self, 

1407 scopes, # type: List[str] 

1408 account, # type: Optional[Account] 

1409 authority=None, # See get_authorization_request_url() 

1410 force_refresh=False, # type: Optional[boolean] 

1411 claims_challenge=None, 

1412 auth_scheme=None, 

1413 **kwargs): 

1414 assert isinstance(scopes, list), "Invalid parameter type" 

1415 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1416 correlation_id = msal.telemetry._get_new_correlation_id() 

1417 if authority: 

1418 warnings.warn("We haven't decided how/if this method will accept authority parameter") 

1419 # the_authority = Authority( 

1420 # authority, 

1421 # self.http_client, 

1422 # instance_discovery=self._instance_discovery, 

1423 # ) if authority else self.authority 

1424 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( 

1425 scopes, account, self.authority, force_refresh=force_refresh, 

1426 claims_challenge=claims_challenge, 

1427 correlation_id=correlation_id, 

1428 auth_scheme=auth_scheme, 

1429 **kwargs) 

1430 if result and "error" not in result: 

1431 return result 

1432 final_result = result 

1433 for alias in self._get_authority_aliases(self.authority.instance): 

1434 if not list(self.token_cache.search( # Need a list to test emptiness 

1435 self.token_cache.CredentialType.REFRESH_TOKEN, 

1436 # target=scopes, # MUST NOT filter by scopes, because: 

1437 # 1. AAD RTs are scope-independent; 

1438 # 2. therefore target is optional per schema; 

1439 query={"environment": alias})): 

1440 # Skip heavy weight logic when RT for this alias doesn't exist 

1441 continue 

1442 the_authority = Authority( 

1443 "https://" + alias + "/" + self.authority.tenant, 

1444 self.http_client, 

1445 instance_discovery=False, 

1446 ) 

1447 result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( 

1448 scopes, account, the_authority, force_refresh=force_refresh, 

1449 claims_challenge=claims_challenge, 

1450 correlation_id=correlation_id, 

1451 auth_scheme=auth_scheme, 

1452 **kwargs) 

1453 if result: 

1454 if "error" not in result: 

1455 return result 

1456 final_result = result 

1457 if final_result and final_result.get("suberror"): 

1458 final_result["classification"] = { # Suppress these suberrors, per #57 

1459 "bad_token": "", 

1460 "token_expired": "", 

1461 "protection_policy_required": "", 

1462 "client_mismatch": "", 

1463 "device_authentication_failed": "", 

1464 }.get(final_result["suberror"], final_result["suberror"]) 

1465 return final_result 

1466 

1467 def _acquire_token_silent_from_cache_and_possibly_refresh_it( 

1468 self, 

1469 scopes, # type: List[str] 

1470 account, # type: Optional[Account] 

1471 authority, # This can be different than self.authority 

1472 force_refresh=False, # type: Optional[boolean] 

1473 claims_challenge=None, 

1474 correlation_id=None, 

1475 http_exceptions=None, 

1476 auth_scheme=None, 

1477 **kwargs): 

1478 # This internal method has two calling patterns: 

1479 # it accepts a non-empty account to find token for a user, 

1480 # and accepts account=None to find a token for the current app. 

1481 access_token_from_cache = None 

1482 if not (force_refresh or claims_challenge or auth_scheme): # Then attempt AT cache 

1483 query={ 

1484 "client_id": self.client_id, 

1485 "environment": authority.instance, 

1486 "realm": authority.tenant, 

1487 "home_account_id": (account or {}).get("home_account_id"), 

1488 } 

1489 key_id = kwargs.get("data", {}).get("key_id") 

1490 if key_id: # Some token types (SSH-certs, POP) are bound to a key 

1491 query["key_id"] = key_id 

1492 now = time.time() 

1493 refresh_reason = msal.telemetry.AT_ABSENT 

1494 for entry in self.token_cache.search( # A generator allows us to 

1495 # break early in cache-hit without finding a full list 

1496 self.token_cache.CredentialType.ACCESS_TOKEN, 

1497 target=scopes, 

1498 query=query, 

1499 ): # This loop is about token search, not about token deletion. 

1500 # Note that search() holds a lock during this loop; 

1501 # that is fine because this loop is fast 

1502 expires_in = int(entry["expires_on"]) - now 

1503 if expires_in < 5*60: # Then consider it expired 

1504 refresh_reason = msal.telemetry.AT_EXPIRED 

1505 continue # Removal is not necessary, it will be overwritten 

1506 logger.debug("Cache hit an AT") 

1507 access_token_from_cache = { # Mimic a real response 

1508 "access_token": entry["secret"], 

1509 "token_type": entry.get("token_type", "Bearer"), 

1510 "expires_in": int(expires_in), # OAuth2 specs defines it as int 

1511 self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE, 

1512 } 

1513 if "refresh_on" in entry: 

1514 access_token_from_cache["refresh_on"] = int(entry["refresh_on"]) 

1515 if int(entry["refresh_on"]) < now: # aging 

1516 refresh_reason = msal.telemetry.AT_AGING 

1517 break # With a fallback in hand, we break here to go refresh 

1518 self._build_telemetry_context(-1).hit_an_access_token() 

1519 return access_token_from_cache # It is still good as new 

1520 else: 

1521 refresh_reason = msal.telemetry.FORCE_REFRESH # TODO: It could also mean claims_challenge 

1522 assert refresh_reason, "It should have been established at this point" 

1523 if not http_exceptions: # It can be a tuple of exceptions 

1524 # The exact HTTP exceptions are transportation-layer dependent 

1525 from requests.exceptions import RequestException # Lazy load 

1526 http_exceptions = (RequestException,) 

1527 try: 

1528 data = kwargs.get("data", {}) 

1529 if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL: 

1530 if auth_scheme: 

1531 raise ValueError("auth_scheme is not supported in Cloud Shell") 

1532 return self._acquire_token_by_cloud_shell(scopes, data=data) 

1533 

1534 if self._enable_broker and account and account.get("account_source") in ( 

1535 _GRANT_TYPE_BROKER, # Broker successfully established this account previously. 

1536 None, # Unknown data from older MSAL. Broker might still work. 

1537 ): 

1538 from .broker import _acquire_token_silently 

1539 response = _acquire_token_silently( 

1540 "https://{}/{}".format(self.authority.instance, self.authority.tenant), 

1541 self.client_id, 

1542 account["local_account_id"], 

1543 scopes, 

1544 claims=_merge_claims_challenge_and_capabilities( 

1545 self._client_capabilities, claims_challenge), 

1546 correlation_id=correlation_id, 

1547 auth_scheme=auth_scheme, 

1548 **data) 

1549 if response: # Broker provides a decisive outcome 

1550 account_was_established_by_broker = account.get( 

1551 "account_source") == _GRANT_TYPE_BROKER 

1552 broker_attempt_succeeded_just_now = "error" not in response 

1553 if account_was_established_by_broker or broker_attempt_succeeded_just_now: 

1554 return self._process_broker_response(response, scopes, data) 

1555 

1556 if auth_scheme: 

1557 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED) 

1558 if account: 

1559 result = self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family( 

1560 authority, self._decorate_scope(scopes), account, 

1561 refresh_reason=refresh_reason, claims_challenge=claims_challenge, 

1562 correlation_id=correlation_id, 

1563 **kwargs) 

1564 else: # The caller is acquire_token_for_client() 

1565 result = self._acquire_token_for_client( 

1566 scopes, refresh_reason, claims_challenge=claims_challenge, 

1567 **kwargs) 

1568 if result and "access_token" in result: 

1569 result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1570 if (result and "error" not in result) or (not access_token_from_cache): 

1571 return result 

1572 except http_exceptions: 

1573 # Typically network error. Potential AAD outage? 

1574 if not access_token_from_cache: # It means there is no fall back option 

1575 raise # We choose to bubble up the exception 

1576 return access_token_from_cache 

1577 

1578 def _process_broker_response(self, response, scopes, data): 

1579 if "error" not in response: 

1580 self.token_cache.add(dict( 

1581 client_id=self.client_id, 

1582 scope=response["scope"].split() if "scope" in response else scopes, 

1583 token_endpoint=self.authority.token_endpoint, 

1584 response=response, 

1585 data=data, 

1586 _account_id=response["_account_id"], 

1587 environment=self.authority.instance, # Be consistent with non-broker flows 

1588 grant_type=_GRANT_TYPE_BROKER, # A pseudo grant type for TokenCache to mark account_source as broker 

1589 )) 

1590 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_BROKER 

1591 return _clean_up(response) 

1592 

1593 def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family( 

1594 self, authority, scopes, account, **kwargs): 

1595 query = { 

1596 "environment": authority.instance, 

1597 "home_account_id": (account or {}).get("home_account_id"), 

1598 # "realm": authority.tenant, # AAD RTs are tenant-independent 

1599 } 

1600 app_metadata = self._get_app_metadata(authority.instance) 

1601 if not app_metadata: # Meaning this app is now used for the first time. 

1602 # When/if we have a way to directly detect current app's family, 

1603 # we'll rewrite this block, to support multiple families. 

1604 # For now, we try existing RTs (*). If it works, we are in that family. 

1605 # (*) RTs of a different app/family are not supposed to be 

1606 # shared with or accessible by us in the first place. 

1607 at = self._acquire_token_silent_by_finding_specific_refresh_token( 

1608 authority, scopes, 

1609 dict(query, family_id="1"), # A hack, we have only 1 family for now 

1610 rt_remover=lambda rt_item: None, # NO-OP b/c RTs are likely not mine 

1611 break_condition=lambda response: # Break loop when app not in family 

1612 # Based on an AAD-only behavior mentioned in internal doc here 

1613 # https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595 

1614 "client_mismatch" in response.get("error_additional_info", []), 

1615 **kwargs) 

1616 if at and "error" not in at: 

1617 return at 

1618 last_resp = None 

1619 if app_metadata.get("family_id"): # Meaning this app belongs to this family 

1620 last_resp = at = self._acquire_token_silent_by_finding_specific_refresh_token( 

1621 authority, scopes, dict(query, family_id=app_metadata["family_id"]), 

1622 **kwargs) 

1623 if at and "error" not in at: 

1624 return at 

1625 # Either this app is an orphan, so we will naturally use its own RT; 

1626 # or all attempts above have failed, so we fall back to non-foci behavior. 

1627 return self._acquire_token_silent_by_finding_specific_refresh_token( 

1628 authority, scopes, dict(query, client_id=self.client_id), 

1629 **kwargs) or last_resp 

1630 

1631 def _get_app_metadata(self, environment): 

1632 return self.token_cache._get_app_metadata( 

1633 environment=environment, client_id=self.client_id, default={}) 

1634 

1635 def _acquire_token_silent_by_finding_specific_refresh_token( 

1636 self, authority, scopes, query, 

1637 rt_remover=None, break_condition=lambda response: False, 

1638 refresh_reason=None, correlation_id=None, claims_challenge=None, 

1639 **kwargs): 

1640 matches = list(self.token_cache.search( # We want a list to test emptiness 

1641 self.token_cache.CredentialType.REFRESH_TOKEN, 

1642 # target=scopes, # AAD RTs are scope-independent 

1643 query=query)) 

1644 logger.debug("Found %d RTs matching %s", len(matches), { 

1645 k: _pii_less_home_account_id(v) if k == "home_account_id" and v else v 

1646 for k, v in query.items() 

1647 }) 

1648 

1649 response = None # A distinguishable value to mean cache is empty 

1650 if not matches: # Then exit early to avoid expensive operations 

1651 return response 

1652 client, _ = self._build_client( 

1653 # Potentially expensive if building regional client 

1654 self.client_credential, authority, skip_regional_client=True) 

1655 telemetry_context = self._build_telemetry_context( 

1656 self.ACQUIRE_TOKEN_SILENT_ID, 

1657 correlation_id=correlation_id, refresh_reason=refresh_reason) 

1658 for entry in sorted( # Since unfit RTs would not be aggressively removed, 

1659 # we start from newer RTs which are more likely fit. 

1660 matches, 

1661 key=lambda e: int(e.get("last_modification_time", "0")), 

1662 reverse=True): 

1663 logger.debug("Cache attempts an RT") 

1664 headers = telemetry_context.generate_headers() 

1665 if query.get("home_account_id"): # Then use it as CCS Routing info 

1666 headers["X-AnchorMailbox"] = "Oid:{}".format( # case-insensitive value 

1667 query["home_account_id"].replace(".", "@")) 

1668 response = client.obtain_token_by_refresh_token( 

1669 entry, rt_getter=lambda token_item: token_item["secret"], 

1670 on_removing_rt=lambda rt_item: None, # Disable RT removal, 

1671 # because an invalid_grant could be caused by new MFA policy, 

1672 # the RT could still be useful for other MFA-less scope or tenant 

1673 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

1674 event, 

1675 environment=authority.instance, 

1676 skip_account_creation=True, # To honor a concurrent remove_account() 

1677 )), 

1678 scope=scopes, 

1679 headers=headers, 

1680 data=dict( 

1681 kwargs.pop("data", {}), 

1682 claims=_merge_claims_challenge_and_capabilities( 

1683 self._client_capabilities, claims_challenge)), 

1684 **kwargs) 

1685 telemetry_context.update_telemetry(response) 

1686 if "error" not in response: 

1687 return response 

1688 logger.debug("Refresh failed. {error}: {error_description}".format( 

1689 error=response.get("error"), 

1690 error_description=response.get("error_description"), 

1691 )) 

1692 if break_condition(response): 

1693 break 

1694 return response # Returns the latest error (if any), or just None 

1695 

1696 def _validate_ssh_cert_input_data(self, data): 

1697 if data.get("token_type") == "ssh-cert": 

1698 if not data.get("req_cnf"): 

1699 raise ValueError( 

1700 "When requesting an SSH certificate, " 

1701 "you must include a string parameter named 'req_cnf' " 

1702 "containing the public key in JWK format " 

1703 "(https://tools.ietf.org/html/rfc7517).") 

1704 if not data.get("key_id"): 

1705 raise ValueError( 

1706 "When requesting an SSH certificate, " 

1707 "you must include a string parameter named 'key_id' " 

1708 "which identifies the key in the 'req_cnf' argument.") 

1709 

1710 def acquire_token_by_refresh_token(self, refresh_token, scopes, **kwargs): 

1711 """Acquire token(s) based on a refresh token (RT) obtained from elsewhere. 

1712 

1713 You use this method only when you have old RTs from elsewhere, 

1714 and now you want to migrate them into MSAL. 

1715 Calling this method results in new tokens automatically storing into MSAL. 

1716 

1717 You do NOT need to use this method if you are already using MSAL. 

1718 MSAL maintains RT automatically inside its token cache, 

1719 and an access token can be retrieved 

1720 when you call :func:`~acquire_token_silent`. 

1721 

1722 :param str refresh_token: The old refresh token, as a string. 

1723 

1724 :param list scopes: 

1725 The scopes associate with this old RT. 

1726 Each scope needs to be in the Microsoft identity platform (v2) format. 

1727 See `Scopes not resources <https://docs.microsoft.com/en-us/azure/active-directory/develop/migrate-python-adal-msal#scopes-not-resources>`_. 

1728 

1729 :return: 

1730 * A dict contains "error" and some other keys, when error happened. 

1731 * A dict contains no "error" key means migration was successful. 

1732 """ 

1733 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

1734 telemetry_context = self._build_telemetry_context( 

1735 self.ACQUIRE_TOKEN_BY_REFRESH_TOKEN, 

1736 refresh_reason=msal.telemetry.FORCE_REFRESH) 

1737 response = _clean_up(self.client.obtain_token_by_refresh_token( 

1738 refresh_token, 

1739 scope=self._decorate_scope(scopes), 

1740 headers=telemetry_context.generate_headers(), 

1741 rt_getter=lambda rt: rt, 

1742 on_updating_rt=False, 

1743 on_removing_rt=lambda rt_item: None, # No OP 

1744 **kwargs)) 

1745 if "access_token" in response: 

1746 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1747 telemetry_context.update_telemetry(response) 

1748 return response 

1749 

1750 def acquire_token_by_username_password( 

1751 self, username, password, scopes, claims_challenge=None, 

1752 # Note: We shouldn't need to surface enable_msa_passthrough, 

1753 # because this ROPC won't work with MSA account anyway. 

1754 auth_scheme=None, 

1755 **kwargs): 

1756 """Gets a token for a given resource via user credentials. 

1757 

1758 See this page for constraints of Username Password Flow. 

1759 https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication 

1760 

1761 :param str username: Typically a UPN in the form of an email address. 

1762 :param str password: The password. 

1763 :param list[str] scopes: 

1764 Scopes requested to access a protected API (a resource). 

1765 :param claims_challenge: 

1766 The claims_challenge parameter requests specific claims requested by the resource provider 

1767 in the form of a claims_challenge directive in the www-authenticate header to be 

1768 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1769 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1770 

1771 :param object auth_scheme: 

1772 You can provide an ``msal.auth_scheme.PopAuthScheme`` object 

1773 so that MSAL will get a Proof-of-Possession (POP) token for you. 

1774 

1775 New in version 1.26.0. 

1776 

1777 :return: A dict representing the json response from Microsoft Entra: 

1778 

1779 - A successful response would contain "access_token" key, 

1780 - an error response would contain "error" and usually "error_description". 

1781 """ 

1782 claims = _merge_claims_challenge_and_capabilities( 

1783 self._client_capabilities, claims_challenge) 

1784 if self._enable_broker: 

1785 from .broker import _signin_silently 

1786 response = _signin_silently( 

1787 "https://{}/{}".format(self.authority.instance, self.authority.tenant), 

1788 self.client_id, 

1789 scopes, # Decorated scopes won't work due to offline_access 

1790 MSALRuntime_Username=username, 

1791 MSALRuntime_Password=password, 

1792 validateAuthority="no" if ( 

1793 self.authority._is_known_to_developer 

1794 or self._instance_discovery is False) else None, 

1795 claims=claims, 

1796 auth_scheme=auth_scheme, 

1797 ) 

1798 return self._process_broker_response(response, scopes, kwargs.get("data", {})) 

1799 

1800 if auth_scheme: 

1801 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED) 

1802 scopes = self._decorate_scope(scopes) 

1803 telemetry_context = self._build_telemetry_context( 

1804 self.ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID) 

1805 headers = telemetry_context.generate_headers() 

1806 data = dict(kwargs.pop("data", {}), claims=claims) 

1807 response = None 

1808 if not self.authority.is_adfs: 

1809 user_realm_result = self.authority.user_realm_discovery( 

1810 username, correlation_id=headers[msal.telemetry.CLIENT_REQUEST_ID]) 

1811 if user_realm_result.get("account_type") == "Federated": 

1812 response = _clean_up(self._acquire_token_by_username_password_federated( 

1813 user_realm_result, username, password, scopes=scopes, 

1814 data=data, 

1815 headers=headers, **kwargs)) 

1816 if response is None: # Either ADFS or not federated 

1817 response = _clean_up(self.client.obtain_token_by_username_password( 

1818 username, password, scope=scopes, 

1819 headers=headers, 

1820 data=data, 

1821 **kwargs)) 

1822 if "access_token" in response: 

1823 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

1824 telemetry_context.update_telemetry(response) 

1825 return response 

1826 

1827 def _acquire_token_by_username_password_federated( 

1828 self, user_realm_result, username, password, scopes=None, **kwargs): 

1829 wstrust_endpoint = {} 

1830 if user_realm_result.get("federation_metadata_url"): 

1831 wstrust_endpoint = mex_send_request( 

1832 user_realm_result["federation_metadata_url"], 

1833 self.http_client) 

1834 if wstrust_endpoint is None: 

1835 raise ValueError("Unable to find wstrust endpoint from MEX. " 

1836 "This typically happens when attempting MSA accounts. " 

1837 "More details available here. " 

1838 "https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication") 

1839 logger.debug("wstrust_endpoint = %s", wstrust_endpoint) 

1840 wstrust_result = wst_send_request( 

1841 username, password, 

1842 user_realm_result.get("cloud_audience_urn", "urn:federation:MicrosoftOnline"), 

1843 wstrust_endpoint.get("address", 

1844 # Fallback to an AAD supplied endpoint 

1845 user_realm_result.get("federation_active_auth_url")), 

1846 wstrust_endpoint.get("action"), self.http_client) 

1847 if not ("token" in wstrust_result and "type" in wstrust_result): 

1848 raise RuntimeError("Unsuccessful RSTR. %s" % wstrust_result) 

1849 GRANT_TYPE_SAML1_1 = 'urn:ietf:params:oauth:grant-type:saml1_1-bearer' 

1850 grant_type = { 

1851 SAML_TOKEN_TYPE_V1: GRANT_TYPE_SAML1_1, 

1852 SAML_TOKEN_TYPE_V2: self.client.GRANT_TYPE_SAML2, 

1853 WSS_SAML_TOKEN_PROFILE_V1_1: GRANT_TYPE_SAML1_1, 

1854 WSS_SAML_TOKEN_PROFILE_V2: self.client.GRANT_TYPE_SAML2 

1855 }.get(wstrust_result.get("type")) 

1856 if not grant_type: 

1857 raise RuntimeError( 

1858 "RSTR returned unknown token type: %s", wstrust_result.get("type")) 

1859 self.client.grant_assertion_encoders.setdefault( # Register a non-standard type 

1860 grant_type, self.client.encode_saml_assertion) 

1861 return self.client.obtain_token_by_assertion( 

1862 wstrust_result["token"], grant_type, scope=scopes, 

1863 on_obtaining_tokens=lambda event: self.token_cache.add(dict( 

1864 event, 

1865 environment=self.authority.instance, 

1866 username=username, # Useful in case IDT contains no such info 

1867 )), 

1868 **kwargs) 

1869 

1870 

1871class PublicClientApplication(ClientApplication): # browser app or mobile app 

1872 

1873 DEVICE_FLOW_CORRELATION_ID = "_correlation_id" 

1874 CONSOLE_WINDOW_HANDLE = object() 

1875 

1876 def __init__(self, client_id, client_credential=None, **kwargs): 

1877 """Same as :func:`ClientApplication.__init__`, 

1878 except that ``client_credential`` parameter shall remain ``None``. 

1879 

1880 .. note:: 

1881 

1882 You may set enable_broker_on_windows to True. 

1883 

1884 **What is a broker, and why use it?** 

1885 

1886 A broker is a component installed on your device. 

1887 Broker implicitly gives your device an identity. By using a broker, 

1888 your device becomes a factor that can satisfy MFA (Multi-factor authentication). 

1889 This factor would become mandatory 

1890 if a tenant's admin enables a corresponding Conditional Access (CA) policy. 

1891 The broker's presence allows Microsoft identity platform 

1892 to have higher confidence that the tokens are being issued to your device, 

1893 and that is more secure. 

1894 

1895 An additional benefit of broker is, 

1896 it runs as a long-lived process with your device's OS, 

1897 and maintains its own cache, 

1898 so that your broker-enabled apps (even a CLI) 

1899 could automatically SSO from a previously established signed-in session. 

1900 

1901 **You shall only enable broker when your app:** 

1902 

1903 1. is running on supported platforms, 

1904 and already registered their corresponding redirect_uri 

1905 

1906 * ``ms-appx-web://Microsoft.AAD.BrokerPlugin/your_client_id`` 

1907 if your app is expected to run on Windows 10+ 

1908 

1909 2. installed broker dependency, 

1910 e.g. ``pip install msal[broker]>=1.25,<2``. 

1911 

1912 3. tested with ``acquire_token_interactive()`` and ``acquire_token_silent()``. 

1913 

1914 **The fallback behaviors of MSAL Python's broker support** 

1915 

1916 MSAL will either error out, or silently fallback to non-broker flows. 

1917 

1918 1. MSAL will ignore the `enable_broker_...` and bypass broker 

1919 on those auth flows that are known to be NOT supported by broker. 

1920 This includes ADFS, B2C, etc.. 

1921 For other "could-use-broker" scenarios, please see below. 

1922 2. MSAL errors out when app developer opted-in to use broker 

1923 but a direct dependency "mid-tier" package is not installed. 

1924 Error message guides app developer to declare the correct dependency 

1925 ``msal[broker]``. 

1926 We error out here because the error is actionable to app developers. 

1927 3. MSAL silently "deactivates" the broker and fallback to non-broker, 

1928 when opted-in, dependency installed yet failed to initialize. 

1929 We anticipate this would happen on a device whose OS is too old 

1930 or the underlying broker component is somehow unavailable. 

1931 There is not much an app developer or the end user can do here. 

1932 Eventually, the conditional access policy shall 

1933 force the user to switch to a different device. 

1934 4. MSAL errors out when broker is opted in, installed, initialized, 

1935 but subsequent token request(s) failed. 

1936 

1937 :param boolean enable_broker_on_windows: 

1938 This setting is only effective if your app is running on Windows 10+. 

1939 This parameter defaults to None, which means MSAL will not utilize a broker. 

1940 

1941 New in MSAL Python 1.25.0. 

1942 """ 

1943 if client_credential is not None: 

1944 raise ValueError("Public Client should not possess credentials") 

1945 # Using kwargs notation for now. We will switch to keyword-only arguments. 

1946 enable_broker_on_windows = kwargs.pop("enable_broker_on_windows", False) 

1947 self._enable_broker = enable_broker_on_windows and sys.platform == "win32" 

1948 super(PublicClientApplication, self).__init__( 

1949 client_id, client_credential=None, **kwargs) 

1950 

1951 def acquire_token_interactive( 

1952 self, 

1953 scopes, # type: list[str] 

1954 prompt=None, 

1955 login_hint=None, # type: Optional[str] 

1956 domain_hint=None, # type: Optional[str] 

1957 claims_challenge=None, 

1958 timeout=None, 

1959 port=None, 

1960 extra_scopes_to_consent=None, 

1961 max_age=None, 

1962 parent_window_handle=None, 

1963 on_before_launching_ui=None, 

1964 auth_scheme=None, 

1965 **kwargs): 

1966 """Acquire token interactively i.e. via a local browser. 

1967 

1968 Prerequisite: In Azure Portal, configure the Redirect URI of your 

1969 "Mobile and Desktop application" as ``http://localhost``. 

1970 If you opts in to use broker during ``PublicClientApplication`` creation, 

1971 your app also need this Redirect URI: 

1972 ``ms-appx-web://Microsoft.AAD.BrokerPlugin/YOUR_CLIENT_ID`` 

1973 

1974 :param list scopes: 

1975 It is a list of case-sensitive strings. 

1976 :param str prompt: 

1977 By default, no prompt value will be sent, not even string ``"none"``. 

1978 You will have to specify a value explicitly. 

1979 Its valid values are the constants defined in 

1980 :class:`Prompt <msal.Prompt>`. 

1981 :param str login_hint: 

1982 Optional. Identifier of the user. Generally a User Principal Name (UPN). 

1983 :param domain_hint: 

1984 Can be one of "consumers" or "organizations" or your tenant domain "contoso.com". 

1985 If included, it will skip the email-based discovery process that user goes 

1986 through on the sign-in page, leading to a slightly more streamlined user experience. 

1987 More information on possible values available in 

1988 `Auth Code Flow doc <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow#request-an-authorization-code>`_ and 

1989 `domain_hint doc <https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-oapx/86fb452d-e34a-494e-ac61-e526e263b6d8>`_. 

1990 

1991 :param claims_challenge: 

1992 The claims_challenge parameter requests specific claims requested by the resource provider 

1993 in the form of a claims_challenge directive in the www-authenticate header to be 

1994 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

1995 It is a string of a JSON object which contains lists of claims being requested from these locations. 

1996 

1997 :param int timeout: 

1998 This method will block the current thread. 

1999 This parameter specifies the timeout value in seconds. 

2000 Default value ``None`` means wait indefinitely. 

2001 

2002 :param int port: 

2003 The port to be used to listen to an incoming auth response. 

2004 By default we will use a system-allocated port. 

2005 (The rest of the redirect_uri is hard coded as ``http://localhost``.) 

2006 

2007 :param list extra_scopes_to_consent: 

2008 "Extra scopes to consent" is a concept only available in Microsoft Entra. 

2009 It refers to other resources you might want to prompt to consent for, 

2010 in the same interaction, but for which you won't get back a 

2011 token for in this particular operation. 

2012 

2013 :param int max_age: 

2014 OPTIONAL. Maximum Authentication Age. 

2015 Specifies the allowable elapsed time in seconds 

2016 since the last time the End-User was actively authenticated. 

2017 If the elapsed time is greater than this value, 

2018 Microsoft identity platform will actively re-authenticate the End-User. 

2019 

2020 MSAL Python will also automatically validate the auth_time in ID token. 

2021 

2022 New in version 1.15. 

2023 

2024 :param int parent_window_handle: 

2025 Required if your app is running on Windows and opted in to use broker. 

2026 

2027 If your app is a GUI app, 

2028 you are recommended to also provide its window handle, 

2029 so that the sign in UI window will properly pop up on top of your window. 

2030 

2031 If your app is a console app (most Python scripts are console apps), 

2032 you can use a placeholder value ``msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE``. 

2033 

2034 New in version 1.20.0. 

2035 

2036 :param function on_before_launching_ui: 

2037 A callback with the form of 

2038 ``lambda ui="xyz", **kwargs: print("A {} will be launched".format(ui))``, 

2039 where ``ui`` will be either "browser" or "broker". 

2040 You can use it to inform your end user to expect a pop-up window. 

2041 

2042 New in version 1.20.0. 

2043 

2044 :param object auth_scheme: 

2045 You can provide an ``msal.auth_scheme.PopAuthScheme`` object 

2046 so that MSAL will get a Proof-of-Possession (POP) token for you. 

2047 

2048 New in version 1.26.0. 

2049 

2050 :return: 

2051 - A dict containing no "error" key, 

2052 and typically contains an "access_token" key. 

2053 - A dict containing an "error" key, when token refresh failed. 

2054 """ 

2055 data = kwargs.pop("data", {}) 

2056 enable_msa_passthrough = kwargs.pop( # MUST remove it from kwargs 

2057 "enable_msa_passthrough", # Keep it as a hidden param, for now. 

2058 # OPTIONAL. MSA-Passthrough is a legacy configuration, 

2059 # needed by a small amount of Microsoft first-party apps, 

2060 # which would login MSA accounts via ".../organizations" authority. 

2061 # If you app belongs to this category, AND you are enabling broker, 

2062 # you would want to enable this flag. Default value is False. 

2063 # More background of MSA-PT is available from this internal docs: 

2064 # https://microsoft.sharepoint.com/:w:/t/Identity-DevEx/EatIUauX3c9Ctw1l7AQ6iM8B5CeBZxc58eoQCE0IuZ0VFw?e=tgc3jP&CID=39c853be-76ea-79d7-ee73-f1b2706ede05 

2065 False 

2066 ) and data.get("token_type") != "ssh-cert" # Work around a known issue as of PyMsalRuntime 0.8 

2067 self._validate_ssh_cert_input_data(data) 

2068 if not on_before_launching_ui: 

2069 on_before_launching_ui = lambda **kwargs: None 

2070 if _is_running_in_cloud_shell() and prompt == "none": 

2071 # Note: _acquire_token_by_cloud_shell() is always silent, 

2072 # so we would not fire on_before_launching_ui() 

2073 return self._acquire_token_by_cloud_shell(scopes, data=data) 

2074 claims = _merge_claims_challenge_and_capabilities( 

2075 self._client_capabilities, claims_challenge) 

2076 if self._enable_broker: 

2077 if parent_window_handle is None: 

2078 raise ValueError( 

2079 "parent_window_handle is required when you opted into using broker. " 

2080 "You need to provide the window handle of your GUI application, " 

2081 "or use msal.PublicClientApplication.CONSOLE_WINDOW_HANDLE " 

2082 "when and only when your application is a console app.") 

2083 if extra_scopes_to_consent: 

2084 logger.warning( 

2085 "Ignoring parameter extra_scopes_to_consent, " 

2086 "which is not supported by broker") 

2087 response = self._acquire_token_interactive_via_broker( 

2088 scopes, 

2089 parent_window_handle, 

2090 enable_msa_passthrough, 

2091 claims, 

2092 data, 

2093 on_before_launching_ui, 

2094 auth_scheme, 

2095 prompt=prompt, 

2096 login_hint=login_hint, 

2097 max_age=max_age, 

2098 ) 

2099 return self._process_broker_response(response, scopes, data) 

2100 

2101 if auth_scheme: 

2102 raise ValueError(self._AUTH_SCHEME_UNSUPPORTED) 

2103 on_before_launching_ui(ui="browser") 

2104 telemetry_context = self._build_telemetry_context( 

2105 self.ACQUIRE_TOKEN_INTERACTIVE) 

2106 response = _clean_up(self.client.obtain_token_by_browser( 

2107 scope=self._decorate_scope(scopes) if scopes else None, 

2108 extra_scope_to_consent=extra_scopes_to_consent, 

2109 redirect_uri="http://localhost:{port}".format( 

2110 # Hardcode the host, for now. AAD portal rejects 127.0.0.1 anyway 

2111 port=port or 0), 

2112 prompt=prompt, 

2113 login_hint=login_hint, 

2114 max_age=max_age, 

2115 timeout=timeout, 

2116 auth_params={ 

2117 "claims": claims, 

2118 "domain_hint": domain_hint, 

2119 }, 

2120 data=dict(data, claims=claims), 

2121 headers=telemetry_context.generate_headers(), 

2122 browser_name=_preferred_browser(), 

2123 **kwargs)) 

2124 if "access_token" in response: 

2125 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2126 telemetry_context.update_telemetry(response) 

2127 return response 

2128 

2129 def _acquire_token_interactive_via_broker( 

2130 self, 

2131 scopes, # type: list[str] 

2132 parent_window_handle, # type: int 

2133 enable_msa_passthrough, # type: boolean 

2134 claims, # type: str 

2135 data, # type: dict 

2136 on_before_launching_ui, # type: callable 

2137 auth_scheme, # type: object 

2138 prompt=None, 

2139 login_hint=None, # type: Optional[str] 

2140 max_age=None, 

2141 **kwargs): 

2142 from .broker import _signin_interactively, _signin_silently, _acquire_token_silently 

2143 if "welcome_template" in kwargs: 

2144 logger.debug(kwargs["welcome_template"]) # Experimental 

2145 authority = "https://{}/{}".format( 

2146 self.authority.instance, self.authority.tenant) 

2147 validate_authority = "no" if ( 

2148 self.authority._is_known_to_developer 

2149 or self._instance_discovery is False) else None 

2150 # Calls different broker methods to mimic the OIDC behaviors 

2151 if login_hint and prompt != "select_account": # OIDC prompts when the user did not sign in 

2152 accounts = self.get_accounts(username=login_hint) 

2153 if len(accounts) == 1: # Unambiguously proceed with this account 

2154 logger.debug("Calling broker._acquire_token_silently()") 

2155 response = _acquire_token_silently( # When it works, it bypasses prompt 

2156 authority, 

2157 self.client_id, 

2158 accounts[0]["local_account_id"], 

2159 scopes, 

2160 claims=claims, 

2161 auth_scheme=auth_scheme, 

2162 **data) 

2163 if response and "error" not in response: 

2164 return response 

2165 # login_hint undecisive or not exists 

2166 if prompt == "none" or not prompt: # Must/Can attempt _signin_silently() 

2167 logger.debug("Calling broker._signin_silently()") 

2168 response = _signin_silently( # Unlike OIDC, it doesn't honor login_hint 

2169 authority, self.client_id, scopes, 

2170 validateAuthority=validate_authority, 

2171 claims=claims, 

2172 max_age=max_age, 

2173 enable_msa_pt=enable_msa_passthrough, 

2174 auth_scheme=auth_scheme, 

2175 **data) 

2176 is_wrong_account = bool( 

2177 # _signin_silently() only gets tokens for default account, 

2178 # but this seems to have been fixed in PyMsalRuntime 0.11.2 

2179 "access_token" in response and login_hint 

2180 and response.get("id_token_claims", {}) != login_hint) 

2181 wrong_account_error_message = ( 

2182 'prompt="none" will not work for login_hint="non-default-user"') 

2183 if is_wrong_account: 

2184 logger.debug(wrong_account_error_message) 

2185 if prompt == "none": 

2186 return response if not is_wrong_account else { 

2187 "error": "broker_error", 

2188 "error_description": wrong_account_error_message, 

2189 } 

2190 else: 

2191 assert bool(prompt) is False 

2192 from pymsalruntime import Response_Status 

2193 recoverable_errors = frozenset([ 

2194 Response_Status.Status_AccountUnusable, 

2195 Response_Status.Status_InteractionRequired, 

2196 ]) 

2197 if is_wrong_account or "error" in response and response.get( 

2198 "_broker_status") in recoverable_errors: 

2199 pass # It will fall back to the _signin_interactively() 

2200 else: 

2201 return response 

2202 

2203 logger.debug("Falls back to broker._signin_interactively()") 

2204 on_before_launching_ui(ui="broker") 

2205 return _signin_interactively( 

2206 authority, self.client_id, scopes, 

2207 None if parent_window_handle is self.CONSOLE_WINDOW_HANDLE 

2208 else parent_window_handle, 

2209 validateAuthority=validate_authority, 

2210 login_hint=login_hint, 

2211 prompt=prompt, 

2212 claims=claims, 

2213 max_age=max_age, 

2214 enable_msa_pt=enable_msa_passthrough, 

2215 auth_scheme=auth_scheme, 

2216 **data) 

2217 

2218 def initiate_device_flow(self, scopes=None, **kwargs): 

2219 """Initiate a Device Flow instance, 

2220 which will be used in :func:`~acquire_token_by_device_flow`. 

2221 

2222 :param list[str] scopes: 

2223 Scopes requested to access a protected API (a resource). 

2224 :return: A dict representing a newly created Device Flow object. 

2225 

2226 - A successful response would contain "user_code" key, among others 

2227 - an error response would contain some other readable key/value pairs. 

2228 """ 

2229 correlation_id = msal.telemetry._get_new_correlation_id() 

2230 flow = self.client.initiate_device_flow( 

2231 scope=self._decorate_scope(scopes or []), 

2232 headers={msal.telemetry.CLIENT_REQUEST_ID: correlation_id}, 

2233 **kwargs) 

2234 flow[self.DEVICE_FLOW_CORRELATION_ID] = correlation_id 

2235 return flow 

2236 

2237 def acquire_token_by_device_flow(self, flow, claims_challenge=None, **kwargs): 

2238 """Obtain token by a device flow object, with customizable polling effect. 

2239 

2240 :param dict flow: 

2241 A dict previously generated by :func:`~initiate_device_flow`. 

2242 By default, this method's polling effect will block current thread. 

2243 You can abort the polling loop at any time, 

2244 by changing the value of the flow's "expires_at" key to 0. 

2245 :param claims_challenge: 

2246 The claims_challenge parameter requests specific claims requested by the resource provider 

2247 in the form of a claims_challenge directive in the www-authenticate header to be 

2248 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2249 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2250 

2251 :return: A dict representing the json response from Microsoft Entra: 

2252 

2253 - A successful response would contain "access_token" key, 

2254 - an error response would contain "error" and usually "error_description". 

2255 """ 

2256 telemetry_context = self._build_telemetry_context( 

2257 self.ACQUIRE_TOKEN_BY_DEVICE_FLOW_ID, 

2258 correlation_id=flow.get(self.DEVICE_FLOW_CORRELATION_ID)) 

2259 response = _clean_up(self.client.obtain_token_by_device_flow( 

2260 flow, 

2261 data=dict( 

2262 kwargs.pop("data", {}), 

2263 code=flow["device_code"], # 2018-10-4 Hack: 

2264 # during transition period, 

2265 # service seemingly need both device_code and code parameter. 

2266 claims=_merge_claims_challenge_and_capabilities( 

2267 self._client_capabilities, claims_challenge), 

2268 ), 

2269 headers=telemetry_context.generate_headers(), 

2270 **kwargs)) 

2271 if "access_token" in response: 

2272 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2273 telemetry_context.update_telemetry(response) 

2274 return response 

2275 

2276 

2277class ConfidentialClientApplication(ClientApplication): # server-side web app 

2278 """Same as :func:`ClientApplication.__init__`, 

2279 except that ``allow_broker`` parameter shall remain ``None``. 

2280 """ 

2281 

2282 def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs): 

2283 """Acquires token for the current confidential client, not for an end user. 

2284 

2285 Since MSAL Python 1.23, it will automatically look for token from cache, 

2286 and only send request to Identity Provider when cache misses. 

2287 

2288 :param list[str] scopes: (Required) 

2289 Scopes requested to access a protected API (a resource). 

2290 :param claims_challenge: 

2291 The claims_challenge parameter requests specific claims requested by the resource provider 

2292 in the form of a claims_challenge directive in the www-authenticate header to be 

2293 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2294 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2295 

2296 :return: A dict representing the json response from Microsoft Entra: 

2297 

2298 - A successful response would contain "access_token" key, 

2299 - an error response would contain "error" and usually "error_description". 

2300 """ 

2301 if kwargs.get("force_refresh"): 

2302 raise ValueError( # We choose to disallow force_refresh 

2303 "Historically, this method does not support force_refresh behavior. " 

2304 ) 

2305 return _clean_up(self._acquire_token_silent_with_error( 

2306 scopes, None, claims_challenge=claims_challenge, **kwargs)) 

2307 

2308 def _acquire_token_for_client( 

2309 self, 

2310 scopes, 

2311 refresh_reason, 

2312 claims_challenge=None, 

2313 **kwargs 

2314 ): 

2315 if self.authority.tenant.lower() in ["common", "organizations"]: 

2316 warnings.warn( 

2317 "Using /common or /organizations authority " 

2318 "in acquire_token_for_client() is unreliable. " 

2319 "Please use a specific tenant instead.", DeprecationWarning) 

2320 self._validate_ssh_cert_input_data(kwargs.get("data", {})) 

2321 telemetry_context = self._build_telemetry_context( 

2322 self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason) 

2323 client = self._regional_client or self.client 

2324 response = client.obtain_token_for_client( 

2325 scope=scopes, # This grant flow requires no scope decoration 

2326 headers=telemetry_context.generate_headers(), 

2327 data=dict( 

2328 kwargs.pop("data", {}), 

2329 claims=_merge_claims_challenge_and_capabilities( 

2330 self._client_capabilities, claims_challenge)), 

2331 **kwargs) 

2332 telemetry_context.update_telemetry(response) 

2333 return response 

2334 

2335 def remove_tokens_for_client(self): 

2336 """Remove all tokens that were previously acquired via 

2337 :func:`~acquire_token_for_client()` for the current client.""" 

2338 for env in [self.authority.instance] + self._get_authority_aliases( 

2339 self.authority.instance): 

2340 for at in list(self.token_cache.search( # Remove ATs from a snapshot 

2341 TokenCache.CredentialType.ACCESS_TOKEN, query={ 

2342 "client_id": self.client_id, 

2343 "environment": env, 

2344 "home_account_id": None, # These are mostly app-only tokens 

2345 })): 

2346 self.token_cache.remove_at(at) 

2347 # acquire_token_for_client() obtains no RTs, so we have no RT to remove 

2348 

2349 def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs): 

2350 """Acquires token using on-behalf-of (OBO) flow. 

2351 

2352 The current app is a middle-tier service which was called with a token 

2353 representing an end user. 

2354 The current app can use such token (a.k.a. a user assertion) to request 

2355 another token to access downstream web API, on behalf of that user. 

2356 See `detail docs here <https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow>`_ . 

2357 

2358 The current middle-tier app has no user interaction to obtain consent. 

2359 See how to gain consent upfront for your middle-tier app from this article. 

2360 https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-on-behalf-of-flow#gaining-consent-for-the-middle-tier-application 

2361 

2362 :param str user_assertion: The incoming token already received by this app 

2363 :param list[str] scopes: Scopes required by downstream API (a resource). 

2364 :param claims_challenge: 

2365 The claims_challenge parameter requests specific claims requested by the resource provider 

2366 in the form of a claims_challenge directive in the www-authenticate header to be 

2367 returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. 

2368 It is a string of a JSON object which contains lists of claims being requested from these locations. 

2369 

2370 :return: A dict representing the json response from Microsoft Entra: 

2371 

2372 - A successful response would contain "access_token" key, 

2373 - an error response would contain "error" and usually "error_description". 

2374 """ 

2375 telemetry_context = self._build_telemetry_context( 

2376 self.ACQUIRE_TOKEN_ON_BEHALF_OF_ID) 

2377 # The implementation is NOT based on Token Exchange (RFC 8693) 

2378 response = _clean_up(self.client.obtain_token_by_assertion( # bases on assertion RFC 7521 

2379 user_assertion, 

2380 self.client.GRANT_TYPE_JWT, # IDTs and AAD ATs are all JWTs 

2381 scope=self._decorate_scope(scopes), # Decoration is used for: 

2382 # 1. Explicitly requesting an RT, without relying on AAD default 

2383 # behavior, even though it currently still issues an RT. 

2384 # 2. Requesting an IDT (which would otherwise be unavailable) 

2385 # so that the calling app could use id_token_claims to implement 

2386 # their own cache mapping, which is likely needed in web apps. 

2387 data=dict( 

2388 kwargs.pop("data", {}), 

2389 requested_token_use="on_behalf_of", 

2390 claims=_merge_claims_challenge_and_capabilities( 

2391 self._client_capabilities, claims_challenge)), 

2392 headers=telemetry_context.generate_headers(), 

2393 # TBD: Expose a login_hint (or ccs_routing_hint) param for web app 

2394 **kwargs)) 

2395 if "access_token" in response: 

2396 response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP 

2397 telemetry_context.update_telemetry(response) 

2398 return response