Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/network/auth.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

282 statements  

1"""Network Authentication Helpers 

2 

3Contains interface (MultiDomainBasicAuth) and associated glue code for 

4providing credentials in the context of network requests. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10import os 

11import shutil 

12import subprocess 

13import sysconfig 

14import typing 

15import urllib.parse 

16from abc import ABC, abstractmethod 

17from functools import cache 

18from os.path import commonprefix 

19from pathlib import Path 

20from typing import Any, NamedTuple 

21 

22from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth 

23from pip._vendor.requests.utils import get_netrc_auth 

24 

25from pip._internal.utils.logging import getLogger 

26from pip._internal.utils.misc import ( 

27 ask, 

28 ask_input, 

29 ask_password, 

30 remove_auth_from_url, 

31 split_auth_netloc_from_url, 

32) 

33from pip._internal.vcs.versioncontrol import AuthInfo 

34 

35if typing.TYPE_CHECKING: 

36 from pip._vendor.requests import PreparedRequest 

37 from pip._vendor.requests.models import Response 

38 

39logger = getLogger(__name__) 

40 

41KEYRING_DISABLED = False 

42 

43 

44class Credentials(NamedTuple): 

45 url: str 

46 username: str 

47 password: str 

48 

49 

50class KeyRingBaseProvider(ABC): 

51 """Keyring base provider interface""" 

52 

53 has_keyring: bool 

54 

55 @abstractmethod 

56 def get_auth_info(self, url: str, username: str | None) -> AuthInfo | None: ... 

57 

58 @abstractmethod 

59 def save_auth_info(self, url: str, username: str, password: str) -> None: ... 

60 

61 

62class KeyRingNullProvider(KeyRingBaseProvider): 

63 """Keyring null provider""" 

64 

65 has_keyring = False 

66 

67 def get_auth_info(self, url: str, username: str | None) -> AuthInfo | None: 

68 return None 

69 

70 def save_auth_info(self, url: str, username: str, password: str) -> None: 

71 return None 

72 

73 

74class KeyRingPythonProvider(KeyRingBaseProvider): 

75 """Keyring interface which uses locally imported `keyring`""" 

76 

77 has_keyring = True 

78 

79 def __init__(self) -> None: 

80 import keyring 

81 

82 self.keyring = keyring 

83 

84 def get_auth_info(self, url: str, username: str | None) -> AuthInfo | None: 

85 # Support keyring's get_credential interface which supports getting 

86 # credentials without a username. This is only available for 

87 # keyring>=15.2.0. 

88 if hasattr(self.keyring, "get_credential"): 

89 logger.debug("Getting credentials from keyring for %s", url) 

90 cred = self.keyring.get_credential(url, username) 

91 if cred is not None: 

92 return cred.username, cred.password 

93 return None 

94 

95 if username is not None: 

96 logger.debug("Getting password from keyring for %s", url) 

97 password = self.keyring.get_password(url, username) 

98 if password: 

99 return username, password 

100 return None 

101 

102 def save_auth_info(self, url: str, username: str, password: str) -> None: 

103 self.keyring.set_password(url, username, password) 

104 

105 

106class KeyRingCliProvider(KeyRingBaseProvider): 

107 """Provider which uses `keyring` cli 

108 

109 Instead of calling the keyring package installed alongside pip 

110 we call keyring on the command line which will enable pip to 

111 use which ever installation of keyring is available first in 

112 PATH. 

113 """ 

114 

115 has_keyring = True 

116 

117 def __init__(self, cmd: str) -> None: 

118 self.keyring = cmd 

119 

120 def get_auth_info(self, url: str, username: str | None) -> AuthInfo | None: 

121 # This is the default implementation of keyring.get_credential 

122 # https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139 

123 if username is not None: 

124 password = self._get_password(url, username) 

125 if password is not None: 

126 return username, password 

127 return None 

128 

129 def save_auth_info(self, url: str, username: str, password: str) -> None: 

130 return self._set_password(url, username, password) 

131 

132 def _get_password(self, service_name: str, username: str) -> str | None: 

133 """Mirror the implementation of keyring.get_password using cli""" 

134 if self.keyring is None: 

135 return None 

136 

137 cmd = [self.keyring, "get", service_name, username] 

138 env = os.environ.copy() 

139 env["PYTHONIOENCODING"] = "utf-8" 

140 res = subprocess.run( 

141 cmd, 

142 stdin=subprocess.DEVNULL, 

143 stdout=subprocess.PIPE, 

144 env=env, 

145 ) 

146 if res.returncode: 

147 return None 

148 return res.stdout.decode("utf-8").strip(os.linesep) 

149 

150 def _set_password(self, service_name: str, username: str, password: str) -> None: 

151 """Mirror the implementation of keyring.set_password using cli""" 

152 if self.keyring is None: 

153 return None 

154 env = os.environ.copy() 

155 env["PYTHONIOENCODING"] = "utf-8" 

156 subprocess.run( 

157 [self.keyring, "set", service_name, username], 

158 input=f"{password}{os.linesep}".encode(), 

159 env=env, 

160 check=True, 

161 ) 

162 return None 

163 

164 

165@cache 

166def get_keyring_provider(provider: str) -> KeyRingBaseProvider: 

167 logger.verbose("Keyring provider requested: %s", provider) 

168 

169 # keyring has previously failed and been disabled 

170 if KEYRING_DISABLED: 

171 provider = "disabled" 

172 if provider in ["import", "auto"]: 

173 try: 

174 impl = KeyRingPythonProvider() 

175 logger.verbose("Keyring provider set: import") 

176 return impl 

177 except ImportError: 

178 pass 

179 except Exception as exc: 

180 # In the event of an unexpected exception 

181 # we should warn the user 

182 msg = "Installed copy of keyring fails with exception %s" 

183 if provider == "auto": 

184 msg = msg + ", trying to find a keyring executable as a fallback" 

185 logger.warning(msg, exc, exc_info=logger.isEnabledFor(logging.DEBUG)) 

186 if provider in ["subprocess", "auto"]: 

187 cli = shutil.which("keyring") 

188 if cli and cli.startswith(sysconfig.get_path("scripts")): 

189 # all code within this function is stolen from shutil.which implementation 

190 @typing.no_type_check 

191 def PATH_as_shutil_which_determines_it() -> str: 

192 path = os.environ.get("PATH", None) 

193 if path is None: 

194 try: 

195 path = os.confstr("CS_PATH") 

196 except (AttributeError, ValueError): 

197 # os.confstr() or CS_PATH is not available 

198 path = os.defpath 

199 # bpo-35755: Don't use os.defpath if the PATH environment variable is 

200 # set to an empty string 

201 

202 return path 

203 

204 scripts = Path(sysconfig.get_path("scripts")) 

205 

206 paths = [] 

207 for path in PATH_as_shutil_which_determines_it().split(os.pathsep): 

208 p = Path(path) 

209 try: 

210 if not p.samefile(scripts): 

211 paths.append(path) 

212 except FileNotFoundError: 

213 pass 

214 

215 path = os.pathsep.join(paths) 

216 

217 cli = shutil.which("keyring", path=path) 

218 

219 if cli: 

220 logger.verbose("Keyring provider set: subprocess with executable %s", cli) 

221 return KeyRingCliProvider(cli) 

222 

223 logger.verbose("Keyring provider set: disabled") 

224 return KeyRingNullProvider() 

225 

226 

227class MultiDomainBasicAuth(AuthBase): 

228 def __init__( 

229 self, 

230 prompting: bool = True, 

231 index_urls: list[str] | None = None, 

232 keyring_provider: str = "auto", 

233 ) -> None: 

234 self.prompting = prompting 

235 self.index_urls = index_urls 

236 self.keyring_provider = keyring_provider 

237 self.passwords: dict[str, AuthInfo] = {} 

238 # When the user is prompted to enter credentials and keyring is 

239 # available, we will offer to save them. If the user accepts, 

240 # this value is set to the credentials they entered. After the 

241 # request authenticates, the caller should call 

242 # ``save_credentials`` to save these. 

243 self._credentials_to_save: Credentials | None = None 

244 

245 @property 

246 def keyring_provider(self) -> KeyRingBaseProvider: 

247 return get_keyring_provider(self._keyring_provider) 

248 

249 @keyring_provider.setter 

250 def keyring_provider(self, provider: str) -> None: 

251 # The free function get_keyring_provider has been decorated with 

252 # functools.cache. If an exception occurs in get_keyring_auth that 

253 # cache will be cleared and keyring disabled, take that into account 

254 # if you want to remove this indirection. 

255 self._keyring_provider = provider 

256 

257 @property 

258 def use_keyring(self) -> bool: 

259 # We won't use keyring when --no-input is passed unless 

260 # a specific provider is requested because it might require 

261 # user interaction 

262 return self.prompting or self._keyring_provider not in ["auto", "disabled"] 

263 

264 def _get_keyring_auth( 

265 self, 

266 url: str | None, 

267 username: str | None, 

268 ) -> AuthInfo | None: 

269 """Return the tuple auth for a given url from keyring.""" 

270 # Do nothing if no url was provided 

271 if not url: 

272 return None 

273 

274 try: 

275 return self.keyring_provider.get_auth_info(url, username) 

276 except Exception as exc: 

277 # Log the full exception (with stacktrace) at debug, so it'll only 

278 # show up when running in verbose mode. 

279 logger.debug("Keyring is skipped due to an exception", exc_info=True) 

280 # Always log a shortened version of the exception. 

281 logger.warning( 

282 "Keyring is skipped due to an exception: %s", 

283 str(exc), 

284 ) 

285 global KEYRING_DISABLED 

286 KEYRING_DISABLED = True 

287 get_keyring_provider.cache_clear() 

288 return None 

289 

290 def _get_index_url(self, url: str) -> str | None: 

291 """Return the original index URL matching the requested URL. 

292 

293 Cached or dynamically generated credentials may work against 

294 the original index URL rather than just the netloc. 

295 

296 The provided url should have had its username and password 

297 removed already. If the original index url had credentials then 

298 they will be included in the return value. 

299 

300 Returns None if no matching index was found, or if --no-index 

301 was specified by the user. 

302 """ 

303 if not url or not self.index_urls: 

304 return None 

305 

306 url = remove_auth_from_url(url).rstrip("/") + "/" 

307 parsed_url = urllib.parse.urlsplit(url) 

308 

309 candidates = [] 

310 

311 for index in self.index_urls: 

312 index = index.rstrip("/") + "/" 

313 parsed_index = urllib.parse.urlsplit(remove_auth_from_url(index)) 

314 if parsed_url == parsed_index: 

315 return index 

316 

317 if parsed_url.netloc != parsed_index.netloc: 

318 continue 

319 

320 candidate = urllib.parse.urlsplit(index) 

321 candidates.append(candidate) 

322 

323 if not candidates: 

324 return None 

325 

326 candidates.sort( 

327 reverse=True, 

328 key=lambda candidate: commonprefix( 

329 [ 

330 parsed_url.path, 

331 candidate.path, 

332 ] 

333 ).rfind("/"), 

334 ) 

335 

336 return urllib.parse.urlunsplit(candidates[0]) 

337 

338 def _get_new_credentials( 

339 self, 

340 original_url: str, 

341 *, 

342 allow_netrc: bool = True, 

343 allow_keyring: bool = False, 

344 ) -> AuthInfo: 

345 """Find and return credentials for the specified URL.""" 

346 # Split the credentials and netloc from the url. 

347 url, netloc, url_user_password = split_auth_netloc_from_url( 

348 original_url, 

349 ) 

350 

351 # Start with the credentials embedded in the url 

352 username, password = url_user_password 

353 if username is not None and password is not None: 

354 logger.debug("Found credentials in url for %s", netloc) 

355 return url_user_password 

356 

357 # Find a matching index url for this request 

358 index_url = self._get_index_url(url) 

359 if index_url: 

360 # Split the credentials from the url. 

361 index_info = split_auth_netloc_from_url(index_url) 

362 if index_info: 

363 index_url, _, index_url_user_password = index_info 

364 logger.debug("Found index url %s", index_url) 

365 

366 # If an index URL was found, try its embedded credentials 

367 if index_url and index_url_user_password[0] is not None: 

368 username, password = index_url_user_password 

369 if username is not None and password is not None: 

370 logger.debug("Found credentials in index url for %s", netloc) 

371 return index_url_user_password 

372 

373 # Get creds from netrc if we still don't have them 

374 if allow_netrc: 

375 netrc_auth = get_netrc_auth(original_url) 

376 if netrc_auth: 

377 logger.debug("Found credentials in netrc for %s", netloc) 

378 return netrc_auth 

379 

380 # If we don't have a password and keyring is available, use it. 

381 if allow_keyring: 

382 # The index url is more specific than the netloc, so try it first 

383 # fmt: off 

384 kr_auth = ( 

385 self._get_keyring_auth(index_url, username) or 

386 self._get_keyring_auth(netloc, username) 

387 ) 

388 # fmt: on 

389 if kr_auth: 

390 logger.debug("Found credentials in keyring for %s", netloc) 

391 return kr_auth 

392 

393 return username, password 

394 

395 def _get_url_and_credentials( 

396 self, original_url: str 

397 ) -> tuple[str, str | None, str | None]: 

398 """Return the credentials to use for the provided URL. 

399 

400 If allowed, netrc and keyring may be used to obtain the 

401 correct credentials. 

402 

403 Returns (url_without_credentials, username, password). Note 

404 that even if the original URL contains credentials, this 

405 function may return a different username and password. 

406 """ 

407 url, netloc, _ = split_auth_netloc_from_url(original_url) 

408 

409 # Try to get credentials from original url 

410 username, password = self._get_new_credentials(original_url) 

411 

412 # If credentials not found, use any stored credentials for this netloc. 

413 # Do this if either the username or the password is missing. 

414 # This accounts for the situation in which the user has specified 

415 # the username in the index url, but the password comes from keyring. 

416 if (username is None or password is None) and netloc in self.passwords: 

417 un, pw = self.passwords[netloc] 

418 # It is possible that the cached credentials are for a different username, 

419 # in which case the cache should be ignored. 

420 if username is None or username == un: 

421 username, password = un, pw 

422 

423 if username is not None or password is not None: 

424 # Convert the username and password if they're None, so that 

425 # this netloc will show up as "cached" in the conditional above. 

426 # Further, HTTPBasicAuth doesn't accept None, so it makes sense to 

427 # cache the value that is going to be used. 

428 username = username or "" 

429 password = password or "" 

430 

431 # Store any acquired credentials. 

432 self.passwords[netloc] = (username, password) 

433 

434 assert ( 

435 # Credentials were found 

436 (username is not None and password is not None) 

437 # Credentials were not found 

438 or (username is None and password is None) 

439 ), f"Could not load credentials from url: {original_url}" 

440 

441 return url, username, password 

442 

443 def __call__(self, req: PreparedRequest) -> PreparedRequest: 

444 # Get credentials for this request 

445 assert req.url is not None 

446 url, username, password = self._get_url_and_credentials(req.url) 

447 

448 # Set the url of the request to the url without any credentials 

449 req.url = url 

450 

451 if username is not None and password is not None: 

452 # Send the basic auth with this request 

453 req = HTTPBasicAuth(username, password)(req) 

454 

455 # Attach a hook to handle 401 responses 

456 req.register_hook("response", self.handle_401) 

457 

458 return req 

459 

460 # Factored out to allow for easy patching in tests 

461 def _prompt_for_password(self, netloc: str) -> tuple[str | None, str | None, bool]: 

462 username = ask_input(f"User for {netloc}: ") if self.prompting else None 

463 if not username: 

464 return None, None, False 

465 if self.use_keyring: 

466 auth = self._get_keyring_auth(netloc, username) 

467 if auth and auth[0] is not None and auth[1] is not None: 

468 return auth[0], auth[1], False 

469 password = ask_password("Password: ") 

470 return username, password, True 

471 

472 # Factored out to allow for easy patching in tests 

473 def _should_save_password_to_keyring(self) -> bool: 

474 if ( 

475 not self.prompting 

476 or not self.use_keyring 

477 or not self.keyring_provider.has_keyring 

478 ): 

479 return False 

480 return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y" 

481 

482 def handle_401(self, resp: Response, **kwargs: Any) -> Response: 

483 # We only care about 401 responses, anything else we want to just 

484 # pass through the actual response 

485 if resp.status_code != 401: 

486 return resp 

487 

488 username, password = None, None 

489 

490 # Query the keyring for credentials: 

491 if self.use_keyring: 

492 username, password = self._get_new_credentials( 

493 resp.url, 

494 allow_netrc=False, 

495 allow_keyring=True, 

496 ) 

497 

498 # We are not able to prompt the user so simply return the response 

499 if not self.prompting and not username and not password: 

500 return resp 

501 

502 parsed = urllib.parse.urlparse(resp.url) 

503 

504 # Prompt the user for a new username and password 

505 save = False 

506 if not username and not password: 

507 username, password, save = self._prompt_for_password(parsed.netloc) 

508 

509 # Store the new username and password to use for future requests 

510 self._credentials_to_save = None 

511 if username is not None and password is not None: 

512 self.passwords[parsed.netloc] = (username, password) 

513 

514 # Prompt to save the password to keyring 

515 if save and self._should_save_password_to_keyring(): 

516 self._credentials_to_save = Credentials( 

517 url=parsed.netloc, 

518 username=username, 

519 password=password, 

520 ) 

521 

522 # Consume content and release the original connection to allow our new 

523 # request to reuse the same one. 

524 # The result of the assignment isn't used, it's just needed to consume 

525 # the content. 

526 _ = resp.content 

527 resp.raw.release_conn() 

528 

529 # Add our new username and password to the request 

530 req = HTTPBasicAuth(username or "", password or "")(resp.request) 

531 req.register_hook("response", self.warn_on_401) 

532 

533 # On successful request, save the credentials that were used to 

534 # keyring. (Note that if the user responded "no" above, this member 

535 # is not set and nothing will be saved.) 

536 if self._credentials_to_save: 

537 req.register_hook("response", self.save_credentials) 

538 

539 # Send our new request 

540 new_resp = resp.connection.send(req, **kwargs) 

541 new_resp.history.append(resp) 

542 

543 return new_resp 

544 

545 def warn_on_401(self, resp: Response, **kwargs: Any) -> None: 

546 """Response callback to warn about incorrect credentials.""" 

547 if resp.status_code == 401: 

548 logger.warning( 

549 "401 Error, Credentials not correct for %s", 

550 resp.request.url, 

551 ) 

552 

553 def save_credentials(self, resp: Response, **kwargs: Any) -> None: 

554 """Response callback to save credentials on success.""" 

555 assert ( 

556 self.keyring_provider.has_keyring 

557 ), "should never reach here without keyring" 

558 

559 creds = self._credentials_to_save 

560 self._credentials_to_save = None 

561 if creds and resp.status_code < 400: 

562 try: 

563 logger.info("Saving credentials to keyring") 

564 self.keyring_provider.save_auth_info( 

565 creds.url, creds.username, creds.password 

566 ) 

567 except Exception: 

568 logger.exception("Failed to save credentials")